X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ikev2.py;h=f0fd20553794a6bb4dd1c97009bf23da3ec6d6e1;hb=ec5c40b83acae400a8cc1a18ad897b6365774559;hp=3c5887195810b152d98467813e50aa3ccacf8680;hpb=7e6ffba672875f1070348753890d023695d73be6;p=vpp.git diff --git a/test/test_ikev2.py b/test/test_ikev2.py index 3c588719581..f0fd2055379 100644 --- a/test/test_ikev2.py +++ b/test/test_ikev2.py @@ -19,7 +19,8 @@ from scapy.layers.inet import IP, UDP, Ether from scapy.layers.inet6 import IPv6 from scapy.packet import raw, Raw from scapy.utils import long_converter -from framework import tag_fixme_vpp_workers +from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11 +from framework import is_distro_ubuntu2204, is_distro_debian11 from framework import VppTestCase, VppTestRunner from vpp_ikev2 import Profile, IDType, AuthMethod from vpp_papi import VppEnum @@ -296,9 +297,11 @@ class IKEv2SA(object): def complete_dh_data(self): self.dh_shared_secret = self.compute_secret() - def calc_child_keys(self): + def calc_child_keys(self, kex=False): prf = self.ike_prf_alg.mod() s = self.i_nonce + self.r_nonce + if kex: + s = self.dh_shared_secret + s c = self.child_sas[0] encr_key_len = self.esp_crypto_key_len @@ -625,8 +628,8 @@ class IkePeer(VppTestCase): node_name = "/err/ikev2-%s/" % version + name self.assertEqual(count, self.statistics.get_err_counter(node_name)) - def create_rekey_request(self): - sa, first_payload = self.generate_auth_payload(is_rekey=True) + def create_rekey_request(self, kex=False): + sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex) header = ikev2.IKEv2( init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, @@ -1345,9 +1348,10 @@ class TemplateResponder(IkePeer): capture = self.pg0.get_capture(1) self.verify_sa_init(capture[0]) - def generate_auth_payload(self, last_payload=None, is_rekey=False): + def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False): tr_attr = self.sa.esp_crypto_attr() last_payload = last_payload or "Notify" + trans_nb = 4 trans = ( ikev2.IKEv2_payload_Transform( transform_type="Encryption", @@ -1366,9 +1370,20 @@ class TemplateResponder(IkePeer): ) ) + if kex: + trans_nb += 1 + trans /= ikev2.IKEv2_payload_Transform( + transform_type="GroupDesc", transform_id=self.sa.ike_dh + ) + c = self.sa.child_sas[0] props = ikev2.IKEv2_payload_Proposal( - proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans + proposal=1, + proto="ESP", + SPIsize=4, + SPI=c.ispi, + trans_nb=trans_nb, + trans=trans, ) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) @@ -1389,8 +1404,18 @@ class TemplateResponder(IkePeer): if is_rekey: first_payload = "Nonce" + if kex: + head = ikev2.IKEv2_payload_Nonce( + load=self.sa.i_nonce, next_payload="KE" + ) / ikev2.IKEv2_payload_KE( + group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA" + ) + else: + head = ikev2.IKEv2_payload_Nonce( + load=self.sa.i_nonce, next_payload="SA" + ) plain = ( - ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA") + head / plain / ikev2.IKEv2_payload_Notify( type="REKEY_SA", @@ -2079,8 +2104,12 @@ class TestResponderDpd(TestResponderPsk): class TestResponderRekey(TestResponderPsk): """test ikev2 responder - rekey""" + WITH_KEX = False + def send_rekey_from_initiator(self): - packet = self.create_rekey_request() + if self.WITH_KEX: + self.sa.generate_dh_data() + packet = self.create_rekey_request(kex=self.WITH_KEX) self.pg0.add_stream(packet) self.pg0.enable_capture() self.pg_start() @@ -2095,11 +2124,14 @@ class TestResponderRekey(TestResponderPsk): self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load # update new responder SPI self.sa.child_sas[0].rspi = prop.SPI + if self.WITH_KEX: + self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load + self.sa.complete_dh_data() + self.sa.calc_child_keys(kex=self.WITH_KEX) def test_responder(self): super(TestResponderRekey, self).test_responder() self.process_rekey_response(self.send_rekey_from_initiator()) - self.sa.calc_child_keys() self.verify_ike_sas() self.verify_ipsec_sas(is_rekey=True) self.assert_counter(1, "rekey_req", "ip4") @@ -2128,11 +2160,26 @@ class TestResponderRekeyRepeat(TestResponderRekey): else: self.fail("old IPsec SA not expired") self.process_rekey_response(self.send_rekey_from_initiator()) - self.sa.calc_child_keys() self.verify_ike_sas() self.verify_ipsec_sas(sa_count=3) +@tag_fixme_vpp_workers +class TestResponderRekeyKEX(TestResponderRekey): + """test ikev2 responder - rekey with key exchange""" + + WITH_KEX = True + + +@tag_fixme_vpp_workers +class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat): + """test ikev2 responder - rekey repeat with key exchange""" + + WITH_KEX = True + + +@tag_fixme_ubuntu2204 +@tag_fixme_debian11 class TestResponderVrf(TestResponderPsk, Ikev2Params): """test ikev2 responder - non-default table id""" @@ -2142,6 +2189,10 @@ class TestResponderVrf(TestResponderPsk, Ikev2Params): globals()["ikev2"] = _ikev2 super(IkePeer, cls).setUpClass() + if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr( + cls, "vpp" + ): + return cls.create_pg_interfaces(range(1)) cls.vapi.cli("ip table add 1") cls.vapi.cli("set interface ip table pg0 1")