hs-test: add support for running vpp in gdb
[vpp.git] / test / test_ikev2.py
index 3c58871..f0fd205 100644 (file)
@@ -19,7 +19,8 @@ from scapy.layers.inet import IP, UDP, Ether
 from scapy.layers.inet6 import IPv6
 from scapy.packet import raw, Raw
 from scapy.utils import long_converter
-from framework import tag_fixme_vpp_workers
+from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
+from framework import is_distro_ubuntu2204, is_distro_debian11
 from framework import VppTestCase, VppTestRunner
 from vpp_ikev2 import Profile, IDType, AuthMethod
 from vpp_papi import VppEnum
@@ -296,9 +297,11 @@ class IKEv2SA(object):
     def complete_dh_data(self):
         self.dh_shared_secret = self.compute_secret()
 
-    def calc_child_keys(self):
+    def calc_child_keys(self, kex=False):
         prf = self.ike_prf_alg.mod()
         s = self.i_nonce + self.r_nonce
+        if kex:
+            s = self.dh_shared_secret + s
         c = self.child_sas[0]
 
         encr_key_len = self.esp_crypto_key_len
@@ -625,8 +628,8 @@ class IkePeer(VppTestCase):
         node_name = "/err/ikev2-%s/" % version + name
         self.assertEqual(count, self.statistics.get_err_counter(node_name))
 
-    def create_rekey_request(self):
-        sa, first_payload = self.generate_auth_payload(is_rekey=True)
+    def create_rekey_request(self, kex=False):
+        sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
         header = ikev2.IKEv2(
             init_SPI=self.sa.ispi,
             resp_SPI=self.sa.rspi,
@@ -1345,9 +1348,10 @@ class TemplateResponder(IkePeer):
         capture = self.pg0.get_capture(1)
         self.verify_sa_init(capture[0])
 
-    def generate_auth_payload(self, last_payload=None, is_rekey=False):
+    def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
         tr_attr = self.sa.esp_crypto_attr()
         last_payload = last_payload or "Notify"
+        trans_nb = 4
         trans = (
             ikev2.IKEv2_payload_Transform(
                 transform_type="Encryption",
@@ -1366,9 +1370,20 @@ class TemplateResponder(IkePeer):
             )
         )
 
+        if kex:
+            trans_nb += 1
+            trans /= ikev2.IKEv2_payload_Transform(
+                transform_type="GroupDesc", transform_id=self.sa.ike_dh
+            )
+
         c = self.sa.child_sas[0]
         props = ikev2.IKEv2_payload_Proposal(
-            proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
+            proposal=1,
+            proto="ESP",
+            SPIsize=4,
+            SPI=c.ispi,
+            trans_nb=trans_nb,
+            trans=trans,
         )
 
         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
@@ -1389,8 +1404,18 @@ class TemplateResponder(IkePeer):
 
         if is_rekey:
             first_payload = "Nonce"
+            if kex:
+                head = ikev2.IKEv2_payload_Nonce(
+                    load=self.sa.i_nonce, next_payload="KE"
+                ) / ikev2.IKEv2_payload_KE(
+                    group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
+                )
+            else:
+                head = ikev2.IKEv2_payload_Nonce(
+                    load=self.sa.i_nonce, next_payload="SA"
+                )
             plain = (
-                ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
+                head
                 / plain
                 / ikev2.IKEv2_payload_Notify(
                     type="REKEY_SA",
@@ -2079,8 +2104,12 @@ class TestResponderDpd(TestResponderPsk):
 class TestResponderRekey(TestResponderPsk):
     """test ikev2 responder - rekey"""
 
+    WITH_KEX = False
+
     def send_rekey_from_initiator(self):
-        packet = self.create_rekey_request()
+        if self.WITH_KEX:
+            self.sa.generate_dh_data()
+        packet = self.create_rekey_request(kex=self.WITH_KEX)
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
@@ -2095,11 +2124,14 @@ class TestResponderRekey(TestResponderPsk):
         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
         # update new responder SPI
         self.sa.child_sas[0].rspi = prop.SPI
+        if self.WITH_KEX:
+            self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
+            self.sa.complete_dh_data()
+        self.sa.calc_child_keys(kex=self.WITH_KEX)
 
     def test_responder(self):
         super(TestResponderRekey, self).test_responder()
         self.process_rekey_response(self.send_rekey_from_initiator())
-        self.sa.calc_child_keys()
         self.verify_ike_sas()
         self.verify_ipsec_sas(is_rekey=True)
         self.assert_counter(1, "rekey_req", "ip4")
@@ -2128,11 +2160,26 @@ class TestResponderRekeyRepeat(TestResponderRekey):
         else:
             self.fail("old IPsec SA not expired")
         self.process_rekey_response(self.send_rekey_from_initiator())
-        self.sa.calc_child_keys()
         self.verify_ike_sas()
         self.verify_ipsec_sas(sa_count=3)
 
 
+@tag_fixme_vpp_workers
+class TestResponderRekeyKEX(TestResponderRekey):
+    """test ikev2 responder - rekey with key exchange"""
+
+    WITH_KEX = True
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
+    """test ikev2 responder - rekey repeat with key exchange"""
+
+    WITH_KEX = True
+
+
+@tag_fixme_ubuntu2204
+@tag_fixme_debian11
 class TestResponderVrf(TestResponderPsk, Ikev2Params):
     """test ikev2 responder - non-default table id"""
 
@@ -2142,6 +2189,10 @@ class TestResponderVrf(TestResponderPsk, Ikev2Params):
 
         globals()["ikev2"] = _ikev2
         super(IkePeer, cls).setUpClass()
+        if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
+            cls, "vpp"
+        ):
+            return
         cls.create_pg_interfaces(range(1))
         cls.vapi.cli("ip table add 1")
         cls.vapi.cli("set interface ip table pg0 1")