tests: Use errno value rather than a specific int
[vpp.git] / test / test_ikev2.py
index 5b699dd..be788d8 100644 (file)
@@ -19,8 +19,15 @@ from scapy.layers.inet import IP, UDP, Ether
 from scapy.layers.inet6 import IPv6
 from scapy.packet import raw, Raw
 from scapy.utils import long_converter
-from framework import tag_fixme_vpp_workers
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import (
+    tag_fixme_vpp_workers,
+    tag_fixme_ubuntu2204,
+    tag_fixme_debian11,
+    is_distro_ubuntu2204,
+    is_distro_debian11,
+    VppTestRunner,
+)
 from vpp_ikev2 import Profile, IDType, AuthMethod
 from vpp_papi import VppEnum
 
@@ -296,9 +303,11 @@ class IKEv2SA(object):
     def complete_dh_data(self):
         self.dh_shared_secret = self.compute_secret()
 
-    def calc_child_keys(self):
+    def calc_child_keys(self, kex=False):
         prf = self.ike_prf_alg.mod()
         s = self.i_nonce + self.r_nonce
+        if kex:
+            s = self.dh_shared_secret + s
         c = self.child_sas[0]
 
         encr_key_len = self.esp_crypto_key_len
@@ -352,14 +361,21 @@ class IKEv2SA(object):
         h.update(data)
         return h.finalize()
 
-    def calc_keys(self):
+    def calc_keys(self, sk_d=None):
         prf = self.ike_prf_alg.mod()
-        # SKEYSEED = prf(Ni | Nr, g^ir)
-        s = self.i_nonce + self.r_nonce
-        self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
+        if sk_d is None:
+            # SKEYSEED = prf(Ni | Nr, g^ir)
+            self.skeyseed = self.calc_prf(
+                prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
+            )
+        else:
+            # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
+            self.skeyseed = self.calc_prf(
+                prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
+            )
 
         # calculate S = Ni | Nr | SPIi SPIr
-        s = s + self.ispi + self.rspi
+        s = self.i_nonce + self.r_nonce + self.ispi + self.rspi
 
         prf_key_trunc = self.ike_prf_alg.trunc_len
         encr_key_len = self.ike_crypto_key_len
@@ -576,7 +592,47 @@ class IKEv2SA(object):
         digest.update(data)
         return digest.finalize()
 
+    def clone(self, test, **kwargs):
+        if "spi" not in kwargs:
+            kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
+        if "nonce" not in kwargs:
+            kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
+        if self.child_sas:
+            if "local_ts" not in kwargs:
+                kwargs["local_ts"] = self.child_sas[0].local_ts
+            if "remote_ts" not in kwargs:
+                kwargs["remote_ts"] = self.child_sas[0].remote_ts
+        sa = type(self)(
+            test,
+            is_initiator=self.is_initiator,
+            i_id=self.i_id,
+            r_id=self.r_id,
+            id_type=self.id_type,
+            auth_data=self.auth_data,
+            auth_method=self.auth_method,
+            priv_key=self.priv_key,
+            i_natt=self.i_natt,
+            r_natt=self.r_natt,
+            udp_encap=self.udp_encap,
+            **kwargs,
+        )
+        if sa.is_initiator:
+            sa.set_ike_props(
+                crypto=self.ike_crypto,
+                crypto_key_len=self.ike_crypto_key_len,
+                integ=self.ike_integ,
+                prf=self.ike_prf,
+                dh=self.ike_dh,
+            )
+            sa.set_esp_props(
+                crypto=self.esp_crypto,
+                crypto_key_len=self.esp_crypto_key_len,
+                integ=self.esp_integ,
+            )
+        return sa
+
 
+@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
 class IkePeer(VppTestCase):
     """common class for initiator and responder"""
 
@@ -606,6 +662,8 @@ class IkePeer(VppTestCase):
             self.initiate_del_sa_from_initiator()
         r = self.vapi.ikev2_sa_dump()
         self.assertEqual(len(r), 0)
+        r = self.vapi.ikev2_sa_v2_dump()
+        self.assertEqual(len(r), 0)
         sas = self.vapi.ipsec_sa_dump()
         self.assertEqual(len(sas), 0)
         self.p.remove_vpp_config()
@@ -625,8 +683,8 @@ class IkePeer(VppTestCase):
         node_name = "/err/ikev2-%s/" % version + name
         self.assertEqual(count, self.statistics.get_err_counter(node_name))
 
-    def create_rekey_request(self):
-        sa, first_payload = self.generate_auth_payload(is_rekey=True)
+    def create_rekey_request(self, kex=False):
+        sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
         header = ikev2.IKEv2(
             init_SPI=self.sa.ispi,
             resp_SPI=self.sa.rspi,
@@ -640,6 +698,20 @@ class IkePeer(VppTestCase):
             self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
         )
 
+    def create_sa_rekey_request(self, **kwargs):
+        sa = self.generate_sa_init_payload(**kwargs)
+        header = ikev2.IKEv2(
+            init_SPI=self.sa.ispi,
+            resp_SPI=self.sa.rspi,
+            id=self.sa.new_msg_id(),
+            flags="Initiator",
+            exch_type="CREATE_CHILD_SA",
+        )
+        ike_msg = self.encrypt_ike_msg(header, sa, "SA")
+        return self.create_packet(
+            self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+        )
+
     def create_empty_request(self):
         header = ikev2.IKEv2(
             init_SPI=self.sa.ispi,
@@ -752,14 +824,15 @@ class IkePeer(VppTestCase):
         else:
             self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
 
-    def verify_ipsec_sas(self, is_rekey=False):
+    def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
         sas = self.vapi.ipsec_sa_dump()
-        if is_rekey:
-            # after rekey there is a short period of time in which old
-            # inbound SA is still present
-            sa_count = 3
-        else:
-            sa_count = 2
+        if sa_count is None:
+            if is_rekey:
+                # after rekey there is a short period of time in which old
+                # inbound SA is still present
+                sa_count = 3
+            else:
+                sa_count = 2
         self.assertEqual(len(sas), sa_count)
         if self.sa.is_initiator:
             if is_rekey:
@@ -819,10 +892,89 @@ class IkePeer(VppTestCase):
         self.assertEqual(api_id.data_len, exp_id.data_len)
         self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
 
-    def verify_ike_sas(self):
+    def verify_ike_sas(self, is_rekey=False):
         r = self.vapi.ikev2_sa_dump()
+        if is_rekey:
+            sa_count = 2
+            sa = r[1].sa
+        else:
+            sa_count = 1
+            sa = r[0].sa
+        self.assertEqual(len(r), sa_count)
+        self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
+        self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
+        if self.ip6:
+            if self.sa.is_initiator:
+                self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
+                self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
+            else:
+                self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
+                self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
+        else:
+            if self.sa.is_initiator:
+                self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
+                self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
+            else:
+                self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
+                self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
+        self.verify_keymat(sa.keys, self.sa, "sk_d")
+        self.verify_keymat(sa.keys, self.sa, "sk_ai")
+        self.verify_keymat(sa.keys, self.sa, "sk_ar")
+        self.verify_keymat(sa.keys, self.sa, "sk_ei")
+        self.verify_keymat(sa.keys, self.sa, "sk_er")
+        self.verify_keymat(sa.keys, self.sa, "sk_pi")
+        self.verify_keymat(sa.keys, self.sa, "sk_pr")
+
+        self.assertEqual(sa.i_id.type, self.sa.id_type)
+        self.assertEqual(sa.r_id.type, self.sa.id_type)
+        self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
+        self.assertEqual(sa.r_id.data_len, len(self.idr))
+        self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
+        self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
+
+        n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
+        self.verify_nonce(n, self.sa.i_nonce)
+        n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
+        self.verify_nonce(n, self.sa.r_nonce)
+
+        r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
+        if is_rekey:
+            self.assertEqual(len(r), 0)
+            return
+
+        self.assertEqual(len(r), 1)
+        csa = r[0].child_sa
+        self.assertEqual(csa.sa_index, sa.sa_index)
+        c = self.sa.child_sas[0]
+        if hasattr(c, "sk_ai"):
+            self.verify_keymat(csa.keys, c, "sk_ai")
+            self.verify_keymat(csa.keys, c, "sk_ar")
+        self.verify_keymat(csa.keys, c, "sk_ei")
+        self.verify_keymat(csa.keys, c, "sk_er")
+        self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
+        self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
+
+        tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
+        tsi = tsi[0]
+        tsr = tsr[0]
+        r = self.vapi.ikev2_traffic_selector_dump(
+            is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+        )
+        self.assertEqual(len(r), 1)
+        ts = r[0].ts
+        self.verify_ts(r[0].ts, tsi[0], True)
+
+        r = self.vapi.ikev2_traffic_selector_dump(
+            is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+        )
+        self.assertEqual(len(r), 1)
+        self.verify_ts(r[0].ts, tsr[0], False)
+
+    def verify_ike_sas_v2(self):
+        r = self.vapi.ikev2_sa_v2_dump()
         self.assertEqual(len(r), 1)
         sa = r[0].sa
+        self.assertEqual(self.p.profile_name, sa.profile_name)
         self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
         self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
         if self.ip6:
@@ -1209,6 +1361,7 @@ class TemplateInitiator(IkePeer):
         self.sa.calc_child_keys()
         self.send_auth_response()
         self.verify_ike_sas()
+        self.verify_ike_sas_v2()
 
 
 class TemplateResponder(IkePeer):
@@ -1270,7 +1423,9 @@ class TemplateResponder(IkePeer):
         capture = self.pg0.get_capture(1)
         self.verify_del_sa(capture[0])
 
-    def send_sa_init_req(self):
+    def generate_sa_init_payload(
+        self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
+    ):
         tr_attr = self.sa.ike_crypto_attr()
         trans = (
             ikev2.IKEv2_payload_Transform(
@@ -1290,23 +1445,36 @@ class TemplateResponder(IkePeer):
             )
         )
 
+        if spi is None:
+            pargs = {}
+        else:
+            pargs = {"SPI": spi, "SPIsize": len(spi)}
         props = ikev2.IKEv2_payload_Proposal(
-            proposal=1, proto="IKEv2", trans_nb=4, trans=trans
+            proposal=1,
+            proto="IKEv2",
+            trans_nb=4,
+            trans=trans,
+            **pargs,
         )
 
-        next_payload = None if self.ip6 else "Notify"
-
-        self.sa.init_req_packet = (
-            ikev2.IKEv2(
-                init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
-            )
-            / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
+        return (
+            ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
             / ikev2.IKEv2_payload_KE(
-                next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
+                next_payload="Nonce",
+                group=self.sa.ike_dh,
+                load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
+            )
+            / ikev2.IKEv2_payload_Nonce(
+                next_payload=next_payload,
+                load=self.sa.i_nonce if nonce is None else nonce,
             )
-            / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
         )
 
+    def send_sa_init_req(self):
+        self.sa.init_req_packet = ikev2.IKEv2(
+            init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
+        ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
+
         if not self.ip6:
             if self.sa.i_natt:
                 src_address = b"\x0a\x0a\x0a\x01"
@@ -1344,9 +1512,10 @@ class TemplateResponder(IkePeer):
         capture = self.pg0.get_capture(1)
         self.verify_sa_init(capture[0])
 
-    def generate_auth_payload(self, last_payload=None, is_rekey=False):
+    def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
         tr_attr = self.sa.esp_crypto_attr()
         last_payload = last_payload or "Notify"
+        trans_nb = 4
         trans = (
             ikev2.IKEv2_payload_Transform(
                 transform_type="Encryption",
@@ -1365,9 +1534,20 @@ class TemplateResponder(IkePeer):
             )
         )
 
+        if kex:
+            trans_nb += 1
+            trans /= ikev2.IKEv2_payload_Transform(
+                transform_type="GroupDesc", transform_id=self.sa.ike_dh
+            )
+
         c = self.sa.child_sas[0]
         props = ikev2.IKEv2_payload_Proposal(
-            proposal=1, proto="ESP", SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans
+            proposal=1,
+            proto="ESP",
+            SPIsize=4,
+            SPI=c.ispi,
+            trans_nb=trans_nb,
+            trans=trans,
         )
 
         tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
@@ -1388,10 +1568,27 @@ class TemplateResponder(IkePeer):
 
         if is_rekey:
             first_payload = "Nonce"
+            if kex:
+                head = ikev2.IKEv2_payload_Nonce(
+                    load=self.sa.i_nonce, next_payload="KE"
+                ) / ikev2.IKEv2_payload_KE(
+                    group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
+                )
+            else:
+                head = ikev2.IKEv2_payload_Nonce(
+                    load=self.sa.i_nonce, next_payload="SA"
+                )
             plain = (
-                ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
+                head
                 / plain
-                / ikev2.IKEv2_payload_Notify(type="REKEY_SA", proto="ESP", SPI=c.ispi)
+                / ikev2.IKEv2_payload_Notify(
+                    type="REKEY_SA",
+                    proto="ESP",
+                    SPI=c.ispi,
+                    length=8 + len(c.ispi),
+                    next_payload="Notify",
+                )
+                / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
             )
         else:
             first_payload = "IDi"
@@ -1474,11 +1671,17 @@ class TemplateResponder(IkePeer):
         self.assertEqual(1, s.n_sa_auth_req)
         self.assertEqual(1, s.n_sa_init_req)
 
+        r = self.vapi.ikev2_sa_v2_dump()
+        s = r[0].sa.stats
+        self.assertEqual(1, s.n_sa_auth_req)
+        self.assertEqual(1, s.n_sa_init_req)
+
     def test_responder(self):
         self.send_sa_init_req()
         self.send_sa_auth()
         self.verify_ipsec_sas()
         self.verify_ike_sas()
+        self.verify_ike_sas_v2()
         self.verify_counters()
 
 
@@ -1634,6 +1837,7 @@ class Ikev2Params(object):
             )
 
 
+@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
 class TestApi(VppTestCase):
     """Test IKEV2 API"""
 
@@ -1938,6 +2142,7 @@ class TestInitiatorRequestWindowSize(TestInitiatorPsk):
 
         # verify that only the second request was accepted
         self.verify_ike_sas()
+        self.verify_ike_sas_v2()
         self.verify_ipsec_sas(is_rekey=True)
 
 
@@ -1982,6 +2187,7 @@ class TestInitiatorRekey(TestInitiatorPsk):
         super(TestInitiatorRekey, self).test_initiator()
         self.rekey_from_initiator()
         self.verify_ike_sas()
+        self.verify_ike_sas_v2()
         self.verify_ipsec_sas(is_rekey=True)
 
 
@@ -2063,6 +2269,8 @@ class TestResponderDpd(TestResponderPsk):
         time.sleep(3)
         ike_sas = self.vapi.ikev2_sa_dump()
         self.assertEqual(len(ike_sas), 0)
+        ike_sas = self.vapi.ikev2_sa_v2_dump()
+        self.assertEqual(len(ike_sas), 0)
         ipsec_sas = self.vapi.ipsec_sa_dump()
         self.assertEqual(len(ipsec_sas), 0)
 
@@ -2071,12 +2279,19 @@ class TestResponderDpd(TestResponderPsk):
 class TestResponderRekey(TestResponderPsk):
     """test ikev2 responder - rekey"""
 
-    def rekey_from_initiator(self):
-        packet = self.create_rekey_request()
+    WITH_KEX = False
+
+    def send_rekey_from_initiator(self):
+        if self.WITH_KEX:
+            self.sa.generate_dh_data()
+        packet = self.create_rekey_request(kex=self.WITH_KEX)
         self.pg0.add_stream(packet)
         self.pg0.enable_capture()
         self.pg_start()
         capture = self.pg0.get_capture(1)
+        return capture
+
+    def process_rekey_response(self, capture):
         ih = self.get_ike_header(capture[0])
         plain = self.sa.hmac_and_decrypt(ih)
         sa = ikev2.IKEv2_payload_SA(plain)
@@ -2084,18 +2299,110 @@ class TestResponderRekey(TestResponderPsk):
         self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
         # update new responder SPI
         self.sa.child_sas[0].rspi = prop.SPI
+        if self.WITH_KEX:
+            self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
+            self.sa.complete_dh_data()
+        self.sa.calc_child_keys(kex=self.WITH_KEX)
 
     def test_responder(self):
         super(TestResponderRekey, self).test_responder()
-        self.rekey_from_initiator()
-        self.sa.calc_child_keys()
+        self.process_rekey_response(self.send_rekey_from_initiator())
         self.verify_ike_sas()
+        self.verify_ike_sas_v2()
         self.verify_ipsec_sas(is_rekey=True)
         self.assert_counter(1, "rekey_req", "ip4")
         r = self.vapi.ikev2_sa_dump()
         self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+        r = self.vapi.ikev2_sa_v2_dump()
+        self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeat(TestResponderRekey):
+    """test ikev2 responder - rekey repeat"""
+
+    def test_responder(self):
+        super(TestResponderRekeyRepeat, self).test_responder()
+        # rekey request is not accepted until old IPsec SA is expired
+        capture = self.send_rekey_from_initiator()
+        ih = self.get_ike_header(capture[0])
+        plain = self.sa.hmac_and_decrypt(ih)
+        notify = ikev2.IKEv2_payload_Notify(plain)
+        self.assertEqual(notify.type, 43)
+        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
+        # rekey request is accepted after old IPsec SA was expired
+        for _ in range(50):
+            if len(self.vapi.ipsec_sa_dump()) != 3:
+                break
+            time.sleep(0.2)
+        else:
+            self.fail("old IPsec SA not expired")
+        self.process_rekey_response(self.send_rekey_from_initiator())
+        self.verify_ike_sas()
+        self.verify_ike_sas_v2()
+        self.verify_ipsec_sas(sa_count=3)
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyKEX(TestResponderRekey):
+    """test ikev2 responder - rekey with key exchange"""
+
+    WITH_KEX = True
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
+    """test ikev2 responder - rekey repeat with key exchange"""
 
+    WITH_KEX = True
 
+
+@tag_fixme_vpp_workers
+class TestResponderRekeySA(TestResponderPsk):
+    """test ikev2 responder - rekey IKE SA"""
+
+    def send_rekey_from_initiator(self, newsa):
+        packet = self.create_sa_rekey_request(
+            spi=newsa.ispi,
+            dh_pub_key=newsa.my_dh_pub_key,
+            nonce=newsa.i_nonce,
+        )
+        self.pg0.add_stream(packet)
+        self.pg0.enable_capture()
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        return capture
+
+    def process_rekey_response(self, newsa, capture):
+        ih = self.get_ike_header(capture[0])
+        plain = self.sa.hmac_and_decrypt(ih)
+        sa = ikev2.IKEv2_payload_SA(plain)
+        prop = sa[ikev2.IKEv2_payload_Proposal]
+        newsa.rspi = prop.SPI
+        newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+        newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
+        newsa.complete_dh_data()
+        newsa.calc_keys(sk_d=self.sa.sk_d)
+        newsa.child_sas = self.sa.child_sas
+        self.sa.child_sas = []
+
+    def test_responder(self):
+        super(TestResponderRekeySA, self).test_responder()
+        newsa = self.sa.clone(self, spi=os.urandom(8))
+        newsa.generate_dh_data()
+        capture = self.send_rekey_from_initiator(newsa)
+        self.process_rekey_response(newsa, capture)
+        self.verify_ike_sas(is_rekey=True)
+        self.assert_counter(1, "rekey_req", "ip4")
+        r = self.vapi.ikev2_sa_dump()
+        self.assertEqual(r[1].sa.stats.n_rekey_req, 1)
+        self.initiate_del_sa_from_initiator()
+        self.sa = newsa
+        self.verify_ike_sas()
+
+
+@tag_fixme_ubuntu2204
+@tag_fixme_debian11
 class TestResponderVrf(TestResponderPsk, Ikev2Params):
     """test ikev2 responder - non-default table id"""
 
@@ -2105,6 +2412,10 @@ class TestResponderVrf(TestResponderPsk, Ikev2Params):
 
         globals()["ikev2"] = _ikev2
         super(IkePeer, cls).setUpClass()
+        if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
+            cls, "vpp"
+        ):
+            return
         cls.create_pg_interfaces(range(1))
         cls.vapi.cli("ip table add 1")
         cls.vapi.cli("set interface ip table pg0 1")
@@ -2232,6 +2543,8 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
         self.assert_counter(1, "keepalive", "ip4")
         r = self.vapi.ikev2_sa_dump()
         self.assertEqual(1, r[0].sa.stats.n_keepalives)
+        r = self.vapi.ikev2_sa_v2_dump()
+        self.assertEqual(1, r[0].sa.stats.n_keepalives)
 
     def test_initiator(self):
         super(TestInitiatorKeepaliveMsg, self).test_initiator()