ikev2: fix issue when sending multiple requests at once
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
index 61dd53e..eb2c5f9 100644 (file)
@@ -1,4 +1,5 @@
 import os
+import time
 from socket import inet_pton
 from cryptography import x509
 from cryptography.hazmat.backends import default_backend
@@ -584,7 +585,27 @@ class IkePeer(VppTestCase):
             self.sa.generate_dh_data()
         self.vapi.cli('ikev2 set logging level 4')
         self.vapi.cli('event-lo clear')
-        self.vapi.cli('ikev2 dpd disable')
+
+    def create_rekey_request(self):
+        sa, first_payload = self.generate_auth_payload(is_rekey=True)
+        header = ikev2.IKEv2(
+                init_SPI=self.sa.ispi,
+                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+                flags='Initiator', exch_type='CREATE_CHILD_SA')
+
+        ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
+        return self.create_packet(self.pg0, ike_msg, self.sa.sport,
+                                  self.sa.dport, self.sa.natt, self.ip6)
+
+    def create_empty_request(self):
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             id=self.sa.new_msg_id(), flags='Initiator',
+                             exch_type='INFORMATIONAL',
+                             next_payload='Encrypted')
+
+        msg = self.encrypt_ike_msg(header, b'', None)
+        return self.create_packet(self.pg0, msg, self.sa.sport,
+                                  self.sa.dport, self.sa.natt, self.ip6)
 
     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
                       use_ip6=False):
@@ -1289,6 +1310,10 @@ class Ikev2Params(object):
                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
 
+        dpd_disabled = True if 'dpd_disabled' not in params else\
+            params['dpd_disabled']
+        if dpd_disabled:
+            self.vapi.cli('ikev2 dpd disable')
         self.del_sa_from_responder = False if 'del_sa_from_responder'\
             not in params else params['del_sa_from_responder']
         is_natt = 'natt' in params and params['natt'] or False
@@ -1587,6 +1612,47 @@ class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
                 'integ_alg': 12}})
 
 
+class TestInitiatorRequestWindowSize(TestInitiatorPsk):
+    """ test initiator - request window size (1) """
+
+    def rekey_respond(self, req, update_child_sa_data):
+        ih = self.get_ike_header(req)
+        plain = self.sa.hmac_and_decrypt(ih)
+        sa = ikev2.IKEv2_payload_SA(plain)
+        if update_child_sa_data:
+            prop = sa[ikev2.IKEv2_payload_Proposal]
+            self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+            self.sa.r_nonce = self.sa.i_nonce
+            self.sa.child_sas[0].ispi = prop.SPI
+            self.sa.child_sas[0].rspi = prop.SPI
+            self.sa.calc_child_keys()
+
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             flags='Response', exch_type=36,
+                             id=ih.id, next_payload='Encrypted')
+        resp = self.encrypt_ike_msg(header, sa, 'SA')
+        packet = self.create_packet(self.pg0, resp, self.sa.sport,
+                                    self.sa.dport, self.sa.natt, self.ip6)
+        self.send_and_assert_no_replies(self.pg0, packet)
+
+    def test_initiator(self):
+        super(TestInitiatorRequestWindowSize, self).test_initiator()
+        self.pg0.enable_capture()
+        self.pg_start()
+        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+        capture = self.pg0.get_capture(2)
+
+        # reply in reverse order
+        self.rekey_respond(capture[1], True)
+        self.rekey_respond(capture[0], False)
+
+        # verify that only the second request was accepted
+        self.verify_ike_sas()
+        self.verify_ipsec_sas(is_rekey=True)
+
+
 class TestInitiatorRekey(TestInitiatorPsk):
     """ test ikev2 initiator - rekey """
 
@@ -1603,7 +1669,6 @@ class TestInitiatorRekey(TestInitiatorPsk):
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
         prop = sa[ikev2.IKEv2_payload_Proposal]
-        nonce = sa[ikev2.IKEv2_payload_Nonce]
         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
         self.sa.r_nonce = self.sa.i_nonce
         # update new responder SPI
@@ -1663,19 +1728,40 @@ class TestResponderPsk(TemplateResponder, Ikev2Params):
         self.config_params()
 
 
+class TestResponderDpd(TestResponderPsk):
+    """
+    Dead peer detection test
+    """
+    def config_tc(self):
+        self.config_params({'dpd_disabled': False})
+
+    def tearDown(self):
+        pass
+
+    def test_responder(self):
+        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+        super(TestResponderDpd, self).test_responder()
+        self.pg0.enable_capture()
+        self.pg_start()
+        # capture empty request but don't reply
+        capture = self.pg0.get_capture(expected_count=1, timeout=5)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+        # wait for SA expiration
+        time.sleep(3)
+        ike_sas = self.vapi.ikev2_sa_dump()
+        self.assertEqual(len(ike_sas), 0)
+        ipsec_sas = self.vapi.ipsec_sa_dump()
+        self.assertEqual(len(ipsec_sas), 0)
+
+
 class TestResponderRekey(TestResponderPsk):
     """ test ikev2 responder - rekey """
 
     def rekey_from_initiator(self):
-        sa, first_payload = self.generate_auth_payload(is_rekey=True)
-        header = ikev2.IKEv2(
-                init_SPI=self.sa.ispi,
-                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
-                flags='Initiator', exch_type='CREATE_CHILD_SA')
-
-        ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
-        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
-                                    self.sa.dport, self.sa.natt, self.ip6)
+        packet = self.create_rekey_request()
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
@@ -1684,7 +1770,6 @@ class TestResponderRekey(TestResponderPsk):
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
         prop = sa[ikev2.IKEv2_payload_Proposal]
-        nonce = sa[ikev2.IKEv2_payload_Nonce]
         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
         # update new responder SPI
         self.sa.child_sas[0].rspi = prop.SPI
@@ -1755,6 +1840,27 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
                        'end_addr': '11::100'}})
 
 
+class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
+    """
+    Test for keep alive messages
+    """
+
+    def send_empty_req_from_responder(self):
+        packet = self.create_empty_request()
+        self.pg0.add_stream(packet)
+        self.pg0.enable_capture()
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.id, self.sa.msg_id)
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+
+    def test_initiator(self):
+        super(TestInitiatorKeepaliveMsg, self).test_initiator()
+        self.send_empty_req_from_responder()
+
+
 class TestMalformedMessages(TemplateResponder, Ikev2Params):
     """ malformed packet test """