ipsec: add test for tun sa ip6 fast-path spd policy matching 94/42094/5
authorPiotr Bronowski <[email protected]>
Fri, 3 Jan 2025 18:25:41 +0000 (18:25 +0000)
committerFan Zhang <[email protected]>
Tue, 14 Jan 2025 11:02:45 +0000 (11:02 +0000)
In case SA defines a tunnel, policy matching should be performed based
on the tunnel header defined by the SA. This change tests
the  matching for ip6 fast path SPD implementation.

Type: test

Signed-off-by: Piotr Bronowski <[email protected]>
Change-Id: I311b221bce565de0e8235fd162305eb10550edd9

test/test_ipsec_spd_fp_input.py

index eb04df4..1953bbe 100644 (file)
@@ -9,6 +9,7 @@ from template_ipsec import IPSecIPv6Fwd
 from test_ipsec_esp import TemplateIpsecEsp
 from template_ipsec import SpdFastPathTemplate
 from config import config
+import pdb
 
 
 def debug_signal_handler(signal, frame):
@@ -888,5 +889,50 @@ class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
 
 
+class IPSec6SpdTestCaseTunProtect(SpdFastPathIPv6InboundProtect):
+    """IPSec/IPv6 inbound: Policy mode test case with fast path"""
+
+    # In this test sa_in defines a tunnel. Matching should be
+    # done based on the sa tunnel header.
+
+    @classmethod
+    def setUpClass(cls):
+        super(IPSec6SpdTestCaseTunProtect, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(IPSec6SpdTestCaseTunProtect, cls).tearDownClass()
+
+    def setUp(self):
+        super(IPSec6SpdTestCaseTunProtect, self).setUp()
+
+    def tearDown(self):
+        super(IPSec6SpdTestCaseTunProtect, self).tearDown()
+
+    def test_ipsec6_spd_inbound_tun_protect(self):
+        pkt_count = 5
+        payload_size = 64
+        p = self.params[socket.AF_INET6]
+        send_pkts = self.gen_encrypt_pkts6(
+            p,
+            p.scapy_tun_sa,
+            self.tun_if,
+            src=p.remote_tun_if_host,
+            dst=self.pg1.remote_ip6,
+            count=pkt_count,
+            payload_size=payload_size,
+        )
+        recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+        self.logger.info(self.vapi.ppcli("show error"))
+        self.logger.info(self.vapi.ppcli("show ipsec all"))
+        pkts = p.tun_sa_in.get_stats()["packets"]
+        self.assertEqual(
+            pkts,
+            pkt_count,
+            "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
+        )
+        self.assertEqual(p.tun_sa_in.get_err("lost"), 0)
+
+
 if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)