X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fikev2%2Ftest%2Ftest_ikev2.py;h=d065d46e8eb3883db08b20027b2724bc9a807ae7;hb=d7fc12f07;hp=f75a517f82490a7dd1b3bc313799dd68ab742b8a;hpb=ec112e5a9eb708c1ee85faf569fef6fa40178294;p=vpp.git diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py index f75a517f824..d065d46e8eb 100644 --- a/src/plugins/ikev2/test/test_ikev2.py +++ b/src/plugins/ikev2/test/test_ikev2.py @@ -113,9 +113,7 @@ class CryptoAlgo(object): self.mode(nonce, icv, len(icv)), default_backend()).decryptor() decryptor.authenticate_additional_data(aad) - pt = decryptor.update(ct) + decryptor.finalize() - pad_len = pt[-1] + 1 - return pt[:-pad_len] + return decryptor.update(ct) + decryptor.finalize() def pad(self, data): pad_len = (len(data) // self.bs + 1) * self.bs - len(data) @@ -153,10 +151,28 @@ PRF_ALGOS = { hashes.SHA256, 32), } +CRYPTO_IDS = { + 12: 'AES-CBC', + 20: 'AES-GCM-16ICV', +} + +INTEG_IDS = { + 2: 'HMAC-SHA1-96', + 12: 'SHA2-256-128', + 13: 'SHA2-384-192', + 14: 'SHA2-512-256', +} + class IKEv2ChildSA(object): - def __init__(self, local_ts, remote_ts, spi=None): - self.spi = spi or os.urandom(4) + def __init__(self, local_ts, remote_ts, is_initiator): + spi = os.urandom(4) + if is_initiator: + self.ispi = spi + self.rspi = None + else: + self.rspi = spi + self.ispi = None self.local_ts = local_ts self.remote_ts = remote_ts @@ -195,7 +211,8 @@ class IKEv2SA(object): self.rspi = spi self.ispi = 8 * b'\x00' self.r_nonce = nonce - self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)] + self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, + self.is_initiator)] def new_msg_id(self): self.msg_id += 1 @@ -435,14 +452,17 @@ class IKEv2SA(object): aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2()) ct = ep.load[:-GCM_ICV_SIZE] tag = ep.load[-GCM_ICV_SIZE:] - return self.decrypt(ct, raw(ike)[:aad_len], tag) + plain = self.decrypt(ct, raw(ike)[:aad_len], tag) else: self.verify_hmac(raw(ike)) integ_trunc = self.ike_integ_alg.trunc_len # remove ICV and decrypt payload ct = ep.load[:-integ_trunc] - return self.decrypt(ct) + plain = self.decrypt(ct) + # remove padding + pad_len = plain[-1] + return plain[:-pad_len - 1] def build_ts_addr(self, ts, version): return {'starting_address_v' + version: ts['start_addr'], @@ -540,14 +560,29 @@ class IkePeer(VppTestCase): def tearDownClass(cls): super(IkePeer, cls).tearDownClass() + def tearDown(self): + super(IkePeer, self).tearDown() + if self.del_sa_from_responder: + self.initiate_del_sa_from_responder() + else: + self.initiate_del_sa_from_initiator() + r = self.vapi.ikev2_sa_dump() + self.assertEqual(len(r), 0) + sas = self.vapi.ipsec_sa_dump() + self.assertEqual(len(sas), 0) + self.p.remove_vpp_config() + self.assertIsNone(self.p.query_vpp_config()) + def setUp(self): super(IkePeer, self).setUp() self.config_tc() self.p.add_vpp_config() self.assertIsNotNone(self.p.query_vpp_config()) - self.sa.generate_dh_data() + if self.sa.is_initiator: + self.sa.generate_dh_data() self.vapi.cli('ikev2 set logging level 4') self.vapi.cli('event-lo clear') + self.vapi.cli('ikev2 dpd disable') def create_packet(self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False): @@ -580,6 +615,7 @@ class IkePeer(VppTestCase): esp = packet[ESP] ih = self.verify_and_remove_non_esp_marker(esp) self.assertEqual(ih.version, 0x20) + self.assertNotIn('Version', ih.flags) return ih def verify_and_remove_non_esp_marker(self, packet): @@ -626,16 +662,30 @@ class IkePeer(VppTestCase): assert(len(res) == tlen) return res - def verify_ipsec_sas(self): + def verify_ipsec_sas(self, is_rekey=False): sas = self.vapi.ipsec_sa_dump() - self.assertEqual(len(sas), 2) + if is_rekey: + # after rekey there is a short period of time in which old + # inbound SA is still present + sa_count = 3 + else: + sa_count = 2 + self.assertEqual(len(sas), sa_count) e = VppEnum.vl_api_ipsec_sad_flags_t if self.sa.is_initiator: - sa0 = sas[0].entry - sa1 = sas[1].entry + if is_rekey: + sa0 = sas[0].entry + sa1 = sas[2].entry + else: + sa0 = sas[0].entry + sa1 = sas[1].entry else: - sa1 = sas[0].entry - sa0 = sas[1].entry + if is_rekey: + sa0 = sas[2].entry + sa1 = sas[0].entry + else: + sa1 = sas[0].entry + sa0 = sas[1].entry c = self.sa.child_sas[0] @@ -723,6 +773,8 @@ class IkePeer(VppTestCase): self.verify_keymat(csa.keys, c, 'sk_ar') self.verify_keymat(csa.keys, c, 'sk_ei') self.verify_keymat(csa.keys, c, 'sk_er') + self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi) + self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) tsi = tsi[0] @@ -775,8 +827,49 @@ class IkePeer(VppTestCase): class TemplateInitiator(IkePeer): """ initiator test template """ - def tearDown(self): - super(TemplateInitiator, self).tearDown() + def initiate_del_sa_from_initiator(self): + ispi = int.from_bytes(self.sa.ispi, 'little') + self.pg0.enable_capture() + self.pg_start() + self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi) + capture = self.pg0.get_capture(1) + ih = self.get_ike_header(capture[0]) + self.assertNotIn('Response', ih.flags) + self.assertIn('Initiator', ih.flags) + self.assertEqual(ih.init_SPI, self.sa.ispi) + self.assertEqual(ih.resp_SPI, self.sa.rspi) + plain = self.sa.hmac_and_decrypt(ih) + d = ikev2.IKEv2_payload_Delete(plain) + self.assertEqual(d.proto, 1) # proto=IKEv2 + header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, + flags='Response', exch_type='INFORMATIONAL', + id=ih.id, next_payload='Encrypted') + resp = self.encrypt_ike_msg(header, b'', None) + self.send_and_assert_no_replies(self.pg0, resp) + + def verify_del_sa(self, packet): + ih = self.get_ike_header(packet) + self.assertEqual(ih.id, self.sa.msg_id) + self.assertEqual(ih.exch_type, 37) # exchange informational + self.assertIn('Response', ih.flags) + self.assertIn('Initiator', ih.flags) + plain = self.sa.hmac_and_decrypt(ih) + self.assertEqual(plain, b'') + + def initiate_del_sa_from_responder(self): + header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, + exch_type='INFORMATIONAL', + id=self.sa.new_msg_id()) + del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2') + ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete') + packet = self.create_packet(self.pg0, ike_msg, + self.sa.sport, self.sa.dport, + self.sa.natt, self.ip6) + self.pg0.add_stream(packet) + self.pg0.enable_capture() + self.pg_start() + capture = self.pg0.get_capture(1) + self.verify_del_sa(capture[0]) @staticmethod def find_notify_payload(packet, notify_type): @@ -832,9 +925,23 @@ class TemplateInitiator(IkePeer): self.p.ike_transforms['dh_group']) self.verify_nat_detection(packet) + self.sa.set_ike_props( + crypto='AES-GCM-16ICV', crypto_key_len=32, + integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr') + self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32, + integ='SHA2-256-128') + self.sa.generate_dh_data() self.sa.complete_dh_data() self.sa.calc_keys() + def update_esp_transforms(self, trans, sa): + while trans: + if trans.transform_type == 1: # ecryption + sa.esp_crypto = CRYPTO_IDS[trans.transform_id] + elif trans.transform_type == 3: # integrity + sa.esp_integ = INTEG_IDS[trans.transform_id] + trans = trans.payload + def verify_sa_auth_req(self, packet): ih = self.get_ike_header(packet) self.assertEqual(ih.resp_SPI, self.sa.rspi) @@ -852,6 +959,11 @@ class TemplateInitiator(IkePeer): idr = ikev2.IKEv2_payload_IDr(idi.payload) self.assertEqual(idi.load, self.sa.i_id) self.assertEqual(idr.load, self.sa.r_id) + prop = idi[ikev2.IKEv2_payload_Proposal] + c = self.sa.child_sas[0] + c.ispi = prop.SPI + self.update_esp_transforms( + prop[ikev2.IKEv2_payload_Transform], self.sa) def send_init_response(self): tr_attr = self.sa.ike_crypto_attr() @@ -905,8 +1017,9 @@ class TemplateInitiator(IkePeer): transform_type='Extended Sequence Number', transform_id='ESN')) + c = self.sa.child_sas[0] props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP', - SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans)) + SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans)) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr', @@ -946,22 +1059,41 @@ class TemplateInitiator(IkePeer): class TemplateResponder(IkePeer): """ responder test template """ - def tearDown(self): - super(TemplateResponder, self).tearDown() - if self.sa.is_initiator: - self.initiate_del_sa() - r = self.vapi.ikev2_sa_dump() - self.assertEqual(len(r), 0) - - self.p.remove_vpp_config() - self.assertIsNone(self.p.query_vpp_config()) + def initiate_del_sa_from_responder(self): + self.pg0.enable_capture() + self.pg_start() + self.vapi.ikev2_initiate_del_ike_sa( + ispi=int.from_bytes(self.sa.ispi, 'little')) + capture = self.pg0.get_capture(1) + ih = self.get_ike_header(capture[0]) + self.assertNotIn('Response', ih.flags) + self.assertNotIn('Initiator', ih.flags) + self.assertEqual(ih.exch_type, 37) # INFORMATIONAL + plain = self.sa.hmac_and_decrypt(ih) + d = ikev2.IKEv2_payload_Delete(plain) + self.assertEqual(d.proto, 1) # proto=IKEv2 + self.assertEqual(ih.init_SPI, self.sa.ispi) + self.assertEqual(ih.resp_SPI, self.sa.rspi) + header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, + flags='Initiator+Response', + exch_type='INFORMATIONAL', + id=ih.id, next_payload='Encrypted') + resp = self.encrypt_ike_msg(header, b'', None) + self.send_and_assert_no_replies(self.pg0, resp) def verify_del_sa(self, packet): ih = self.get_ike_header(packet) self.assertEqual(ih.id, self.sa.msg_id) self.assertEqual(ih.exch_type, 37) # exchange informational + self.assertIn('Response', ih.flags) + self.assertNotIn('Initiator', ih.flags) + self.assertEqual(ih.next_payload, 46) # Encrypted + self.assertEqual(ih.init_SPI, self.sa.ispi) + self.assertEqual(ih.resp_SPI, self.sa.rspi) + plain = self.sa.hmac_and_decrypt(ih) + self.assertEqual(plain, b'') - def initiate_del_sa(self): + def initiate_del_sa_from_initiator(self): header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, flags='Initiator', exch_type='INFORMATIONAL', id=self.sa.new_msg_id()) @@ -1028,8 +1160,9 @@ class TemplateResponder(IkePeer): capture = self.pg0.get_capture(1) self.verify_sa_init(capture[0]) - def send_sa_auth(self): + def generate_auth_payload(self, last_payload=None, is_rekey=False): tr_attr = self.sa.esp_crypto_attr() + last_payload = last_payload or 'Notify' trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', transform_id=self.sa.esp_crypto, length=tr_attr[1], key_length=tr_attr[0]) / @@ -1042,32 +1175,45 @@ class TemplateResponder(IkePeer): transform_type='Extended Sequence Number', transform_id='ESN')) + c = self.sa.child_sas[0] props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP', - SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans)) + SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans)) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) - plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr', - IDtype=self.sa.id_type, load=self.sa.i_id) / - ikev2.IKEv2_payload_IDr(next_payload='AUTH', - IDtype=self.sa.id_type, load=self.sa.r_id) / - ikev2.IKEv2_payload_AUTH(next_payload='SA', + plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA', auth_type=AuthMethod.value(self.sa.auth_method), load=self.sa.auth_data) / ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) / ikev2.IKEv2_payload_TSi(next_payload='TSr', - number_of_TSs=len(tsi), - traffic_selector=tsi) / - ikev2.IKEv2_payload_TSr(next_payload='Notify', - number_of_TSs=len(tsr), - traffic_selector=tsr) / - ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')) + number_of_TSs=len(tsi), traffic_selector=tsi) / + ikev2.IKEv2_payload_TSr(next_payload=last_payload, + number_of_TSs=len(tsr), traffic_selector=tsr)) + + if is_rekey: + first_payload = 'Nonce' + plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, + next_payload='SA') / plain / + ikev2.IKEv2_payload_Notify(type='REKEY_SA', + proto='ESP', SPI=c.ispi)) + else: + first_payload = 'IDi' + ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr', + IDtype=self.sa.id_type, load=self.sa.i_id) / + ikev2.IKEv2_payload_IDr(next_payload='AUTH', + IDtype=self.sa.id_type, load=self.sa.r_id)) + plain = ids / plain + return plain, first_payload + def send_sa_auth(self): + plain, first_payload = self.generate_auth_payload( + last_payload='Notify') + plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT') header = ikev2.IKEv2( init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), flags='Initiator', exch_type='IKE_AUTH') - ike_msg = self.encrypt_ike_msg(header, plain, 'IDi') + ike_msg = self.encrypt_ike_msg(header, plain, first_payload) packet = self.create_packet(self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6) self.pg0.add_stream(packet) @@ -1081,7 +1227,7 @@ class TemplateResponder(IkePeer): self.assertEqual(ih.id, self.sa.msg_id) self.assertEqual(ih.exch_type, 34) - self.assertTrue('Response' in ih.flags) + self.assertIn('Response', ih.flags) self.assertEqual(ih.init_SPI, self.sa.ispi) self.assertNotEqual(ih.resp_SPI, 0) self.sa.rspi = ih.resp_SPI @@ -1103,6 +1249,10 @@ class TemplateResponder(IkePeer): self.verify_udp(udp) self.assertEqual(ike.id, self.sa.msg_id) plain = self.sa.hmac_and_decrypt(ike) + idr = ikev2.IKEv2_payload_IDr(plain) + prop = idr[ikev2.IKEv2_payload_Proposal] + self.assertEqual(prop.SPIsize, 4) + self.sa.child_sas[0].rspi = prop.SPI self.sa.calc_child_keys() def test_responder(self): @@ -1129,6 +1279,8 @@ class Ikev2Params(object): 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192, 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256} + self.del_sa_from_responder = False if 'del_sa_from_responder'\ + not in params else params['del_sa_from_responder'] is_natt = 'natt' in params and params['natt'] or False self.p = Profile(self, 'pr1') self.ip6 = False if 'ip6' not in params else params['ip6'] @@ -1187,24 +1339,25 @@ class Ikev2Params(object): auth_data=auth_data, local_ts=self.p.remote_ts, remote_ts=self.p.local_ts) - ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\ - params['ike-crypto'] - ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\ - params['ike-integ'] - ike_dh = '2048MODPgr' if 'ike-dh' not in params else\ - params['ike-dh'] - - esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\ - params['esp-crypto'] - esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\ - params['esp-integ'] - - self.sa.set_ike_props( - crypto=ike_crypto[0], crypto_key_len=ike_crypto[1], - integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh) - self.sa.set_esp_props( - crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], - integ=esp_integ) + if is_init: + ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\ + params['ike-crypto'] + ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\ + params['ike-integ'] + ike_dh = '2048MODPgr' if 'ike-dh' not in params else\ + params['ike-dh'] + + esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\ + params['esp-crypto'] + esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\ + params['esp-integ'] + + self.sa.set_ike_props( + crypto=ike_crypto[0], crypto_key_len=ike_crypto[1], + integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh) + self.sa.set_esp_props( + crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], + integ=esp_integ) class TestApi(VppTestCase): @@ -1240,6 +1393,8 @@ class TestApi(VppTestCase): p.set_lifetime_data(cfg['lifetime_data']) if 'tun_itf' in cfg: p.set_tunnel_interface(cfg['tun_itf']) + if 'natt_disabled' in cfg and cfg['natt_disabled']: + p.disable_natt() p.add_vpp_config() return p @@ -1278,6 +1433,7 @@ class TestApi(VppTestCase): conf = { 'p1': { 'name': 'p1', + 'natt_disabled': True, 'loc_id': ('fqdn', b'vpp.home'), 'rem_id': ('fqdn', b'roadwarrior.example.com'), 'loc_ts': loc_ts4, @@ -1381,6 +1537,9 @@ class TestApi(VppTestCase): self.verify_ike_transforms(ap.ike_ts, cp['ike_ts']) self.verify_esp_transforms(ap.esp_ts, cp['esp_ts']) self.verify_auth(ap.auth, cp['auth']) + natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled'] + self.assertTrue(natt_dis == ap.natt_disabled) + if 'lifetime_data' in cp: self.verify_lifetime_data(ap, cp['lifetime_data']) self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port']) @@ -1392,6 +1551,7 @@ class TestApi(VppTestCase): class TestInitiatorPsk(TemplateInitiator, Ikev2Params): """ test ikev2 initiator - pre shared key auth """ + def config_tc(self): self.config_params({ 'is_initiator': False, # seen from test case perspective @@ -1413,6 +1573,69 @@ class TestInitiatorPsk(TemplateInitiator, Ikev2Params): 'integ_alg': 12}}) +class TestInitiatorRekey(TestInitiatorPsk): + """ test ikev2 initiator - rekey """ + + def rekey_from_initiator(self): + ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little') + self.pg0.enable_capture() + self.pg_start() + self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) + capture = self.pg0.get_capture(1) + ih = self.get_ike_header(capture[0]) + self.assertEqual(ih.exch_type, 36) # CHILD_SA + self.assertNotIn('Response', ih.flags) + self.assertIn('Initiator', ih.flags) + plain = self.sa.hmac_and_decrypt(ih) + sa = ikev2.IKEv2_payload_SA(plain) + prop = sa[ikev2.IKEv2_payload_Proposal] + nonce = sa[ikev2.IKEv2_payload_Nonce] + self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load + self.sa.r_nonce = self.sa.i_nonce + # update new responder SPI + self.sa.child_sas[0].ispi = prop.SPI + self.sa.child_sas[0].rspi = prop.SPI + self.sa.calc_child_keys() + header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, + flags='Response', exch_type=36, + id=ih.id, next_payload='Encrypted') + resp = self.encrypt_ike_msg(header, sa, 'SA') + packet = self.create_packet(self.pg0, resp, self.sa.sport, + self.sa.dport, self.sa.natt, self.ip6) + self.send_and_assert_no_replies(self.pg0, packet) + + def test_initiator(self): + super(TestInitiatorRekey, self).test_initiator() + self.rekey_from_initiator() + self.verify_ike_sas() + self.verify_ipsec_sas(is_rekey=True) + + +class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params): + """ test ikev2 initiator - delete IKE SA from responder """ + + def config_tc(self): + self.config_params({ + 'del_sa_from_responder': True, + 'is_initiator': False, # seen from test case perspective + # thus vpp is initiator + 'responder': {'sw_if_index': self.pg0.sw_if_index, + 'addr': self.pg0.remote_ip4}, + 'ike-crypto': ('AES-GCM-16ICV', 32), + 'ike-integ': 'NULL', + 'ike-dh': '3072MODPgr', + 'ike_transforms': { + 'crypto_alg': 20, # "aes-gcm-16" + 'crypto_key_size': 256, + 'dh_group': 15, # "modp-3072" + }, + 'esp_transforms': { + 'crypto_alg': 12, # "aes-cbc" + 'crypto_key_size': 256, + # "hmac-sha2-256-128" + 'integ_alg': 12}}) + + class TestResponderNATT(TemplateResponder, Ikev2Params): """ test ikev2 responder - nat traversal """ def config_tc(self): @@ -1426,6 +1649,40 @@ class TestResponderPsk(TemplateResponder, Ikev2Params): self.config_params() +class TestResponderRekey(TestResponderPsk): + """ test ikev2 responder - rekey """ + + def rekey_from_initiator(self): + sa, first_payload = self.generate_auth_payload(is_rekey=True) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), + flags='Initiator', exch_type='CREATE_CHILD_SA') + + ike_msg = self.encrypt_ike_msg(header, sa, first_payload) + packet = self.create_packet(self.pg0, ike_msg, self.sa.sport, + self.sa.dport, self.sa.natt, self.ip6) + self.pg0.add_stream(packet) + self.pg0.enable_capture() + self.pg_start() + capture = self.pg0.get_capture(1) + ih = self.get_ike_header(capture[0]) + plain = self.sa.hmac_and_decrypt(ih) + sa = ikev2.IKEv2_payload_SA(plain) + prop = sa[ikev2.IKEv2_payload_Proposal] + nonce = sa[ikev2.IKEv2_payload_Nonce] + self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load + # update new responder SPI + self.sa.child_sas[0].rspi = prop.SPI + + def test_responder(self): + super(TestResponderRekey, self).test_responder() + self.rekey_from_initiator() + self.sa.calc_child_keys() + self.verify_ike_sas() + self.verify_ipsec_sas(is_rekey=True) + + class TestResponderRsaSign(TemplateResponder, Ikev2Params): """ test ikev2 responder - cert based auth """ def config_tc(self): @@ -1471,6 +1728,7 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params): """ def config_tc(self): self.config_params({ + 'del_sa_from_responder': True, 'ip6': True, 'natt': True, 'ike-crypto': ('AES-GCM-16ICV', 32),