ikev2: accept key exchange on CREATE_CHILD_SA
[vpp.git] / test / test_ikev2.py
index ac77a41..ffddc79 100644 (file)
@@ -296,9 +296,11 @@ class IKEv2SA(object):
     def complete_dh_data(self):
         self.dh_shared_secret = self.compute_secret()
 
-    def calc_child_keys(self):
+    def calc_child_keys(self, kex=False):
         prf = self.ike_prf_alg.mod()
         s = self.i_nonce + self.r_nonce
+        if kex:
+            s = self.dh_shared_secret + s
         c = self.child_sas[0]
 
         encr_key_len = self.esp_crypto_key_len
@@ -625,8 +627,8 @@ class IkePeer(VppTestCase):
         node_name = "/err/ikev2-%s/" % version + name
         self.assertEqual(count, self.statistics.get_err_counter(node_name))
 
-    def create_rekey_request(self):
-        sa, first_payload = self.generate_auth_payload(is_rekey=True)
+    def create_rekey_request(self, kex=False):
+        sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
         header = ikev2.IKEv2(
             init_SPI=self.sa.ispi,
             resp_SPI=self.sa.rspi,
@@ -752,14 +754,15 @@ class IkePeer(VppTestCase):
         else:
             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
 
-    def verify_ipsec_sas(self, is_rekey=False):
+    def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
         sas = self.vapi.ipsec_sa_dump()
-        if is_rekey:
-            # after rekey there is a short period of time in which old
-            # inbound SA is still present
-            sa_count = 3
-        else:
-            sa_count = 2
+        if sa_count is None:
+            if is_rekey:
+                # after rekey there is a short period of time in which old
+                # inbound SA is still present
+                sa_count = 3
+            else:
+                sa_count = 2
         self.assertEqual(len(sas), sa_count)
         if self.sa.is_initiator:
             if is_rekey:
@@ -1344,9 +1347,10 @@ class TemplateResponder(IkePeer):
         capture = self.pg0.get_capture(1)
         self.verify_sa_init(capture[0])
 
-    def generate_auth_payload(self, last_payload=None, is_rekey=False):
+    def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
         tr_attr = self.sa.esp_crypto_attr()
         last_payload = last_payload or "Notify"
+        trans_nb = 4
         trans = (
             ikev2.IKEv2_payload_Transform(
                 transform_type="Encryption",
@@ -1365,9 +1369,20 @@ class TemplateResponder(IkePeer):
             )
         )
 
+        if kex:
+            trans_nb += 1
+            trans /= ikev2.IKEv2_payload_Transform(
+                transform_type="GroupDesc", transform_id=self.sa.ike_dh
+            )
+
         c = self.sa.child_sas[0]
         props = ikev2.IKEv2_payload_Proposal(
-            proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
+            proposal=1,
+            proto="ESP",
+            SPIsize=4,
+            SPI=c.ispi,
+            trans_nb=trans_nb,
+            trans=trans,
         )
 
         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
@@ -1388,8 +1403,18 @@ class TemplateResponder(IkePeer):
 
         if is_rekey:
             first_payload = "Nonce"
+            if kex:
+                head = ikev2.IKEv2_payload_Nonce(
+                    load=self.sa.i_nonce, next_payload="KE"
+                ) / ikev2.IKEv2_payload_KE(
+                    group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
+                )
+            else:
+                head = ikev2.IKEv2_payload_Nonce(
+                    load=self.sa.i_nonce, next_payload="SA"
+                )
             plain = (
-                ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
+                head
                 / plain
                 / ikev2.IKEv2_payload_Notify(
                     type="REKEY_SA",
@@ -2078,12 +2103,19 @@ class TestResponderDpd(TestResponderPsk):
 class TestResponderRekey(TestResponderPsk):
     """test ikev2 responder - rekey"""
 
-    def rekey_from_initiator(self):
-        packet = self.create_rekey_request()
+    WITH_KEX = False
+
+    def send_rekey_from_initiator(self):
+        if self.WITH_KEX:
+            self.sa.generate_dh_data()
+        packet = self.create_rekey_request(kex=self.WITH_KEX)
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
         capture = self.pg0.get_capture(1)
+        return capture
+
+    def process_rekey_response(self, capture):
         ih = self.get_ike_header(capture[0])
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
@@ -2091,11 +2123,14 @@ class TestResponderRekey(TestResponderPsk):
         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
         # update new responder SPI
         self.sa.child_sas[0].rspi = prop.SPI
+        if self.WITH_KEX:
+            self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
+            self.sa.complete_dh_data()
+        self.sa.calc_child_keys(kex=self.WITH_KEX)
 
     def test_responder(self):
         super(TestResponderRekey, self).test_responder()
-        self.rekey_from_initiator()
-        self.sa.calc_child_keys()
+        self.process_rekey_response(self.send_rekey_from_initiator())
         self.verify_ike_sas()
         self.verify_ipsec_sas(is_rekey=True)
         self.assert_counter(1, "rekey_req", "ip4")
@@ -2103,6 +2138,45 @@ class TestResponderRekey(TestResponderPsk):
         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
 
 
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeat(TestResponderRekey):
+    """test ikev2 responder - rekey repeat"""
+
+    def test_responder(self):
+        super(TestResponderRekeyRepeat, self).test_responder()
+        # rekey request is not accepted until old IPsec SA is expired
+        capture = self.send_rekey_from_initiator()
+        ih = self.get_ike_header(capture[0])
+        plain = self.sa.hmac_and_decrypt(ih)
+        notify = ikev2.IKEv2_payload_Notify(plain)
+        self.assertEqual(notify.type, 43)
+        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
+        # rekey request is accepted after old IPsec SA was expired
+        for _ in range(50):
+            if len(self.vapi.ipsec_sa_dump()) != 3:
+                break
+            time.sleep(0.2)
+        else:
+            self.fail("old IPsec SA not expired")
+        self.process_rekey_response(self.send_rekey_from_initiator())
+        self.verify_ike_sas()
+        self.verify_ipsec_sas(sa_count=3)
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyKEX(TestResponderRekey):
+    """test ikev2 responder - rekey with key exchange"""
+
+    WITH_KEX = True
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
+    """test ikev2 responder - rekey repeat with key exchange"""
+
+    WITH_KEX = True
+
+
 class TestResponderVrf(TestResponderPsk, Ikev2Params):
     """test ikev2 responder - non-default table id"""