ikev2: fix issue when sending multiple requests at once
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
index 91cec8e..eb2c5f9 100644 (file)
@@ -1,4 +1,5 @@
 import os
+import time
 from socket import inet_pton
 from cryptography import x509
 from cryptography.hazmat.backends import default_backend
@@ -181,7 +182,9 @@ class IKEv2SA(object):
     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
-                 auth_method='shared-key', priv_key=None, natt=False):
+                 auth_method='shared-key', priv_key=None, natt=False,
+                 udp_encap=False):
+        self.udp_encap = udp_encap
         self.natt = natt
         if natt:
             self.sport = 4500
@@ -582,7 +585,27 @@ class IkePeer(VppTestCase):
             self.sa.generate_dh_data()
         self.vapi.cli('ikev2 set logging level 4')
         self.vapi.cli('event-lo clear')
-        self.vapi.cli('ikev2 dpd disable')
+
+    def create_rekey_request(self):
+        sa, first_payload = self.generate_auth_payload(is_rekey=True)
+        header = ikev2.IKEv2(
+                init_SPI=self.sa.ispi,
+                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+                flags='Initiator', exch_type='CREATE_CHILD_SA')
+
+        ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
+        return self.create_packet(self.pg0, ike_msg, self.sa.sport,
+                                  self.sa.dport, self.sa.natt, self.ip6)
+
+    def create_empty_request(self):
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             id=self.sa.new_msg_id(), flags='Initiator',
+                             exch_type='INFORMATIONAL',
+                             next_payload='Encrypted')
+
+        msg = self.encrypt_ike_msg(header, b'', None)
+        return self.create_packet(self.pg0, msg, self.sa.sport,
+                                  self.sa.dport, self.sa.natt, self.ip6)
 
     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
                       use_ip6=False):
@@ -662,6 +685,13 @@ class IkePeer(VppTestCase):
         assert(len(res) == tlen)
         return res
 
+    def verify_udp_encap(self, ipsec_sa):
+        e = VppEnum.vl_api_ipsec_sad_flags_t
+        if self.sa.udp_encap or self.sa.natt:
+            self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+        else:
+            self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+
     def verify_ipsec_sas(self, is_rekey=False):
         sas = self.vapi.ipsec_sa_dump()
         if is_rekey:
@@ -671,7 +701,6 @@ class IkePeer(VppTestCase):
         else:
             sa_count = 2
         self.assertEqual(len(sas), sa_count)
-        e = VppEnum.vl_api_ipsec_sad_flags_t
         if self.sa.is_initiator:
             if is_rekey:
                 sa0 = sas[0].entry
@@ -689,6 +718,8 @@ class IkePeer(VppTestCase):
 
         c = self.sa.child_sas[0]
 
+        self.verify_udp_encap(sa0)
+        self.verify_udp_encap(sa1)
         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
@@ -1279,6 +1310,10 @@ class Ikev2Params(object):
                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
 
+        dpd_disabled = True if 'dpd_disabled' not in params else\
+            params['dpd_disabled']
+        if dpd_disabled:
+            self.vapi.cli('ikev2 dpd disable')
         self.del_sa_from_responder = False if 'del_sa_from_responder'\
             not in params else params['del_sa_from_responder']
         is_natt = 'natt' in params and params['natt'] or False
@@ -1332,13 +1367,17 @@ class Ikev2Params(object):
         if 'esp_transforms' in params:
             self.p.add_esp_transforms(params['esp_transforms'])
 
+        udp_encap = False if 'udp_encap' not in params else\
+            params['udp_encap']
+        if udp_encap:
+            self.p.set_udp_encap(True)
+
         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                           is_initiator=is_init,
                           id_type=self.p.local_id['id_type'], natt=is_natt,
                           priv_key=client_priv, auth_method=auth_method,
-                          auth_data=auth_data,
+                          auth_data=auth_data, udp_encap=udp_encap,
                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
-
         if is_init:
             ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
                 params['ike-crypto']
@@ -1393,6 +1432,8 @@ class TestApi(VppTestCase):
             p.set_lifetime_data(cfg['lifetime_data'])
         if 'tun_itf' in cfg:
             p.set_tunnel_interface(cfg['tun_itf'])
+        if 'natt_disabled' in cfg and cfg['natt_disabled']:
+            p.disable_natt()
         p.add_vpp_config()
         return p
 
@@ -1431,6 +1472,7 @@ class TestApi(VppTestCase):
         conf = {
             'p1': {
                 'name': 'p1',
+                'natt_disabled': True,
                 'loc_id': ('fqdn', b'vpp.home'),
                 'rem_id': ('fqdn', b'roadwarrior.example.com'),
                 'loc_ts': loc_ts4,
@@ -1534,6 +1576,9 @@ class TestApi(VppTestCase):
         self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
         self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
         self.verify_auth(ap.auth, cp['auth'])
+        natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
+        self.assertTrue(natt_dis == ap.natt_disabled)
+
         if 'lifetime_data' in cp:
             self.verify_lifetime_data(ap, cp['lifetime_data'])
         self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
@@ -1567,6 +1612,47 @@ class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
                 'integ_alg': 12}})
 
 
+class TestInitiatorRequestWindowSize(TestInitiatorPsk):
+    """ test initiator - request window size (1) """
+
+    def rekey_respond(self, req, update_child_sa_data):
+        ih = self.get_ike_header(req)
+        plain = self.sa.hmac_and_decrypt(ih)
+        sa = ikev2.IKEv2_payload_SA(plain)
+        if update_child_sa_data:
+            prop = sa[ikev2.IKEv2_payload_Proposal]
+            self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+            self.sa.r_nonce = self.sa.i_nonce
+            self.sa.child_sas[0].ispi = prop.SPI
+            self.sa.child_sas[0].rspi = prop.SPI
+            self.sa.calc_child_keys()
+
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             flags='Response', exch_type=36,
+                             id=ih.id, next_payload='Encrypted')
+        resp = self.encrypt_ike_msg(header, sa, 'SA')
+        packet = self.create_packet(self.pg0, resp, self.sa.sport,
+                                    self.sa.dport, self.sa.natt, self.ip6)
+        self.send_and_assert_no_replies(self.pg0, packet)
+
+    def test_initiator(self):
+        super(TestInitiatorRequestWindowSize, self).test_initiator()
+        self.pg0.enable_capture()
+        self.pg_start()
+        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+        capture = self.pg0.get_capture(2)
+
+        # reply in reverse order
+        self.rekey_respond(capture[1], True)
+        self.rekey_respond(capture[0], False)
+
+        # verify that only the second request was accepted
+        self.verify_ike_sas()
+        self.verify_ipsec_sas(is_rekey=True)
+
+
 class TestInitiatorRekey(TestInitiatorPsk):
     """ test ikev2 initiator - rekey """
 
@@ -1583,7 +1669,6 @@ class TestInitiatorRekey(TestInitiatorPsk):
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
         prop = sa[ikev2.IKEv2_payload_Proposal]
-        nonce = sa[ikev2.IKEv2_payload_Nonce]
         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
         self.sa.r_nonce = self.sa.i_nonce
         # update new responder SPI
@@ -1643,19 +1728,40 @@ class TestResponderPsk(TemplateResponder, Ikev2Params):
         self.config_params()
 
 
+class TestResponderDpd(TestResponderPsk):
+    """
+    Dead peer detection test
+    """
+    def config_tc(self):
+        self.config_params({'dpd_disabled': False})
+
+    def tearDown(self):
+        pass
+
+    def test_responder(self):
+        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+        super(TestResponderDpd, self).test_responder()
+        self.pg0.enable_capture()
+        self.pg_start()
+        # capture empty request but don't reply
+        capture = self.pg0.get_capture(expected_count=1, timeout=5)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+        # wait for SA expiration
+        time.sleep(3)
+        ike_sas = self.vapi.ikev2_sa_dump()
+        self.assertEqual(len(ike_sas), 0)
+        ipsec_sas = self.vapi.ipsec_sa_dump()
+        self.assertEqual(len(ipsec_sas), 0)
+
+
 class TestResponderRekey(TestResponderPsk):
     """ test ikev2 responder - rekey """
 
     def rekey_from_initiator(self):
-        sa, first_payload = self.generate_auth_payload(is_rekey=True)
-        header = ikev2.IKEv2(
-                init_SPI=self.sa.ispi,
-                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
-                flags='Initiator', exch_type='CREATE_CHILD_SA')
-
-        ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
-        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
-                                    self.sa.dport, self.sa.natt, self.ip6)
+        packet = self.create_rekey_request()
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
@@ -1664,7 +1770,6 @@ class TestResponderRekey(TestResponderPsk):
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
         prop = sa[ikev2.IKEv2_payload_Proposal]
-        nonce = sa[ikev2.IKEv2_payload_Nonce]
         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
         # update new responder SPI
         self.sa.child_sas[0].rspi = prop.SPI
@@ -1681,6 +1786,7 @@ class TestResponderRsaSign(TemplateResponder, Ikev2Params):
     """ test ikev2 responder - cert based auth """
     def config_tc(self):
         self.config_params({
+            'udp_encap': True,
             'auth': 'rsa-sig',
             'server-key': 'server-key.pem',
             'client-key': 'client-key.pem',
@@ -1734,6 +1840,27 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
                        'end_addr': '11::100'}})
 
 
+class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
+    """
+    Test for keep alive messages
+    """
+
+    def send_empty_req_from_responder(self):
+        packet = self.create_empty_request()
+        self.pg0.add_stream(packet)
+        self.pg0.enable_capture()
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.id, self.sa.msg_id)
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+
+    def test_initiator(self):
+        super(TestInitiatorKeepaliveMsg, self).test_initiator()
+        self.send_empty_req_from_responder()
+
+
 class TestMalformedMessages(TemplateResponder, Ikev2Params):
     """ malformed packet test """