ipsec: modify IPsec related tests to send and verify UDP-encapsulated ESP traffics
[vpp.git] / test / template_ipsec.py
index c438fbb..4d4e4c6 100644 (file)
@@ -2806,20 +2806,48 @@ class IPSecIPv4Fwd(VppTestCase):
         self.logger.info(self.vapi.ppcli("show ipsec all"))
         return spdEntry
 
-    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
+    def create_stream(
+        self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP"
+    ):
         packets = []
+        # create SA
+        sa = SecurityAssociation(
+            ESP,
+            spi=1000,
+            crypt_algo="AES-CBC",
+            crypt_key=b"JPjyOWBeVEQiMe7h",
+            auth_algo="HMAC-SHA1-96",
+            auth_key=b"C91KUR9GYMm5GfkEvNjX",
+            tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+            nat_t_header=UDP(sport=src_prt, dport=dst_prt),
+        )
         for i in range(pkt_count):
             # create packet info stored in the test case instance
             info = self.create_packet_info(src_if, dst_if)
             # convert the info into packet payload
             payload = self.info_to_payload(info)
             # create the packet itself
-            p = (
-                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
-                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
-                / UDP(sport=src_prt, dport=dst_prt)
-                / Raw(payload)
-            )
+            p = []
+            if proto == "UDP-ESP":
+                p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt(
+                    IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / UDP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
+            elif proto == "UDP":
+                p = (
+                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                    / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / UDP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
+            elif proto == "TCP":
+                p = (
+                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+                    / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+                    / TCP(sport=src_prt, dport=dst_prt)
+                    / Raw(payload)
+                )
             # store a copy of the packet in the packet info
             info.data = p.copy()
             # append the packet to the list
@@ -2873,6 +2901,50 @@ class IPSecIPv4Fwd(VppTestCase):
         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
         self.assert_equal(pkt_count, matched_pkts)
 
+    # Method verify_l3_l4_capture() will verify network and transport layer
+    # fields of the packet sa.encrypt() gives interface number garbadge.
+    # thus interface validation get failed (scapy bug?). However our intent
+    # is to verify IP layer and above and that is covered.
+
+    def verify_l3_l4_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        for packet in capture:
+            try:
+                self.assert_packet_checksums_valid(packet)
+                self.assert_equal(
+                    packet[IP].src,
+                    src_if.remote_ip4,
+                    "decrypted packet source address",
+                )
+                self.assert_equal(
+                    packet[IP].dst,
+                    dst_if.remote_ip4,
+                    "decrypted packet destination address",
+                )
+                if packet.haslayer(TCP):
+                    self.assertFalse(
+                        packet.haslayer(UDP),
+                        "unexpected UDP header in decrypted packet",
+                    )
+                elif packet.haslayer(UDP):
+                    if packet[UDP].payload:
+                        self.assertFalse(
+                            packet[UDP][1].haslayer(UDP),
+                            "unexpected UDP header in decrypted packet",
+                        )
+                else:
+                    self.assertFalse(
+                        packet.haslayer(UDP),
+                        "unexpected UDP header in decrypted packet",
+                    )
+                    self.assert_equal(
+                        packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
+                    )
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
+                raise
+
 
 class SpdFlowCacheTemplate(IPSecIPv4Fwd):
     @classmethod
@@ -2948,6 +3020,84 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd):
             self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
             return False
 
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(SpdFlowCacheTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(SpdFlowCacheTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
+
+class SpdFastPathTemplate(IPSecIPv4Fwd):
+    @classmethod
+    def setUpConstants(cls):
+        super(SpdFastPathTemplate, cls).setUpConstants()
+        # Override this method with required cmdline parameters e.g.
+        # cls.vpp_cmdline.extend(["ipsec", "{",
+        #                         "ipv4-outbound-spd-flow-cache on",
+        #                         "}"])
+        # cls.logger.info("VPP modified cmdline is %s" % " "
+        #                 .join(cls.vpp_cmdline))
+
+    def setUp(self):
+        super(SpdFastPathTemplate, self).setUp()
+
+    def tearDown(self):
+        super(SpdFastPathTemplate, self).tearDown()
+
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(SpdFastPathTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(SpdFastPathTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
+
+class IpsecDefaultTemplate(IPSecIPv4Fwd):
+    @classmethod
+    def setUpConstants(cls):
+        super(IpsecDefaultTemplate, cls).setUpConstants()
+
+    def setUp(self):
+        super(IpsecDefaultTemplate, self).setUp()
+
+    def tearDown(self):
+        super(IpsecDefaultTemplate, self).tearDown()
+
+    def create_stream(
+        cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+    ):
+        packets = []
+        packets = super(IpsecDefaultTemplate, cls).create_stream(
+            src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+        )
+        return packets
+
+    def verify_capture(
+        self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+    ):
+        super(IpsecDefaultTemplate, self).verify_l3_l4_capture(
+            src_if, dst_if, capture, tcp_port_in, udp_port_in
+        )
+
 
 class IPSecIPv6Fwd(VppTestCase):
     """Test IPSec by capturing and verifying IPv6 forwarded pkts"""