ikev2: add tests for DPD 25/29825/2
authorFilip Tehlar <ftehlar@cisco.com>
Mon, 9 Nov 2020 13:23:24 +0000 (13:23 +0000)
committerBeno�t Ganne <bganne@cisco.com>
Mon, 9 Nov 2020 15:26:20 +0000 (15:26 +0000)
Type: test

Change-Id: I9c1129a8596344551f3f8f2e029846d22511482e
Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
src/plugins/ikev2/test/test_ikev2.py

index 61dd53e..453a47e 100644 (file)
@@ -1,4 +1,5 @@
 import os
+import time
 from socket import inet_pton
 from cryptography import x509
 from cryptography.hazmat.backends import default_backend
@@ -584,7 +585,6 @@ class IkePeer(VppTestCase):
             self.sa.generate_dh_data()
         self.vapi.cli('ikev2 set logging level 4')
         self.vapi.cli('event-lo clear')
-        self.vapi.cli('ikev2 dpd disable')
 
     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
                       use_ip6=False):
@@ -1289,6 +1289,10 @@ class Ikev2Params(object):
                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
 
+        dpd_disabled = True if 'dpd_disabled' not in params else\
+            params['dpd_disabled']
+        if dpd_disabled:
+            self.vapi.cli('ikev2 dpd disable')
         self.del_sa_from_responder = False if 'del_sa_from_responder'\
             not in params else params['del_sa_from_responder']
         is_natt = 'natt' in params and params['natt'] or False
@@ -1663,6 +1667,35 @@ class TestResponderPsk(TemplateResponder, Ikev2Params):
         self.config_params()
 
 
+class TestResponderDpd(TestResponderPsk):
+    """
+    Dead peer detection test
+    """
+    def config_tc(self):
+        self.config_params({'dpd_disabled': False})
+
+    def tearDown(self):
+        pass
+
+    def test_responder(self):
+        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+        super(TestResponderDpd, self).test_responder()
+        self.pg0.enable_capture()
+        self.pg_start()
+        # capture empty request but don't reply
+        capture = self.pg0.get_capture(expected_count=1, timeout=5)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+        # wait for SA expiration
+        time.sleep(3)
+        ike_sas = self.vapi.ikev2_sa_dump()
+        self.assertEqual(len(ike_sas), 0)
+        ipsec_sas = self.vapi.ipsec_sa_dump()
+        self.assertEqual(len(ipsec_sas), 0)
+
+
 class TestResponderRekey(TestResponderPsk):
     """ test ikev2 responder - rekey """
 
@@ -1755,6 +1788,34 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
                        'end_addr': '11::100'}})
 
 
+class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
+    """
+    Test for keep alive messages
+    """
+
+    def send_empty_req_from_responder(self):
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             id=self.sa.new_msg_id(), flags='Initiator',
+                             exch_type='INFORMATIONAL',
+                             next_payload='Encrypted')
+
+        msg = self.encrypt_ike_msg(header, b'', None)
+        packet = self.create_packet(self.pg0, msg, self.sa.sport,
+                                    self.sa.dport, self.sa.natt, self.ip6)
+        self.pg0.add_stream(packet)
+        self.pg0.enable_capture()
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.id, self.sa.msg_id)
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+
+    def test_initiator(self):
+        super(TestInitiatorKeepaliveMsg, self).test_initiator()
+        self.send_empty_req_from_responder()
+
+
 class TestMalformedMessages(TemplateResponder, Ikev2Params):
     """ malformed packet test """