tests: add fast path ipv6 python tests for outbound policy matching 42/36642/3
authorPiotr Bronowski <piotrx.bronowski@intel.com>
Fri, 8 Jul 2022 12:45:51 +0000 (12:45 +0000)
committerFan Zhang <roy.fan.zhang@intel.com>
Fri, 15 Jul 2022 12:45:34 +0000 (12:45 +0000)
This patch introduces set of python tests for fast path ipv6, based on
ipv4 tests. Some missing parts of ipsec framework has been added
in order to test ipv6 implementation.

Type: feature
Signed-off-by: Piotr Bronowski <piotrx.bronowski@intel.com>
Change-Id: Icc13322787d76485c08106bad2cb071947ad9846

test/template_ipsec.py
test/test_ipsec_spd_fp_output.py

index 1b1c9aa..2295b75 100644 (file)
@@ -2088,5 +2088,200 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd):
             return False
 
 
+class IPSecIPv6Fwd(VppTestCase):
+    """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
+
+    @classmethod
+    def setUpConstants(cls):
+        super(IPSecIPv6Fwd, cls).setUpConstants()
+
+    def setUp(self):
+        super(IPSecIPv6Fwd, self).setUp()
+        # store SPD objects so we can remove configs on tear down
+        self.spd_objs = []
+        self.spd_policies = []
+
+    def tearDown(self):
+        # remove SPD policies
+        for obj in self.spd_policies:
+            obj.remove_vpp_config()
+        self.spd_policies = []
+        # remove SPD items (interface bindings first, then SPD)
+        for obj in reversed(self.spd_objs):
+            obj.remove_vpp_config()
+        self.spd_objs = []
+        # close down pg intfs
+        for pg in self.pg_interfaces:
+            pg.unconfig_ip6()
+            pg.admin_down()
+        super(IPSecIPv6Fwd, self).tearDown()
+
+    def create_interfaces(self, num_ifs=2):
+        # create interfaces pg0 ... pg<num_ifs>
+        self.create_pg_interfaces(range(num_ifs))
+        for pg in self.pg_interfaces:
+            # put the interface up
+            pg.admin_up()
+            # configure IPv6 address on the interface
+            pg.config_ip6()
+            pg.resolve_ndp()
+        self.logger.info(self.vapi.ppcli("show int addr"))
+
+    def spd_create_and_intf_add(self, spd_id, pg_list):
+        spd = VppIpsecSpd(self, spd_id)
+        spd.add_vpp_config()
+        self.spd_objs.append(spd)
+        for pg in pg_list:
+            spdItf = VppIpsecSpdItfBinding(self, spd, pg)
+            spdItf.add_vpp_config()
+            self.spd_objs.append(spdItf)
+
+    def get_policy(self, policy_type):
+        e = VppEnum.vl_api_ipsec_spd_action_t
+        if policy_type == "protect":
+            return e.IPSEC_API_SPD_ACTION_PROTECT
+        elif policy_type == "bypass":
+            return e.IPSEC_API_SPD_ACTION_BYPASS
+        elif policy_type == "discard":
+            return e.IPSEC_API_SPD_ACTION_DISCARD
+        else:
+            raise Exception("Invalid policy type: %s", policy_type)
+
+    def spd_add_rem_policy(
+        self,
+        spd_id,
+        src_if,
+        dst_if,
+        proto,
+        is_out,
+        priority,
+        policy_type,
+        remove=False,
+        all_ips=False,
+        ip_range=False,
+        local_ip_start=ip_address("0::0"),
+        local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
+        remote_ip_start=ip_address("0::0"),
+        remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
+        remote_port_start=0,
+        remote_port_stop=65535,
+        local_port_start=0,
+        local_port_stop=65535,
+    ):
+        spd = VppIpsecSpd(self, spd_id)
+
+        if all_ips:
+            src_range_low = ip_address("0::0")
+            src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
+            dst_range_low = ip_address("0::0")
+            dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
+
+        elif ip_range:
+            src_range_low = local_ip_start
+            src_range_high = local_ip_stop
+            dst_range_low = remote_ip_start
+            dst_range_high = remote_ip_stop
+
+        else:
+            src_range_low = src_if.remote_ip6
+            src_range_high = src_if.remote_ip6
+            dst_range_low = dst_if.remote_ip6
+            dst_range_high = dst_if.remote_ip6
+
+        spdEntry = VppIpsecSpdEntry(
+            self,
+            spd,
+            0,
+            src_range_low,
+            src_range_high,
+            dst_range_low,
+            dst_range_high,
+            proto,
+            priority=priority,
+            policy=self.get_policy(policy_type),
+            is_outbound=is_out,
+            remote_port_start=remote_port_start,
+            remote_port_stop=remote_port_stop,
+            local_port_start=local_port_start,
+            local_port_stop=local_port_stop,
+        )
+
+        if remove is False:
+            spdEntry.add_vpp_config()
+            self.spd_policies.append(spdEntry)
+        else:
+            spdEntry.remove_vpp_config()
+            self.spd_policies.remove(spdEntry)
+        self.logger.info(self.vapi.ppcli("show ipsec all"))
+        return spdEntry
+
+    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
+        packets = []
+        for i in range(pkt_count):
+            # create packet info stored in the test case instance
+            info = self.create_packet_info(src_if, dst_if)
+            # convert the info into packet payload
+            payload = self.info_to_payload(info)
+            # create the packet itself
+            p = (
+                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+                / UDP(sport=src_prt, dport=dst_prt)
+                / Raw(payload)
+            )
+            # store a copy of the packet in the packet info
+            info.data = p.copy()
+            # append the packet to the list
+            packets.append(p)
+        # return the created packet list
+        return packets
+
+    def verify_capture(self, src_if, dst_if, capture):
+        packet_info = None
+        for packet in capture:
+            try:
+                ip = packet[IPv6]
+                udp = packet[UDP]
+                # convert the payload to packet info object
+                payload_info = self.payload_to_info(packet)
+                # make sure the indexes match
+                self.assert_equal(
+                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
+                )
+                self.assert_equal(
+                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
+                )
+                packet_info = self.get_next_packet_info_for_interface2(
+                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
+                )
+                # make sure we didn't run out of saved packets
+                self.assertIsNotNone(packet_info)
+                self.assert_equal(
+                    payload_info.index, packet_info.index, "packet info index"
+                )
+                saved_packet = packet_info.data  # fetch the saved packet
+                # assert the values match
+                self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
+                # ... more assertions here
+                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
+            except Exception as e:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        remaining_packet = self.get_next_packet_info_for_interface2(
+            src_if.sw_if_index, dst_if.sw_if_index, packet_info
+        )
+        self.assertIsNone(
+            remaining_packet,
+            "Interface %s: Packet expected from interface "
+            "%s didn't arrive" % (dst_if.name, src_if.name),
+        )
+
+    def verify_policy_match(self, pkt_count, spdEntry):
+        self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
+        matched_pkts = spdEntry.get_stats().get("packets")
+        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
+        self.assert_equal(pkt_count, matched_pkts)
+
+
 if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)
index 645939c..242d30d 100644 (file)
@@ -5,6 +5,7 @@ import ipaddress
 from util import ppp
 from framework import VppTestRunner
 from template_ipsec import IPSecIPv4Fwd
+from template_ipsec import IPSecIPv6Fwd
 
 
 class SpdFastPathOutbound(IPSecIPv4Fwd):
@@ -16,6 +17,15 @@ class SpdFastPathOutbound(IPSecIPv4Fwd):
         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
 
 
+class SpdFastPathIPv6Outbound(IPSecIPv6Fwd):
+    # Override setUpConstants to enable outbound fast path in config
+    @classmethod
+    def setUpConstants(cls):
+        super(SpdFastPathIPv6Outbound, cls).setUpConstants()
+        cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-outbound-spd-fast-path on", "}"])
+        cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
+
+
 class IPSec4SpdTestCaseAdd(SpdFastPathOutbound):
     """ IPSec/IPv4 outbound: Policy mode test case with fast path \
         (add rule)"""
@@ -467,8 +477,6 @@ class IPSec4SpdTestCaseRemove(SpdFastPathOutbound):
         # verify all policies matched the expected number of times
         self.verify_policy_match(pkt_count, policy_0)
         self.verify_policy_match(0, policy_1)
-        # check policy in SPD has been cached after traffic
-        # matched BYPASS rule in SPD
         # now remove the bypass rule
         self.spd_add_rem_policy(  # outbound, priority 10
             1,
@@ -560,10 +568,7 @@ class IPSec4SpdTestCaseReadd(SpdFastPathOutbound):
         # verify all policies matched the expected number of times
         self.verify_policy_match(pkt_count, policy_0)
         self.verify_policy_match(0, policy_1)
-        # check policy in SPD has been cached after traffic
-        # matched BYPASS rule in SPD
-
-        # now remove the bypass rule, leaving only the discard rule
+        # remove the bypass rule, leaving only the discard rule
         self.spd_add_rem_policy(  # outbound, priority 10
             1,
             self.pg0,
@@ -758,5 +763,656 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathOutbound):
         self.verify_policy_match(pkt_count, policy_22)
 
 
+class IPSec6SpdTestCaseAdd(SpdFastPathIPv6Outbound):
+    """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+        (add rule)"""
+
+    def test_ipsec_spd_outbound_add(self):
+        # In this test case, packets in IPv4 FWD path are configured
+        # to go through IPSec outbound SPD policy lookup.
+        # 2 SPD rules (1 HIGH and 1 LOW) are added.
+        # High priority rule action is set to BYPASS.
+        # Low priority rule action is set to DISCARD.
+        # Traffic sent on pg0 interface should match high priority
+        # rule and should be sent out on pg1 interface.
+        self.create_interfaces(2)
+        pkt_count = 5
+        s_port_s = 1111
+        s_port_e = 1111
+        d_port_s = 2222
+        d_port_e = 2222
+        self.spd_create_and_intf_add(1, [self.pg1])
+        policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            local_port_start=s_port_s,
+            local_port_stop=s_port_e,
+            remote_port_start=d_port_s,
+            remote_port_stop=d_port_e,
+        )
+        policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+            local_port_start=s_port_s,
+            local_port_stop=s_port_e,
+            remote_port_start=d_port_s,
+            remote_port_stop=d_port_e,
+        )
+
+        # create the packet stream
+        packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
+        # add the stream to the source interface + enable capture
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+        # start the packet generator
+        self.pg_start()
+        # get capture
+        capture = self.pg1.get_capture()
+        for packet in capture:
+            try:
+                self.logger.debug(ppp("SPD - Got packet:", packet))
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+        # assert nothing captured on pg0
+        self.pg0.assert_nothing_captured()
+        # verify captured packets
+        self.verify_capture(self.pg0, self.pg1, capture)
+        # verify all policies matched the expected number of times
+        self.verify_policy_match(pkt_count, policy_0)
+        self.verify_policy_match(0, policy_1)
+
+
+class IPSec6SpdTestCaseAddAll(SpdFastPathIPv6Outbound):
+    """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+        (add all ips ports rule)"""
+
+    def test_ipsec_spd_outbound_add(self):
+        # In this test case, packets in IPv4 FWD path are configured
+        # to go through IPSec outbound SPD policy lookup.
+        # 2 SPD rules (1 HIGH and 1 LOW) are added.
+        # Low priority rule action is set to BYPASS all ips.
+        # High priority rule action is set to DISCARD all ips.
+        # Traffic sent on pg0 interface when LOW priority rule is added,
+        # expect the packet is being sent out to pg1. Then HIGH priority
+        # rule is added and send the same traffic to pg0, this time expect
+        # the traffic is dropped.
+        self.create_interfaces(2)
+        pkt_count = 5
+        self.spd_create_and_intf_add(1, [self.pg1])
+        policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            all_ips=True,
+        )
+
+        # create the packet stream
+        packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+        # add the stream to the source interface + enable capture
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+        # start the packet generator
+        self.pg_start()
+        # get capture
+        capture = self.pg1.get_capture()
+        for packet in capture:
+            try:
+                self.logger.debug(ppp("SPD - Got packet:", packet))
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+        # assert nothing captured on pg0
+        self.pg0.assert_nothing_captured()
+        # verify captured packets
+        self.verify_capture(self.pg0, self.pg1, capture)
+        # verify all policies matched the expected number of times
+        self.verify_policy_match(pkt_count, policy_0)
+
+        policy_1 = self.spd_add_rem_policy(  # outbound, priority 20
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=20,
+            policy_type="discard",
+            all_ips=True,
+        )
+
+        # create the packet stream
+        packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+        # add the stream to the source interface + enable capture
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+        # start the packet generator
+        self.pg_start()
+        # assert nothing captured on pg0 and pg1
+        self.pg0.assert_nothing_captured()
+        self.pg1.assert_nothing_captured()
+
+
+class IPSec6SpdTestCaseAddPortRange(SpdFastPathIPv6Outbound):
+    """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+        (add all ips port range rule)"""
+
+    def test_ipsec_spd_outbound_add(self):
+        # In this test case, packets in IPv4 FWD path are configured
+        # to go through IPSec outbound SPD policy lookup.
+        # 2 SPD rules (1 HIGH and 1 LOW) are added.
+        # High priority rule action is set to BYPASS.
+        # Low priority rule action is set to DISCARD.
+        # Traffic sent on pg0 interface should match high priority
+        # rule and should be sent out on pg1 interface.
+        self.create_interfaces(2)
+        pkt_count = 5
+        s_port_s = 1000
+        s_port_e = 2023
+        d_port_s = 5000
+        d_port_e = 6023
+        self.spd_create_and_intf_add(1, [self.pg1])
+        policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            all_ips=True,
+            local_port_start=s_port_s,
+            local_port_stop=s_port_e,
+            remote_port_start=d_port_s,
+            remote_port_stop=d_port_e,
+        )
+        policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+            all_ips=True,
+            local_port_start=s_port_s,
+            local_port_stop=s_port_e,
+            remote_port_start=d_port_s,
+            remote_port_stop=d_port_e,
+        )
+
+        # create the packet stream
+        packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
+        # add the stream to the source interface + enable capture
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+        # start the packet generator
+        self.pg_start()
+        # get capture
+        capture = self.pg1.get_capture()
+        for packet in capture:
+            try:
+                self.logger.debug(ppp("SPD - Got packet:", packet))
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+        # assert nothing captured on pg0
+        self.pg0.assert_nothing_captured()
+        # verify captured packets
+        self.verify_capture(self.pg0, self.pg1, capture)
+        # verify all policies matched the expected number of times
+        self.verify_policy_match(pkt_count, policy_0)
+        self.verify_policy_match(0, policy_1)
+
+
+class IPSec6SpdTestCaseAddIPRange(SpdFastPathIPv6Outbound):
+    """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+        (add ips range with any port rule)"""
+
+    def test_ipsec_spd_outbound_add(self):
+        # In this test case, packets in IPv4 FWD path are configured
+        # to go through IPSec outbound SPD policy lookup.
+        # 2 SPD rules (1 HIGH and 1 LOW) are added.
+        # High priority rule action is set to BYPASS.
+        # Low priority rule action is set to DISCARD.
+        # Traffic sent on pg0 interface should match high priority
+        # rule and should be sent out on pg1 interface.
+        self.create_interfaces(2)
+        pkt_count = 5
+        s_ip_s = ipaddress.ip_address(self.pg0.remote_ip6)
+        s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
+        d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
+        d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
+        self.spd_create_and_intf_add(1, [self.pg1])
+        policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            ip_range=True,
+            local_ip_start=s_ip_s,
+            local_ip_stop=s_ip_e,
+            remote_ip_start=d_ip_s,
+            remote_ip_stop=d_ip_e,
+        )
+        policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+            ip_range=True,
+            local_ip_start=s_ip_s,
+            local_ip_stop=s_ip_e,
+            remote_ip_start=d_ip_s,
+            remote_ip_stop=d_ip_e,
+        )
+
+        # create the packet stream
+        packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+        # add the stream to the source interface + enable capture
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+        # start the packet generator
+        self.pg_start()
+        # get capture
+        capture = self.pg1.get_capture()
+        for packet in capture:
+            try:
+                self.logger.debug(ppp("SPD - Got packet:", packet))
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+        # assert nothing captured on pg0
+        self.pg0.assert_nothing_captured()
+        # verify captured packets
+        self.verify_capture(self.pg0, self.pg1, capture)
+        # verify all policies matched the expected number of times
+        self.verify_policy_match(pkt_count, policy_0)
+        self.verify_policy_match(0, policy_1)
+
+
+class IPSec6SpdTestCaseAddIPAndPortRange(SpdFastPathIPv6Outbound):
+    """ IPSec/IPvr6 outbound: Policy mode test case with fast path \
+             (add all ips  range rule)"""
+
+    def test_ipsec_spd_outbound_add(self):
+        # In this test case, packets in IPv4 FWD path are configured
+        # to go through IPSec outbound SPD policy lookup.
+        # 2 SPD rules (1 HIGH and 1 LOW) are added.
+        # High priority rule action is set to BYPASS.
+        # Low priority rule action is set to DISCARD.
+        # Traffic sent on pg0 interface should match high priority
+        # rule and should be sent out on pg1 interface.
+        # in this test we define ranges of ports and ip addresses.
+        self.create_interfaces(2)
+        pkt_count = 5
+        s_port_s = 1000
+        s_port_e = 1000 + 1023
+        d_port_s = 5000
+        d_port_e = 5000 + 1023
+
+        s_ip_s = ipaddress.ip_address(
+            int(ipaddress.ip_address(self.pg0.remote_ip6)) - 24
+        )
+        s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
+        d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
+        d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
+        self.spd_create_and_intf_add(1, [self.pg1])
+        policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            ip_range=True,
+            local_ip_start=s_ip_s,
+            local_ip_stop=s_ip_e,
+            remote_ip_start=d_ip_s,
+            remote_ip_stop=d_ip_e,
+            local_port_start=s_port_s,
+            local_port_stop=s_port_e,
+            remote_port_start=d_port_s,
+            remote_port_stop=d_port_e,
+        )
+        policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+            ip_range=True,
+            local_ip_start=s_ip_s,
+            local_ip_stop=s_ip_e,
+            remote_ip_start=d_ip_s,
+            remote_ip_stop=d_ip_e,
+            local_port_start=s_port_s,
+            local_port_stop=s_port_e,
+            remote_port_start=d_port_s,
+            remote_port_stop=d_port_e,
+        )
+
+        # create the packet stream
+        packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+        # add the stream to the source interface + enable capture
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+        # start the packet generator
+        self.pg_start()
+        # get capture
+        capture = self.pg1.get_capture()
+        for packet in capture:
+            try:
+                self.logger.debug(ppp("SPD - Got packet:", packet))
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+        # assert nothing captured on pg0
+        self.pg0.assert_nothing_captured()
+        # verify captured packets
+        self.verify_capture(self.pg0, self.pg1, capture)
+        # verify all policies matched the expected number of times
+        self.verify_policy_match(pkt_count, policy_0)
+        self.verify_policy_match(0, policy_1)
+
+
+class IPSec6SpdTestCaseReadd(SpdFastPathIPv6Outbound):
+    """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+        (add, remove, re-add)"""
+
+    def test_ipsec_spd_outbound_readd(self):
+        # In this test case, packets in IPv4 FWD path are configured
+        # to go through IPSec outbound SPD policy lookup.
+        # 2 SPD rules (1 HIGH and 1 LOW) are added.
+        # High priority rule action is set to BYPASS.
+        # Low priority rule action is set to DISCARD.
+        # Traffic sent on pg0 interface should match high priority
+        # rule and should be sent out on pg1 interface.
+        # High priority rule is then removed.
+        # Traffic sent on pg0 interface should match low priority
+        # rule and should be discarded after SPD lookup.
+        # Readd high priority rule.
+        # Traffic sent on pg0 interface should match high priority
+        # rule and should be sent out on pg1 interface.
+        self.create_interfaces(2)
+        pkt_count = 5
+        self.spd_create_and_intf_add(1, [self.pg1])
+        policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
+        policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+        )
+
+        # create the packet stream
+        packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+        # add the stream to the source interface + enable capture
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+        # start the packet generator
+        self.pg_start()
+        # get capture
+        capture = self.pg1.get_capture()
+        for packet in capture:
+            try:
+                self.logger.debug(ppp("SPD - Got packet:", packet))
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+        # assert nothing captured on pg0
+        self.pg0.assert_nothing_captured()
+        # verify capture on pg1
+        self.verify_capture(self.pg0, self.pg1, capture)
+        # verify all policies matched the expected number of times
+        self.verify_policy_match(pkt_count, policy_0)
+        self.verify_policy_match(0, policy_1)
+        # remove the bypass rule, leaving only the discard rule
+        self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+            remove=True,
+        )
+
+        # resend the same packets
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()  # flush the old captures
+        self.pg1.enable_capture()
+        self.pg_start()
+
+        # assert nothing captured on pg0
+        self.pg0.assert_nothing_captured()
+        # all packets will be dropped by SPD rule
+        self.pg1.assert_nothing_captured()
+        # verify all policies matched the expected number of times
+        self.verify_policy_match(pkt_count, policy_0)
+        self.verify_policy_match(pkt_count, policy_1)
+
+        # now readd the bypass rule
+        policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
+
+        # resend the same packets
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()  # flush the old captures
+        self.pg1.enable_capture()
+        self.pg_start()
+
+        # get capture
+        capture = self.pg1.get_capture(pkt_count)
+        for packet in capture:
+            try:
+                self.logger.debug(ppp("SPD - Got packet:", packet))
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+        # assert nothing captured on pg0
+        self.pg0.assert_nothing_captured()
+        # verify captured packets
+        self.verify_capture(self.pg0, self.pg1, capture)
+        # verify all policies matched the expected number of times
+        self.verify_policy_match(pkt_count, policy_0)
+        self.verify_policy_match(pkt_count, policy_1)
+
+
+class IPSec6SpdTestCaseMultiple(SpdFastPathIPv6Outbound):
+    """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+        (multiple interfaces, multiple rules)"""
+
+    def test_ipsec_spd_outbound_multiple(self):
+        # In this test case, packets in IPv4 FWD path are configured to go
+        # through IPSec outbound SPD policy lookup.
+        # Multiples rules on multiple interfaces are tested at the same time.
+        # 3x interfaces are configured, binding the same SPD to each.
+        # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
+        # On pg0 & pg1, the BYPASS rule is HIGH priority
+        # On pg2, the DISCARD rule is HIGH priority
+        # Traffic should be received on pg0 & pg1 and dropped on pg2.
+        self.create_interfaces(3)
+        pkt_count = 5
+        # bind SPD to all interfaces
+        self.spd_create_and_intf_add(1, self.pg_interfaces)
+        # add rules on all interfaces
+        policy_01 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
+        policy_02 = self.spd_add_rem_policy(  # outbound, priority 5
+            1,
+            self.pg0,
+            self.pg1,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+        )
+
+        policy_11 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg1,
+            self.pg2,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="bypass",
+        )
+        policy_12 = self.spd_add_rem_policy(  # outbound, priority 5
+            1,
+            self.pg1,
+            self.pg2,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="discard",
+        )
+
+        policy_21 = self.spd_add_rem_policy(  # outbound, priority 5
+            1,
+            self.pg2,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=5,
+            policy_type="bypass",
+        )
+        policy_22 = self.spd_add_rem_policy(  # outbound, priority 10
+            1,
+            self.pg2,
+            self.pg0,
+            socket.IPPROTO_UDP,
+            is_out=1,
+            priority=10,
+            policy_type="discard",
+        )
+
+        # interfaces bound to an SPD, will by default drop inbound
+        # traffic with no matching policies. add catch-all inbound
+        # bypass rule to SPD:
+        self.spd_add_rem_policy(  # inbound, all interfaces
+            1,
+            None,
+            None,
+            socket.IPPROTO_UDP,
+            is_out=0,
+            priority=10,
+            policy_type="bypass",
+            all_ips=True,
+        )
+
+        # create the packet streams
+        packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
+        packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
+        packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
+        # add the streams to the source interfaces
+        self.pg0.add_stream(packets0)
+        self.pg1.add_stream(packets1)
+        self.pg2.add_stream(packets2)
+        # enable capture on all interfaces
+        for pg in self.pg_interfaces:
+            pg.enable_capture()
+        # start the packet generator
+        self.pg_start()
+
+        # get captures
+        if_caps = []
+        for pg in [self.pg1, self.pg2]:  # we are expecting captures on pg1/pg2
+            if_caps.append(pg.get_capture())
+            for packet in if_caps[-1]:
+                try:
+                    self.logger.debug(ppp("SPD - Got packet:", packet))
+                except Exception:
+                    self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                    raise
+        self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
+        self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
+
+        # verify captures that matched BYPASS rule
+        self.verify_capture(self.pg0, self.pg1, if_caps[0])
+        self.verify_capture(self.pg1, self.pg2, if_caps[1])
+        # verify that traffic to pg0 matched DISCARD rule and was dropped
+        self.pg0.assert_nothing_captured()
+        # verify all packets that were expected to match rules, matched
+        # pg0 -> pg1
+        self.verify_policy_match(pkt_count, policy_01)
+        self.verify_policy_match(0, policy_02)
+        # pg1 -> pg2
+        self.verify_policy_match(pkt_count, policy_11)
+        self.verify_policy_match(0, policy_12)
+        # pg2 -> pg0
+        self.verify_policy_match(0, policy_21)
+        self.verify_policy_match(pkt_count, policy_22)
+
+
 if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)