X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ikev2.py;h=be788d86b3f0765c244f18e27fbe474ee6dab12d;hb=dfd426c2c83f69e95fe8dab5b4827825230f1d7c;hp=f0fd20553794a6bb4dd1c97009bf23da3ec6d6e1;hpb=670724c51eccea6c622f047c546d15c894531ce3;p=vpp.git diff --git a/test/test_ikev2.py b/test/test_ikev2.py index f0fd2055379..be788d86b3f 100644 --- a/test/test_ikev2.py +++ b/test/test_ikev2.py @@ -19,9 +19,15 @@ from scapy.layers.inet import IP, UDP, Ether from scapy.layers.inet6 import IPv6 from scapy.packet import raw, Raw from scapy.utils import long_converter -from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11 -from framework import is_distro_ubuntu2204, is_distro_debian11 -from framework import VppTestCase, VppTestRunner +from framework import VppTestCase +from asfframework import ( + tag_fixme_vpp_workers, + tag_fixme_ubuntu2204, + tag_fixme_debian11, + is_distro_ubuntu2204, + is_distro_debian11, + VppTestRunner, +) from vpp_ikev2 import Profile, IDType, AuthMethod from vpp_papi import VppEnum @@ -355,14 +361,21 @@ class IKEv2SA(object): h.update(data) return h.finalize() - def calc_keys(self): + def calc_keys(self, sk_d=None): prf = self.ike_prf_alg.mod() - # SKEYSEED = prf(Ni | Nr, g^ir) - s = self.i_nonce + self.r_nonce - self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret) + if sk_d is None: + # SKEYSEED = prf(Ni | Nr, g^ir) + self.skeyseed = self.calc_prf( + prf, self.i_nonce + self.r_nonce, self.dh_shared_secret + ) + else: + # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr) + self.skeyseed = self.calc_prf( + prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce + ) # calculate S = Ni | Nr | SPIi SPIr - s = s + self.ispi + self.rspi + s = self.i_nonce + self.r_nonce + self.ispi + self.rspi prf_key_trunc = self.ike_prf_alg.trunc_len encr_key_len = self.ike_crypto_key_len @@ -579,7 +592,47 @@ class IKEv2SA(object): digest.update(data) return digest.finalize() + def clone(self, test, **kwargs): + if "spi" not in kwargs: + kwargs["spi"] = self.ispi if self.is_initiator else self.rspi + if "nonce" not in kwargs: + kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce + if self.child_sas: + if "local_ts" not in kwargs: + kwargs["local_ts"] = self.child_sas[0].local_ts + if "remote_ts" not in kwargs: + kwargs["remote_ts"] = self.child_sas[0].remote_ts + sa = type(self)( + test, + is_initiator=self.is_initiator, + i_id=self.i_id, + r_id=self.r_id, + id_type=self.id_type, + auth_data=self.auth_data, + auth_method=self.auth_method, + priv_key=self.priv_key, + i_natt=self.i_natt, + r_natt=self.r_natt, + udp_encap=self.udp_encap, + **kwargs, + ) + if sa.is_initiator: + sa.set_ike_props( + crypto=self.ike_crypto, + crypto_key_len=self.ike_crypto_key_len, + integ=self.ike_integ, + prf=self.ike_prf, + dh=self.ike_dh, + ) + sa.set_esp_props( + crypto=self.esp_crypto, + crypto_key_len=self.esp_crypto_key_len, + integ=self.esp_integ, + ) + return sa + +@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests") class IkePeer(VppTestCase): """common class for initiator and responder""" @@ -609,6 +662,8 @@ class IkePeer(VppTestCase): self.initiate_del_sa_from_initiator() r = self.vapi.ikev2_sa_dump() self.assertEqual(len(r), 0) + r = self.vapi.ikev2_sa_v2_dump() + self.assertEqual(len(r), 0) sas = self.vapi.ipsec_sa_dump() self.assertEqual(len(sas), 0) self.p.remove_vpp_config() @@ -643,6 +698,20 @@ class IkePeer(VppTestCase): self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 ) + def create_sa_rekey_request(self, **kwargs): + sa = self.generate_sa_init_payload(**kwargs) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + id=self.sa.new_msg_id(), + flags="Initiator", + exch_type="CREATE_CHILD_SA", + ) + ike_msg = self.encrypt_ike_msg(header, sa, "SA") + return self.create_packet( + self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) + def create_empty_request(self): header = ikev2.IKEv2( init_SPI=self.sa.ispi, @@ -823,10 +892,89 @@ class IkePeer(VppTestCase): self.assertEqual(api_id.data_len, exp_id.data_len) self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type) - def verify_ike_sas(self): + def verify_ike_sas(self, is_rekey=False): r = self.vapi.ikev2_sa_dump() + if is_rekey: + sa_count = 2 + sa = r[1].sa + else: + sa_count = 1 + sa = r[0].sa + self.assertEqual(len(r), sa_count) + self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big")) + self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big")) + if self.ip6: + if self.sa.is_initiator: + self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6)) + self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6)) + else: + self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6)) + self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6)) + else: + if self.sa.is_initiator: + self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4)) + self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4)) + else: + self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4)) + self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4)) + self.verify_keymat(sa.keys, self.sa, "sk_d") + self.verify_keymat(sa.keys, self.sa, "sk_ai") + self.verify_keymat(sa.keys, self.sa, "sk_ar") + self.verify_keymat(sa.keys, self.sa, "sk_ei") + self.verify_keymat(sa.keys, self.sa, "sk_er") + self.verify_keymat(sa.keys, self.sa, "sk_pi") + self.verify_keymat(sa.keys, self.sa, "sk_pr") + + self.assertEqual(sa.i_id.type, self.sa.id_type) + self.assertEqual(sa.r_id.type, self.sa.id_type) + self.assertEqual(sa.i_id.data_len, len(self.sa.i_id)) + self.assertEqual(sa.r_id.data_len, len(self.idr)) + self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id) + self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr) + + n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index) + self.verify_nonce(n, self.sa.i_nonce) + n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index) + self.verify_nonce(n, self.sa.r_nonce) + + r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index) + if is_rekey: + self.assertEqual(len(r), 0) + return + + self.assertEqual(len(r), 1) + csa = r[0].child_sa + self.assertEqual(csa.sa_index, sa.sa_index) + c = self.sa.child_sas[0] + if hasattr(c, "sk_ai"): + self.verify_keymat(csa.keys, c, "sk_ai") + self.verify_keymat(csa.keys, c, "sk_ar") + self.verify_keymat(csa.keys, c, "sk_ei") + self.verify_keymat(csa.keys, c, "sk_er") + self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi) + self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi) + + tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) + tsi = tsi[0] + tsr = tsr[0] + r = self.vapi.ikev2_traffic_selector_dump( + is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index + ) + self.assertEqual(len(r), 1) + ts = r[0].ts + self.verify_ts(r[0].ts, tsi[0], True) + + r = self.vapi.ikev2_traffic_selector_dump( + is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index + ) + self.assertEqual(len(r), 1) + self.verify_ts(r[0].ts, tsr[0], False) + + def verify_ike_sas_v2(self): + r = self.vapi.ikev2_sa_v2_dump() self.assertEqual(len(r), 1) sa = r[0].sa + self.assertEqual(self.p.profile_name, sa.profile_name) self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big")) self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big")) if self.ip6: @@ -1213,6 +1361,7 @@ class TemplateInitiator(IkePeer): self.sa.calc_child_keys() self.send_auth_response() self.verify_ike_sas() + self.verify_ike_sas_v2() class TemplateResponder(IkePeer): @@ -1274,7 +1423,9 @@ class TemplateResponder(IkePeer): capture = self.pg0.get_capture(1) self.verify_del_sa(capture[0]) - def send_sa_init_req(self): + def generate_sa_init_payload( + self, spi=None, dh_pub_key=None, nonce=None, next_payload=None + ): tr_attr = self.sa.ike_crypto_attr() trans = ( ikev2.IKEv2_payload_Transform( @@ -1294,23 +1445,36 @@ class TemplateResponder(IkePeer): ) ) + if spi is None: + pargs = {} + else: + pargs = {"SPI": spi, "SPIsize": len(spi)} props = ikev2.IKEv2_payload_Proposal( - proposal=1, proto="IKEv2", trans_nb=4, trans=trans + proposal=1, + proto="IKEv2", + trans_nb=4, + trans=trans, + **pargs, ) - next_payload = None if self.ip6 else "Notify" - - self.sa.init_req_packet = ( - ikev2.IKEv2( - init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT" - ) - / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props) + return ( + ikev2.IKEv2_payload_SA(next_payload="KE", prop=props) / ikev2.IKEv2_payload_KE( - next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key + next_payload="Nonce", + group=self.sa.ike_dh, + load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key, + ) + / ikev2.IKEv2_payload_Nonce( + next_payload=next_payload, + load=self.sa.i_nonce if nonce is None else nonce, ) - / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce) ) + def send_sa_init_req(self): + self.sa.init_req_packet = ikev2.IKEv2( + init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT" + ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify") + if not self.ip6: if self.sa.i_natt: src_address = b"\x0a\x0a\x0a\x01" @@ -1507,11 +1671,17 @@ class TemplateResponder(IkePeer): self.assertEqual(1, s.n_sa_auth_req) self.assertEqual(1, s.n_sa_init_req) + r = self.vapi.ikev2_sa_v2_dump() + s = r[0].sa.stats + self.assertEqual(1, s.n_sa_auth_req) + self.assertEqual(1, s.n_sa_init_req) + def test_responder(self): self.send_sa_init_req() self.send_sa_auth() self.verify_ipsec_sas() self.verify_ike_sas() + self.verify_ike_sas_v2() self.verify_counters() @@ -1667,6 +1837,7 @@ class Ikev2Params(object): ) +@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests") class TestApi(VppTestCase): """Test IKEV2 API""" @@ -1971,6 +2142,7 @@ class TestInitiatorRequestWindowSize(TestInitiatorPsk): # verify that only the second request was accepted self.verify_ike_sas() + self.verify_ike_sas_v2() self.verify_ipsec_sas(is_rekey=True) @@ -2015,6 +2187,7 @@ class TestInitiatorRekey(TestInitiatorPsk): super(TestInitiatorRekey, self).test_initiator() self.rekey_from_initiator() self.verify_ike_sas() + self.verify_ike_sas_v2() self.verify_ipsec_sas(is_rekey=True) @@ -2096,6 +2269,8 @@ class TestResponderDpd(TestResponderPsk): time.sleep(3) ike_sas = self.vapi.ikev2_sa_dump() self.assertEqual(len(ike_sas), 0) + ike_sas = self.vapi.ikev2_sa_v2_dump() + self.assertEqual(len(ike_sas), 0) ipsec_sas = self.vapi.ipsec_sa_dump() self.assertEqual(len(ipsec_sas), 0) @@ -2133,10 +2308,13 @@ class TestResponderRekey(TestResponderPsk): super(TestResponderRekey, self).test_responder() self.process_rekey_response(self.send_rekey_from_initiator()) self.verify_ike_sas() + self.verify_ike_sas_v2() self.verify_ipsec_sas(is_rekey=True) self.assert_counter(1, "rekey_req", "ip4") r = self.vapi.ikev2_sa_dump() self.assertEqual(r[0].sa.stats.n_rekey_req, 1) + r = self.vapi.ikev2_sa_v2_dump() + self.assertEqual(r[0].sa.stats.n_rekey_req, 1) @tag_fixme_vpp_workers @@ -2161,6 +2339,7 @@ class TestResponderRekeyRepeat(TestResponderRekey): self.fail("old IPsec SA not expired") self.process_rekey_response(self.send_rekey_from_initiator()) self.verify_ike_sas() + self.verify_ike_sas_v2() self.verify_ipsec_sas(sa_count=3) @@ -2178,6 +2357,50 @@ class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat): WITH_KEX = True +@tag_fixme_vpp_workers +class TestResponderRekeySA(TestResponderPsk): + """test ikev2 responder - rekey IKE SA""" + + def send_rekey_from_initiator(self, newsa): + packet = self.create_sa_rekey_request( + spi=newsa.ispi, + dh_pub_key=newsa.my_dh_pub_key, + nonce=newsa.i_nonce, + ) + self.pg0.add_stream(packet) + self.pg0.enable_capture() + self.pg_start() + capture = self.pg0.get_capture(1) + return capture + + def process_rekey_response(self, newsa, capture): + ih = self.get_ike_header(capture[0]) + plain = self.sa.hmac_and_decrypt(ih) + sa = ikev2.IKEv2_payload_SA(plain) + prop = sa[ikev2.IKEv2_payload_Proposal] + newsa.rspi = prop.SPI + newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load + newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load + newsa.complete_dh_data() + newsa.calc_keys(sk_d=self.sa.sk_d) + newsa.child_sas = self.sa.child_sas + self.sa.child_sas = [] + + def test_responder(self): + super(TestResponderRekeySA, self).test_responder() + newsa = self.sa.clone(self, spi=os.urandom(8)) + newsa.generate_dh_data() + capture = self.send_rekey_from_initiator(newsa) + self.process_rekey_response(newsa, capture) + self.verify_ike_sas(is_rekey=True) + self.assert_counter(1, "rekey_req", "ip4") + r = self.vapi.ikev2_sa_dump() + self.assertEqual(r[1].sa.stats.n_rekey_req, 1) + self.initiate_del_sa_from_initiator() + self.sa = newsa + self.verify_ike_sas() + + @tag_fixme_ubuntu2204 @tag_fixme_debian11 class TestResponderVrf(TestResponderPsk, Ikev2Params): @@ -2320,6 +2543,8 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk): self.assert_counter(1, "keepalive", "ip4") r = self.vapi.ikev2_sa_dump() self.assertEqual(1, r[0].sa.stats.n_keepalives) + r = self.vapi.ikev2_sa_v2_dump() + self.assertEqual(1, r[0].sa.stats.n_keepalives) def test_initiator(self): super(TestInitiatorKeepaliveMsg, self).test_initiator()