tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_ipsec_spd_fp_input.py
index 199fbdf..ec4a7c7 100644 (file)
@@ -3,11 +3,11 @@ import unittest
 import ipaddress
 
 from util import ppp
-from framework import VppTestRunner
+from asfframework import VppTestRunner
 from template_ipsec import IPSecIPv4Fwd
 from template_ipsec import IPSecIPv6Fwd
 from test_ipsec_esp import TemplateIpsecEsp
-import pdb
+from template_ipsec import SpdFastPathTemplate
 
 
 def debug_signal_handler(signal, frame):
@@ -21,7 +21,7 @@ import signal
 signal.signal(signal.SIGINT, debug_signal_handler)
 
 
-class SpdFastPathInbound(IPSecIPv4Fwd):
+class SpdFastPathInbound(SpdFastPathTemplate):
     # In test cases derived from this class, packets in IPv4 FWD path
     # are configured to go through IPSec inbound SPD policy lookup.
     # Note that order in which the rules are applied is
@@ -35,30 +35,6 @@ class SpdFastPathInbound(IPSecIPv4Fwd):
         cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"])
         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
 
-    @classmethod
-    def create_enc_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
-        packets = []
-        params = self.params[socket.AF_INET]
-        for i in range(pkt_count):
-            # create packet info stored in the test case instance
-            info = self.create_packet_info(src_if, dst_if)
-            # convert the info into packet payload
-            payload = self.info_to_payload(info)
-            # create the packet itself
-            p = Ether(
-                src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
-            ) / params.scapy_tra_sa.encrypt(
-                IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
-                / UDP(sport=src_prt, dport=dst_prt)
-                / Raw(payload)
-            )
-            # store a copy of the packet in the packet info
-            info.data = p.copy()
-            # append the packet to the list
-            packets.append(p)
-        # return the created packet list
-        return packets
-
 
 class SpdFastPathInboundProtect(TemplateIpsecEsp):
     @classmethod
@@ -98,6 +74,29 @@ class SpdFastPathIPv6Inbound(IPSecIPv6Fwd):
         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
 
 
+class SpdFastPathIPv6InboundProtect(TemplateIpsecEsp):
+    @classmethod
+    def setUpConstants(cls):
+        super(SpdFastPathIPv6InboundProtect, cls).setUpConstants()
+        cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"])
+        cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
+
+    @classmethod
+    def setUpClass(cls):
+        super(SpdFastPathIPv6InboundProtect, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(SpdFastPathIPv6InboundProtect, cls).tearDownClass()
+
+    def setUp(self):
+        super(SpdFastPathIPv6InboundProtect, self).setUp()
+
+    def tearDown(self):
+        self.unconfig_network()
+        super(SpdFastPathIPv6InboundProtect, self).tearDown()
+
+
 class IPSec4SpdTestCaseBypass(SpdFastPathInbound):
     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
         (add bypass)"""
@@ -120,7 +119,10 @@ class IPSec4SpdTestCaseBypass(SpdFastPathInbound):
 
         # create input rules
         # bypass rule should take precedence over discard rule,
-        # even though it's lower priority
+        # even though it's lower priority, because for input policies
+        # matching PROTECT policies precedes matching BYPASS policies
+        # which preceeds matching for DISCARD policies.
+        # Any hit stops the process.
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
             1,
             self.pg1,
@@ -130,10 +132,10 @@ class IPSec4SpdTestCaseBypass(SpdFastPathInbound):
             priority=10,
             policy_type="bypass",
             ip_range=True,
-            local_ip_start=self.pg0.remote_ip4,
-            local_ip_stop=self.pg0.remote_ip4,
-            remote_ip_start=self.pg1.remote_ip4,
-            remote_ip_stop=self.pg1.remote_ip4,
+            local_ip_start=self.pg1.remote_ip4,
+            local_ip_stop=self.pg1.remote_ip4,
+            remote_ip_start=self.pg0.remote_ip4,
+            remote_ip_stop=self.pg0.remote_ip4,
         )
         policy_1 = self.spd_add_rem_policy(  # inbound, priority 15
             1,
@@ -144,10 +146,10 @@ class IPSec4SpdTestCaseBypass(SpdFastPathInbound):
             priority=15,
             policy_type="discard",
             ip_range=True,
-            local_ip_start=self.pg0.remote_ip4,
-            local_ip_stop=self.pg0.remote_ip4,
-            remote_ip_start=self.pg1.remote_ip4,
-            remote_ip_stop=self.pg1.remote_ip4,
+            local_ip_start=self.pg1.remote_ip4,
+            local_ip_stop=self.pg1.remote_ip4,
+            remote_ip_start=self.pg0.remote_ip4,
+            remote_ip_stop=self.pg0.remote_ip4,
         )
 
         # create output rule so we can capture forwarded packets
@@ -212,18 +214,13 @@ class IPSec4SpdTestCaseDiscard(SpdFastPathInbound):
             is_out=0,
             priority=10,
             policy_type="discard",
-            ip_range=True,
-            local_ip_start=self.pg0.remote_ip4,
-            local_ip_stop=self.pg0.remote_ip4,
-            remote_ip_start=self.pg1.remote_ip4,
-            remote_ip_stop=self.pg1.remote_ip4,
         )
 
         # create output rule so we can capture forwarded packets
         policy_1 = self.spd_add_rem_policy(  # outbound, priority 10
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=1,
             priority=10,
@@ -264,16 +261,9 @@ class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect):
         super(IPSec4SpdTestCaseProtect, self).tearDown()
 
     def test_ipsec_spd_inbound_protect(self):
-        # In this test case, packets in IPv4 FWD path are configured
+        # In this test case, encrypted packets in IPv4
+        # PROTECT path are configured
         # to go through IPSec inbound SPD policy lookup.
-        #
-        # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
-        # - High priority rule action is set to DISCARD.
-        # - Low priority rule action is set to BYPASS.
-        #
-        # Since BYPASS rules take precedence over DISCARD
-        # (the order being PROTECT, BYPASS, DISCARD) we expect the
-        # BYPASS rule to match and traffic to be correctly forwarded.
 
         pkt_count = 5
         payload_size = 64
@@ -282,8 +272,8 @@ class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect):
             p,
             p.scapy_tra_sa,
             self.tra_if,
-            src=self.tra_if.local_ip4,
-            dst=self.tra_if.remote_ip4,
+            src=self.tra_if.remote_ip4,
+            dst=self.tra_if.local_ip4,
             count=pkt_count,
             payload_size=payload_size,
         )
@@ -304,8 +294,8 @@ class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect):
             pkt_count,
             "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
         )
-        self.assertEqual(p.tra_sa_out.get_lost(), 0)
-        self.assertEqual(p.tra_sa_in.get_lost(), 0)
+        self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
 
 
 class IPSec4SpdTestCaseAddIPRange(SpdFastPathInbound):
@@ -340,10 +330,10 @@ class IPSec4SpdTestCaseAddIPRange(SpdFastPathInbound):
             priority=10,
             policy_type="bypass",
             ip_range=True,
-            local_ip_start=s_ip_s0,
-            local_ip_stop=s_ip_e0,
-            remote_ip_start=d_ip_s0,
-            remote_ip_stop=d_ip_e0,
+            local_ip_start=d_ip_s0,
+            local_ip_stop=d_ip_e0,
+            remote_ip_start=s_ip_s0,
+            remote_ip_stop=s_ip_e0,
         )
         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
             1,
@@ -492,8 +482,8 @@ class IPSec4SpdTestCaseRemove(SpdFastPathInbound):
         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=10,
@@ -501,8 +491,8 @@ class IPSec4SpdTestCaseRemove(SpdFastPathInbound):
         )
         policy_1 = self.spd_add_rem_policy(  # inbound, priority 5
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=5,
@@ -547,8 +537,8 @@ class IPSec4SpdTestCaseRemove(SpdFastPathInbound):
         # now remove the bypass rule
         self.spd_add_rem_policy(  # outbound, priority 10
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=10,
@@ -593,8 +583,8 @@ class IPSec4SpdTestCaseReadd(SpdFastPathInbound):
         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=10,
@@ -602,8 +592,8 @@ class IPSec4SpdTestCaseReadd(SpdFastPathInbound):
         )
         policy_1 = self.spd_add_rem_policy(  # inbound, priority 5
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=5,
@@ -647,8 +637,8 @@ class IPSec4SpdTestCaseReadd(SpdFastPathInbound):
         # remove the bypass rule, leaving only the discard rule
         self.spd_add_rem_policy(  # inbound, priority 10
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=10,
@@ -673,8 +663,8 @@ class IPSec4SpdTestCaseReadd(SpdFastPathInbound):
         # now readd the bypass rule
         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=10,
@@ -726,8 +716,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
         # add rules on all interfaces
         policy_01 = self.spd_add_rem_policy(  # inbound, priority 10
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=10,
@@ -735,8 +725,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
         )
         policy_02 = self.spd_add_rem_policy(  # inbound, priority 5
             1,
-            self.pg0,
             self.pg1,
+            self.pg0,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=5,
@@ -745,8 +735,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
 
         policy_11 = self.spd_add_rem_policy(  # inbound, priority 10
             1,
-            self.pg1,
             self.pg2,
+            self.pg1,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=10,
@@ -754,8 +744,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
         )
         policy_12 = self.spd_add_rem_policy(  # inbound, priority 5
             1,
-            self.pg1,
             self.pg2,
+            self.pg1,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=5,
@@ -764,8 +754,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
 
         policy_21 = self.spd_add_rem_policy(  # inbound, priority 5
             1,
-            self.pg2,
             self.pg0,
+            self.pg2,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=5,
@@ -773,8 +763,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
         )
         policy_22 = self.spd_add_rem_policy(  # inbound, priority 10
             1,
-            self.pg2,
             self.pg0,
+            self.pg2,
             socket.IPPROTO_UDP,
             is_out=0,
             priority=10,
@@ -840,5 +830,56 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
         self.verify_policy_match(0, policy_22)
 
 
+class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
+    """ IPSec/IPv6 inbound: Policy mode test case with fast path \
+    (add protect)"""
+
+    @classmethod
+    def setUpClass(cls):
+        super(IPSec6SpdTestCaseProtect, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(IPSec6SpdTestCaseProtect, cls).tearDownClass()
+
+    def setUp(self):
+        super(IPSec6SpdTestCaseProtect, self).setUp()
+
+    def tearDown(self):
+        super(IPSec6SpdTestCaseProtect, self).tearDown()
+
+    def test_ipsec6_spd_inbound_protect(self):
+        pkt_count = 5
+        payload_size = 64
+        p = self.params[socket.AF_INET6]
+        send_pkts = self.gen_encrypt_pkts6(
+            p,
+            p.scapy_tra_sa,
+            self.tra_if,
+            src=self.tra_if.remote_ip6,
+            dst=self.tra_if.local_ip6,
+            count=pkt_count,
+            payload_size=payload_size,
+        )
+        recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
+
+        self.logger.info(self.vapi.ppcli("show error"))
+        self.logger.info(self.vapi.ppcli("show ipsec all"))
+        pkts = p.tra_sa_in.get_stats()["packets"]
+        self.assertEqual(
+            pkts,
+            pkt_count,
+            "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
+        )
+        pkts = p.tra_sa_out.get_stats()["packets"]
+        self.assertEqual(
+            pkts,
+            pkt_count,
+            "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
+        )
+        self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
+        self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+
+
 if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)