ipsec: modify IPsec related tests to send and verify UDP-encapsulated ESP traffics 92/38792/9
authorvinay Tripathi <vinayx.tripathi@intel.com>
Fri, 20 Oct 2023 05:20:47 +0000 (05:20 +0000)
committerFan Zhang <fanzhang.oss@gmail.com>
Tue, 31 Oct 2023 10:33:13 +0000 (10:33 +0000)
In this patch, IPsec related test files have been modified to send UDP-encapsulated
ESP packets,and validate against Inbound and Outbound policies that are configured
with Bypass, Discard and Protect action.

Type: test

Change-Id: I4b8da18270fd177868223bfe1389dc9c50e86cc5
Signed-off-by: vinay Tripathi <vinayx.tripathi@intel.com>
test/asf/test_ipsec_default.py
test/template_ipsec.py
test/test_ipsec_spd_fp_input.py

index e97e2ef..2fefb77 100644 (file)
@@ -3,7 +3,7 @@ import unittest
 
 from util import ppp
 from asfframework import VppTestRunner
-from template_ipsec import IPSecIPv4Fwd
+from template_ipsec import IpsecDefaultTemplate
 
 """
 When an IPSec SPD is configured on an interface, any inbound packets
@@ -32,7 +32,7 @@ packets are dropped as expected.
 """
 
 
-class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
+class IPSecInboundDefaultDrop(IpsecDefaultTemplate):
     """IPSec: inbound packets drop by default with no matching rule"""
 
     def test_ipsec_inbound_default_drop(self):
@@ -114,7 +114,7 @@ class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
         self.verify_policy_match(pkt_count, inbound_policy)
 
 
-class IPSecOutboundDefaultDrop(IPSecIPv4Fwd):
+class IPSecOutboundDefaultDrop(IpsecDefaultTemplate):
     """IPSec: outbound packets drop by default with no matching rule"""
 
     def test_ipsec_inbound_default_drop(self):
index c438fbb..4d4e4c6 100644 (file)
@@ -2806,20 +2806,48 @@ class IPSecIPv4Fwd(VppTestCase):
         self.logger.info(self.vapi.ppcli("show ipsec all"))
         return spdEntry
 
-    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
+    def create_stream(
+        self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP"
+    ):
         packets = []
+        # create SA
+        sa = SecurityAssociation(
+            ESP,
+            spi=1000,
+            crypt_algo="AES-CBC",
+            crypt_key=b"JPjyOWBeVEQiMe7h",
+            auth_algo="HMAC-SHA1-96",
+            auth_key=b"C91KUR9GYMm5GfkEvNjX",
+            tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+            nat_t_header=UDP(sport=src_prt, dport=dst_prt),
+        )
         for i in range(pkt_count):
             # create packet info stored in the test case instance
             info = self.create_packet_info(src_if, dst_if)
             # convert the info into packet payload
             payload = self.info_to_payload(info)
             # create the packet itself
-            p = (
-                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
-                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
-                / UDP(sport=src_prt, dport=dst_prt)
-                / Raw(payload)
-            )
+            p = []
+            if proto == "UDP-ESP":
+                p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt(
+                    IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / UDP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
+            elif proto == "UDP":
+                p = (
+                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                    / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / UDP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
+            elif proto == "TCP":
+                p = (
+                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                    / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / TCP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
             # store a copy of the packet in the packet info
             info.data = p.copy()
             # append the packet to the list
@@ -2873,6 +2901,50 @@ class IPSecIPv4Fwd(VppTestCase):
         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
         self.assert_equal(pkt_count, matched_pkts)
 
+    # Method verify_l3_l4_capture() will verify network and transport layer
+    # fields of the packet sa.encrypt() gives interface number garbadge.
+    # thus interface validation get failed (scapy bug?). However our intent
+    # is to verify IP layer and above and that is covered.
+
+    def verify_l3_l4_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        for packet in capture:
+            try:
+                self.assert_packet_checksums_valid(packet)
+                self.assert_equal(
+                    packet[IP].src,
+                    src_if.remote_ip4,
+                    "decrypted packet source address",
+                )
+                self.assert_equal(
+                    packet[IP].dst,
+                    dst_if.remote_ip4,
+                    "decrypted packet destination address",
+                )
+                if packet.haslayer(TCP):
+                    self.assertFalse(
+                        packet.haslayer(UDP),
+                        "unexpected UDP header in decrypted packet",
+                    )
+                elif packet.haslayer(UDP):
+                    if packet[UDP].payload:
+                        self.assertFalse(
+                            packet[UDP][1].haslayer(UDP),
+                            "unexpected UDP header in decrypted packet",
+                        )
+                else:
+                    self.assertFalse(
+                        packet.haslayer(UDP),
+                        "unexpected UDP header in decrypted packet",
+                    )
+                    self.assert_equal(
+                        packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
+                    )
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
+                raise
+
 
 class SpdFlowCacheTemplate(IPSecIPv4Fwd):
     @classmethod
@@ -2948,6 +3020,84 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd):
             self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
             return False
 
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(SpdFlowCacheTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(SpdFlowCacheTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
+
+class SpdFastPathTemplate(IPSecIPv4Fwd):
+    @classmethod
+    def setUpConstants(cls):
+        super(SpdFastPathTemplate, cls).setUpConstants()
+        # Override this method with required cmdline parameters e.g.
+        # cls.vpp_cmdline.extend(["ipsec", "{",
+        #                         "ipv4-outbound-spd-flow-cache on",
+        #                         "}"])
+        # cls.logger.info("VPP modified cmdline is %s" % " "
+        #                 .join(cls.vpp_cmdline))
+
+    def setUp(self):
+        super(SpdFastPathTemplate, self).setUp()
+
+    def tearDown(self):
+        super(SpdFastPathTemplate, self).tearDown()
+
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(SpdFastPathTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(SpdFastPathTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
+
+class IpsecDefaultTemplate(IPSecIPv4Fwd):
+    @classmethod
+    def setUpConstants(cls):
+        super(IpsecDefaultTemplate, cls).setUpConstants()
+
+    def setUp(self):
+        super(IpsecDefaultTemplate, self).setUp()
+
+    def tearDown(self):
+        super(IpsecDefaultTemplate, self).tearDown()
+
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(IpsecDefaultTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(IpsecDefaultTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
 
 class IPSecIPv6Fwd(VppTestCase):
     """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
index d70623e..0380032 100644 (file)
@@ -4,9 +4,9 @@ import ipaddress
 
 from util import ppp
 from framework import VppTestRunner
-from template_ipsec import IPSecIPv4Fwd
 from template_ipsec import IPSecIPv6Fwd
 from test_ipsec_esp import TemplateIpsecEsp
+from template_ipsec import SpdFastPathTemplate
 
 
 def debug_signal_handler(signal, frame):
@@ -20,7 +20,7 @@ import signal
 signal.signal(signal.SIGINT, debug_signal_handler)
 
 
-class SpdFastPathInbound(IPSecIPv4Fwd):
+class SpdFastPathInbound(SpdFastPathTemplate):
     # In test cases derived from this class, packets in IPv4 FWD path
     # are configured to go through IPSec inbound SPD policy lookup.
     # Note that order in which the rules are applied is