ikev2: do not accept rekey until old SA is deleted
[vpp.git] / test / test_ikev2.py
index ac77a41..3c58871 100644 (file)
@@ -752,14 +752,15 @@ class IkePeer(VppTestCase):
         else:
             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
 
-    def verify_ipsec_sas(self, is_rekey=False):
+    def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
         sas = self.vapi.ipsec_sa_dump()
-        if is_rekey:
-            # after rekey there is a short period of time in which old
-            # inbound SA is still present
-            sa_count = 3
-        else:
-            sa_count = 2
+        if sa_count is None:
+            if is_rekey:
+                # after rekey there is a short period of time in which old
+                # inbound SA is still present
+                sa_count = 3
+            else:
+                sa_count = 2
         self.assertEqual(len(sas), sa_count)
         if self.sa.is_initiator:
             if is_rekey:
@@ -2078,12 +2079,15 @@ class TestResponderDpd(TestResponderPsk):
 class TestResponderRekey(TestResponderPsk):
     """test ikev2 responder - rekey"""
 
-    def rekey_from_initiator(self):
+    def send_rekey_from_initiator(self):
         packet = self.create_rekey_request()
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
         capture = self.pg0.get_capture(1)
+        return capture
+
+    def process_rekey_response(self, capture):
         ih = self.get_ike_header(capture[0])
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
@@ -2094,7 +2098,7 @@ class TestResponderRekey(TestResponderPsk):
 
     def test_responder(self):
         super(TestResponderRekey, self).test_responder()
-        self.rekey_from_initiator()
+        self.process_rekey_response(self.send_rekey_from_initiator())
         self.sa.calc_child_keys()
         self.verify_ike_sas()
         self.verify_ipsec_sas(is_rekey=True)
@@ -2103,6 +2107,32 @@ class TestResponderRekey(TestResponderPsk):
         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
 
 
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeat(TestResponderRekey):
+    """test ikev2 responder - rekey repeat"""
+
+    def test_responder(self):
+        super(TestResponderRekeyRepeat, self).test_responder()
+        # rekey request is not accepted until old IPsec SA is expired
+        capture = self.send_rekey_from_initiator()
+        ih = self.get_ike_header(capture[0])
+        plain = self.sa.hmac_and_decrypt(ih)
+        notify = ikev2.IKEv2_payload_Notify(plain)
+        self.assertEqual(notify.type, 43)
+        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
+        # rekey request is accepted after old IPsec SA was expired
+        for _ in range(50):
+            if len(self.vapi.ipsec_sa_dump()) != 3:
+                break
+            time.sleep(0.2)
+        else:
+            self.fail("old IPsec SA not expired")
+        self.process_rekey_response(self.send_rekey_from_initiator())
+        self.sa.calc_child_keys()
+        self.verify_ike_sas()
+        self.verify_ipsec_sas(sa_count=3)
+
+
 class TestResponderVrf(TestResponderPsk, Ikev2Params):
     """test ikev2 responder - non-default table id"""