ikev2: support responder hostname
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
index d065d46..558e8a0 100644 (file)
@@ -1,4 +1,5 @@
 import os
+import time
 from socket import inet_pton
 from cryptography import x509
 from cryptography.hazmat.backends import default_backend
@@ -11,11 +12,13 @@ from cryptography.hazmat.primitives.ciphers import (
     modes,
 )
 from ipaddress import IPv4Address, IPv6Address, ip_address
+import unittest
 from scapy.layers.ipsec import ESP
 from scapy.layers.inet import IP, UDP, Ether
 from scapy.layers.inet6 import IPv6
 from scapy.packet import raw, Raw
 from scapy.utils import long_converter
+from framework import tag_fixme_vpp_workers
 from framework import VppTestCase, VppTestRunner
 from vpp_ikev2 import Profile, IDType, AuthMethod
 from vpp_papi import VppEnum
@@ -181,9 +184,12 @@ class IKEv2SA(object):
     def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                  spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                  nonce=None, auth_data=None, local_ts=None, remote_ts=None,
-                 auth_method='shared-key', priv_key=None, natt=False):
-        self.natt = natt
-        if natt:
+                 auth_method='shared-key', priv_key=None, i_natt=False,
+                 r_natt=False, udp_encap=False):
+        self.udp_encap = udp_encap
+        self.i_natt = i_natt
+        self.r_natt = r_natt
+        if i_natt or r_natt:
             self.sport = 4500
             self.dport = 4500
         else:
@@ -230,6 +236,10 @@ class IKEv2SA(object):
             return self.r_dh_data
         return self.i_dh_data
 
+    @property
+    def natt(self):
+        return self.i_natt or self.r_natt
+
     def compute_secret(self):
         priv = self.dh_private_key
         peer = self.peer_dh_pub_key
@@ -582,7 +592,31 @@ class IkePeer(VppTestCase):
             self.sa.generate_dh_data()
         self.vapi.cli('ikev2 set logging level 4')
         self.vapi.cli('event-lo clear')
-        self.vapi.cli('ikev2 dpd disable')
+
+    def assert_counter(self, count, name, version='ip4'):
+        node_name = '/err/ikev2-%s/' % version + name
+        self.assertEqual(count, self.statistics.get_err_counter(node_name))
+
+    def create_rekey_request(self):
+        sa, first_payload = self.generate_auth_payload(is_rekey=True)
+        header = ikev2.IKEv2(
+                init_SPI=self.sa.ispi,
+                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+                flags='Initiator', exch_type='CREATE_CHILD_SA')
+
+        ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
+        return self.create_packet(self.pg0, ike_msg, self.sa.sport,
+                                  self.sa.dport, self.sa.natt, self.ip6)
+
+    def create_empty_request(self):
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             id=self.sa.new_msg_id(), flags='Initiator',
+                             exch_type='INFORMATIONAL',
+                             next_payload='Encrypted')
+
+        msg = self.encrypt_ike_msg(header, b'', None)
+        return self.create_packet(self.pg0, msg, self.sa.sport,
+                                  self.sa.dport, self.sa.natt, self.ip6)
 
     def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
                       use_ip6=False):
@@ -609,6 +643,7 @@ class IkePeer(VppTestCase):
     def get_ike_header(self, packet):
         try:
             ih = packet[ikev2.IKEv2]
+            ih = self.verify_and_remove_non_esp_marker(ih)
         except IndexError as e:
             # this is a workaround for getting IKEv2 layer as both ikev2 and
             # ipsec register for port 4500
@@ -662,6 +697,13 @@ class IkePeer(VppTestCase):
         assert(len(res) == tlen)
         return res
 
+    def verify_udp_encap(self, ipsec_sa):
+        e = VppEnum.vl_api_ipsec_sad_flags_t
+        if self.sa.udp_encap or self.sa.natt:
+            self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+        else:
+            self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+
     def verify_ipsec_sas(self, is_rekey=False):
         sas = self.vapi.ipsec_sa_dump()
         if is_rekey:
@@ -671,7 +713,6 @@ class IkePeer(VppTestCase):
         else:
             sa_count = 2
         self.assertEqual(len(sas), sa_count)
-        e = VppEnum.vl_api_ipsec_sad_flags_t
         if self.sa.is_initiator:
             if is_rekey:
                 sa0 = sas[0].entry
@@ -689,6 +730,8 @@ class IkePeer(VppTestCase):
 
         c = self.sa.child_sas[0]
 
+        self.verify_udp_encap(sa0)
+        self.verify_udp_encap(sa1)
         vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
         self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
         self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
@@ -902,6 +945,8 @@ class TemplateInitiator(IkePeer):
         self.assertEqual(s.load, dst_sha)
 
     def verify_sa_init_request(self, packet):
+        udp = packet[UDP]
+        self.sa.dport = udp.sport
         ih = packet[ikev2.IKEv2]
         self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
         self.assertEqual(ih.exch_type, 34)  # SA_INIT
@@ -943,6 +988,8 @@ class TemplateInitiator(IkePeer):
             trans = trans.payload
 
     def verify_sa_auth_req(self, packet):
+        udp = packet[UDP]
+        self.sa.dport = udp.sport
         ih = self.get_ike_header(packet)
         self.assertEqual(ih.resp_SPI, self.sa.rspi)
         self.assertEqual(ih.init_SPI, self.sa.ispi)
@@ -978,6 +1025,15 @@ class TemplateInitiator(IkePeer):
                  transform_id=self.sa.ike_dh))
         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                  trans_nb=4, trans=trans))
+
+        src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
+        if self.sa.natt:
+            dst_address = b'\x0a\x0a\x0a\x0a'
+        else:
+            dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
+        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+        dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
+
         self.sa.init_resp_packet = (
             ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                         exch_type='IKE_SA_INIT', flags='Response') /
@@ -985,11 +1041,16 @@ class TemplateInitiator(IkePeer):
             ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                    group=self.sa.ike_dh,
                                    load=self.sa.my_dh_pub_key) /
-            ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
+            ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
+                                      next_payload='Notify') /
+            ikev2.IKEv2_payload_Notify(
+                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+                    next_payload='Notify') / ikev2.IKEv2_payload_Notify(
+                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
 
         ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
                                      self.sa.sport, self.sa.dport,
-                                     self.sa.natt, self.ip6)
+                                     False, self.ip6)
         self.pg_send(self.pg0, ike_msg)
         capture = self.pg0.get_capture(1)
         self.verify_sa_auth_req(capture[0])
@@ -1108,7 +1169,7 @@ class TemplateResponder(IkePeer):
         capture = self.pg0.get_capture(1)
         self.verify_del_sa(capture[0])
 
-    def send_sa_init_req(self, behind_nat=False):
+    def send_sa_init_req(self):
         tr_attr = self.sa.ike_crypto_attr()
         trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
                  transform_id=self.sa.ike_crypto, length=tr_attr[1],
@@ -1123,6 +1184,8 @@ class TemplateResponder(IkePeer):
         props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
                  trans_nb=4, trans=trans))
 
+        next_payload = None if self.ip6 else 'Notify'
+
         self.sa.init_req_packet = (
                 ikev2.IKEv2(init_SPI=self.sa.ispi,
                             flags='Initiator', exch_type='IKE_SA_INIT') /
@@ -1130,26 +1193,30 @@ class TemplateResponder(IkePeer):
                 ikev2.IKEv2_payload_KE(next_payload='Nonce',
                                        group=self.sa.ike_dh,
                                        load=self.sa.my_dh_pub_key) /
-                ikev2.IKEv2_payload_Nonce(next_payload='Notify',
+                ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
                                           load=self.sa.i_nonce))
 
-        if behind_nat:
-            src_address = b'\x0a\x0a\x0a\x01'
-        else:
-            src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
+        if not self.ip6:
+            if self.sa.i_natt:
+                src_address = b'\x0a\x0a\x0a\x01'
+            else:
+                src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
 
-        src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
-        dst_nat = self.sa.compute_nat_sha1(
-                inet_pton(socket.AF_INET, self.pg0.local_ip4),
-                self.sa.dport)
-        nat_src_detection = ikev2.IKEv2_payload_Notify(
-                type='NAT_DETECTION_SOURCE_IP', load=src_nat,
-                next_payload='Notify')
-        nat_dst_detection = ikev2.IKEv2_payload_Notify(
-                type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
-        self.sa.init_req_packet = (self.sa.init_req_packet /
-                                   nat_src_detection /
-                                   nat_dst_detection)
+            if self.sa.r_natt:
+                dst_address = b'\x0a\x0a\x0a\x0a'
+            else:
+                dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
+
+            src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+            dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
+            nat_src_detection = ikev2.IKEv2_payload_Notify(
+                    type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+                    next_payload='Notify')
+            nat_dst_detection = ikev2.IKEv2_payload_Notify(
+                    type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
+            self.sa.init_req_packet = (self.sa.init_req_packet /
+                                       nat_src_detection /
+                                       nat_dst_detection)
 
         ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
                                      self.sa.sport, self.sa.dport,
@@ -1255,11 +1322,24 @@ class TemplateResponder(IkePeer):
         self.sa.child_sas[0].rspi = prop.SPI
         self.sa.calc_child_keys()
 
+    IKE_NODE_SUFFIX = 'ip4'
+
+    def verify_counters(self):
+        self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
+        self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
+        self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
+
+        r = self.vapi.ikev2_sa_dump()
+        s = r[0].sa.stats
+        self.assertEqual(1, s.n_sa_auth_req)
+        self.assertEqual(1, s.n_sa_init_req)
+
     def test_responder(self):
-        self.send_sa_init_req(self.sa.natt)
+        self.send_sa_init_req()
         self.send_sa_auth()
         self.verify_ipsec_sas()
         self.verify_ike_sas()
+        self.verify_counters()
 
 
 class Ikev2Params(object):
@@ -1279,9 +1359,14 @@ class Ikev2Params(object):
                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
 
+        dpd_disabled = True if 'dpd_disabled' not in params else\
+            params['dpd_disabled']
+        if dpd_disabled:
+            self.vapi.cli('ikev2 dpd disable')
         self.del_sa_from_responder = False if 'del_sa_from_responder'\
             not in params else params['del_sa_from_responder']
-        is_natt = 'natt' in params and params['natt'] or False
+        i_natt = False if 'i_natt' not in params else params['i_natt']
+        r_natt = False if 'r_natt' not in params else params['r_natt']
         self.p = Profile(self, 'pr1')
         self.ip6 = False if 'ip6' not in params else params['ip6']
 
@@ -1332,13 +1417,32 @@ class Ikev2Params(object):
         if 'esp_transforms' in params:
             self.p.add_esp_transforms(params['esp_transforms'])
 
+        udp_encap = False if 'udp_encap' not in params else\
+            params['udp_encap']
+        if udp_encap:
+            self.p.set_udp_encap(True)
+
+        if 'responder_hostname' in params:
+            hn = params['responder_hostname']
+            self.p.add_responder_hostname(hn)
+
+            # configure static dns record
+            self.vapi.dns_name_server_add_del(
+                is_ip6=0, is_add=1,
+                server_address=IPv4Address(u'8.8.8.8').packed)
+            self.vapi.dns_enable_disable(enable=1)
+
+            cmd = "dns cache add {} {}".format(hn['hostname'],
+                                               self.pg0.remote_ip4)
+            self.vapi.cli(cmd)
+
         self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
                           is_initiator=is_init,
-                          id_type=self.p.local_id['id_type'], natt=is_natt,
+                          id_type=self.p.local_id['id_type'],
+                          i_natt=i_natt, r_natt=r_natt,
                           priv_key=client_priv, auth_method=auth_method,
-                          auth_data=auth_data,
+                          auth_data=auth_data, udp_encap=udp_encap,
                           local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
-
         if is_init:
             ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
                 params['ike-crypto']
@@ -1549,11 +1653,23 @@ class TestApi(VppTestCase):
             self.assertEqual(ap.tun_itf, 0xffffffff)
 
 
-class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
-    """ test ikev2 initiator - pre shared key auth """
+@tag_fixme_vpp_workers
+class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
+    """ test responder - responder behind NAT """
+
+    IKE_NODE_SUFFIX = 'ip4-natt'
+
+    def config_tc(self):
+        self.config_params({'r_natt': True})
+
+
+@tag_fixme_vpp_workers
+class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
+    """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
 
     def config_tc(self):
         self.config_params({
+            'i_natt': True,
             'is_initiator': False,  # seen from test case perspective
                                     # thus vpp is initiator
             'responder': {'sw_if_index': self.pg0.sw_if_index,
@@ -1573,6 +1689,74 @@ class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
                 'integ_alg': 12}})
 
 
+@tag_fixme_vpp_workers
+class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
+    """ test ikev2 initiator - pre shared key auth """
+
+    def config_tc(self):
+        self.config_params({
+            'is_initiator': False,  # seen from test case perspective
+                                    # thus vpp is initiator
+            'ike-crypto': ('AES-GCM-16ICV', 32),
+            'ike-integ': 'NULL',
+            'ike-dh': '3072MODPgr',
+            'ike_transforms': {
+                'crypto_alg': 20,  # "aes-gcm-16"
+                'crypto_key_size': 256,
+                'dh_group': 15,  # "modp-3072"
+            },
+            'esp_transforms': {
+                'crypto_alg': 12,  # "aes-cbc"
+                'crypto_key_size': 256,
+                # "hmac-sha2-256-128"
+                'integ_alg': 12},
+            'responder_hostname': {'hostname': 'vpp.responder.org',
+                                   'sw_if_index': self.pg0.sw_if_index}})
+
+
+@tag_fixme_vpp_workers
+class TestInitiatorRequestWindowSize(TestInitiatorPsk):
+    """ test initiator - request window size (1) """
+
+    def rekey_respond(self, req, update_child_sa_data):
+        ih = self.get_ike_header(req)
+        plain = self.sa.hmac_and_decrypt(ih)
+        sa = ikev2.IKEv2_payload_SA(plain)
+        if update_child_sa_data:
+            prop = sa[ikev2.IKEv2_payload_Proposal]
+            self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+            self.sa.r_nonce = self.sa.i_nonce
+            self.sa.child_sas[0].ispi = prop.SPI
+            self.sa.child_sas[0].rspi = prop.SPI
+            self.sa.calc_child_keys()
+
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             flags='Response', exch_type=36,
+                             id=ih.id, next_payload='Encrypted')
+        resp = self.encrypt_ike_msg(header, sa, 'SA')
+        packet = self.create_packet(self.pg0, resp, self.sa.sport,
+                                    self.sa.dport, self.sa.natt, self.ip6)
+        self.send_and_assert_no_replies(self.pg0, packet)
+
+    def test_initiator(self):
+        super(TestInitiatorRequestWindowSize, self).test_initiator()
+        self.pg0.enable_capture()
+        self.pg_start()
+        ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+        self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+        capture = self.pg0.get_capture(2)
+
+        # reply in reverse order
+        self.rekey_respond(capture[1], True)
+        self.rekey_respond(capture[0], False)
+
+        # verify that only the second request was accepted
+        self.verify_ike_sas()
+        self.verify_ipsec_sas(is_rekey=True)
+
+
+@tag_fixme_vpp_workers
 class TestInitiatorRekey(TestInitiatorPsk):
     """ test ikev2 initiator - rekey """
 
@@ -1589,7 +1773,6 @@ class TestInitiatorRekey(TestInitiatorPsk):
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
         prop = sa[ikev2.IKEv2_payload_Proposal]
-        nonce = sa[ikev2.IKEv2_payload_Nonce]
         self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
         self.sa.r_nonce = self.sa.i_nonce
         # update new responder SPI
@@ -1611,6 +1794,7 @@ class TestInitiatorRekey(TestInitiatorPsk):
         self.verify_ipsec_sas(is_rekey=True)
 
 
+@tag_fixme_vpp_workers
 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
     """ test ikev2 initiator - delete IKE SA from responder """
 
@@ -1636,32 +1820,60 @@ class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
                 'integ_alg': 12}})
 
 
-class TestResponderNATT(TemplateResponder, Ikev2Params):
-    """ test ikev2 responder - nat traversal """
+@tag_fixme_vpp_workers
+class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
+    """ test ikev2 responder - initiator behind NAT """
+
+    IKE_NODE_SUFFIX = 'ip4-natt'
+
     def config_tc(self):
         self.config_params(
-                {'natt': True})
+                {'i_natt': True})
 
 
+@tag_fixme_vpp_workers
 class TestResponderPsk(TemplateResponder, Ikev2Params):
     """ test ikev2 responder - pre shared key auth """
     def config_tc(self):
         self.config_params()
 
 
+@tag_fixme_vpp_workers
+class TestResponderDpd(TestResponderPsk):
+    """
+    Dead peer detection test
+    """
+    def config_tc(self):
+        self.config_params({'dpd_disabled': False})
+
+    def tearDown(self):
+        pass
+
+    def test_responder(self):
+        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+        super(TestResponderDpd, self).test_responder()
+        self.pg0.enable_capture()
+        self.pg_start()
+        # capture empty request but don't reply
+        capture = self.pg0.get_capture(expected_count=1, timeout=5)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+        # wait for SA expiration
+        time.sleep(3)
+        ike_sas = self.vapi.ikev2_sa_dump()
+        self.assertEqual(len(ike_sas), 0)
+        ipsec_sas = self.vapi.ipsec_sa_dump()
+        self.assertEqual(len(ipsec_sas), 0)
+
+
+@tag_fixme_vpp_workers
 class TestResponderRekey(TestResponderPsk):
     """ test ikev2 responder - rekey """
 
     def rekey_from_initiator(self):
-        sa, first_payload = self.generate_auth_payload(is_rekey=True)
-        header = ikev2.IKEv2(
-                init_SPI=self.sa.ispi,
-                resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
-                flags='Initiator', exch_type='CREATE_CHILD_SA')
-
-        ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
-        packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
-                                    self.sa.dport, self.sa.natt, self.ip6)
+        packet = self.create_rekey_request()
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
@@ -1670,7 +1882,6 @@ class TestResponderRekey(TestResponderPsk):
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
         prop = sa[ikev2.IKEv2_payload_Proposal]
-        nonce = sa[ikev2.IKEv2_payload_Nonce]
         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
         # update new responder SPI
         self.sa.child_sas[0].rspi = prop.SPI
@@ -1681,12 +1892,50 @@ class TestResponderRekey(TestResponderPsk):
         self.sa.calc_child_keys()
         self.verify_ike_sas()
         self.verify_ipsec_sas(is_rekey=True)
+        self.assert_counter(1, 'rekey_req', 'ip4')
+        r = self.vapi.ikev2_sa_dump()
+        self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+
+
+class TestResponderVrf(TestResponderPsk, Ikev2Params):
+    """ test ikev2 responder - non-default table id """
+
+    @classmethod
+    def setUpClass(cls):
+        import scapy.contrib.ikev2 as _ikev2
+        globals()['ikev2'] = _ikev2
+        super(IkePeer, cls).setUpClass()
+        cls.create_pg_interfaces(range(1))
+        cls.vapi.cli("ip table add 1")
+        cls.vapi.cli("set interface ip table pg0 1")
+        for i in cls.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+            i.config_ip6()
+            i.resolve_ndp()
+
+    def config_tc(self):
+        self.config_params({'dpd_disabled': False})
+
+    def test_responder(self):
+        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+        super(TestResponderVrf, self).test_responder()
+        self.pg0.enable_capture()
+        self.pg_start()
+        capture = self.pg0.get_capture(expected_count=1, timeout=5)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
 
 
+@tag_fixme_vpp_workers
 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
     """ test ikev2 responder - cert based auth """
     def config_tc(self):
         self.config_params({
+            'udp_encap': True,
             'auth': 'rsa-sig',
             'server-key': 'server-key.pem',
             'client-key': 'client-key.pem',
@@ -1694,6 +1943,7 @@ class TestResponderRsaSign(TemplateResponder, Ikev2Params):
             'server-cert': 'server-cert.pem'})
 
 
+@tag_fixme_vpp_workers
 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
         (TemplateResponder, Ikev2Params):
     """
@@ -1708,8 +1958,10 @@ class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
             'ike-dh': '2048MODPgr'})
 
 
+@tag_fixme_vpp_workers
 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
         (TemplateResponder, Ikev2Params):
+
     """
     IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
     """
@@ -1722,10 +1974,14 @@ class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
             'ike-dh': '3072MODPgr'})
 
 
+@tag_fixme_vpp_workers
 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
     """
     IKE:AES_GCM_16_256
     """
+
+    IKE_NODE_SUFFIX = 'ip6'
+
     def config_tc(self):
         self.config_params({
             'del_sa_from_responder': True,
@@ -1740,6 +1996,31 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
                        'end_addr': '11::100'}})
 
 
+@tag_fixme_vpp_workers
+class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
+    """
+    Test for keep alive messages
+    """
+
+    def send_empty_req_from_responder(self):
+        packet = self.create_empty_request()
+        self.pg0.add_stream(packet)
+        self.pg0.enable_capture()
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        self.assertEqual(ih.id, self.sa.msg_id)
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+        self.assert_counter(1, 'keepalive', 'ip4')
+        r = self.vapi.ikev2_sa_dump()
+        self.assertEqual(1, r[0].sa.stats.n_keepalives)
+
+    def test_initiator(self):
+        super(TestInitiatorKeepaliveMsg, self).test_initiator()
+        self.send_empty_req_from_responder()
+
+
 class TestMalformedMessages(TemplateResponder, Ikev2Params):
     """ malformed packet test """
 
@@ -1749,10 +2030,6 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params):
     def config_tc(self):
         self.config_params()
 
-    def assert_counter(self, count, name, version='ip4'):
-        node_name = '/err/ikev2-%s/' % version + name
-        self.assertEqual(count, self.statistics.get_err_counter(node_name))
-
     def create_ike_init_msg(self, length=None, payload=None):
         msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                           flags='Initiator', exch_type='IKE_SA_INIT')
@@ -1764,13 +2041,13 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params):
     def verify_bad_packet_length(self):
         ike_msg = self.create_ike_init_msg(length=0xdead)
         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
-        self.assert_counter(self.pkt_count, 'Bad packet length')
+        self.assert_counter(self.pkt_count, 'bad_length')
 
     def verify_bad_sa_payload_length(self):
         p = ikev2.IKEv2_payload_SA(length=0xdead)
         ike_msg = self.create_ike_init_msg(payload=p)
         self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
-        self.assert_counter(self.pkt_count, 'Malformed packet')
+        self.assert_counter(self.pkt_count, 'malformed_packet')
 
     def test_responder(self):
         self.pkt_count = 254