flowprobe: fix sending L2 flows using L2_IP6 template
[vpp.git] / test / test_flowprobe.py
index 1d565dc..19571f7 100644 (file)
@@ -11,6 +11,7 @@ from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP
 from scapy.layers.inet6 import IPv6
+from scapy.contrib.lacp import SlowProtocol, LACP
 
 from config import config
 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
@@ -273,9 +274,9 @@ class MethodHolder(VppTestCase):
             if self.debug_print:
                 print(data)
             if ip_ver == "v4":
-                ip_layer = capture[0][IP]
+                ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
             else:
-                ip_layer = capture[0][IPv6]
+                ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
             if data_set is not None:
                 for record in data:
                     # skip flow if ingress/egress interface is 0
@@ -682,6 +683,71 @@ class DatapathTestsHolder(object):
         ipfix.remove_vpp_config()
         self.logger.info("FFP_TEST_FINISH_0002")
 
+    def test_L23onL2(self):
+        """L2/3 data on L2 datapath"""
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pkts = []
+
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l2 l3", direction=self.direction
+        )
+        ipfix.add_vpp_config()
+
+        ipfix_decoder = IPFIXDecoder()
+        # template packet should arrive immediately
+        templates = ipfix.verify_templates(ipfix_decoder, count=3)
+
+        # verify IPv4 and IPv6 flows
+        for ip_ver in ("v4", "v6"):
+            self.create_stream(packets=1, ip_ver=ip_ver)
+            capture = self.send_packets()
+
+            # make sure the one packet we expect actually showed up
+            self.vapi.ipfix_flush()
+            cflow = self.wait_for_cflow_packet(
+                self.collector, templates[1 if ip_ver == "v4" else 2]
+            )
+            src_ip_id = 8 if ip_ver == "v4" else 27
+            dst_ip_id = 12 if ip_ver == "v4" else 28
+            self.verify_cflow_data_detail(
+                ipfix_decoder,
+                capture,
+                cflow,
+                {
+                    2: "packets",
+                    256: 8 if ip_ver == "v4" else 56710,
+                    4: 17,
+                    src_ip_id: "src_ip",
+                    dst_ip_id: "dst_ip",
+                    61: (self.direction == "tx"),
+                },
+                ip_ver=ip_ver,
+            )
+
+        # verify non-IP flow
+        self.pkts = [
+            (
+                Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
+                / SlowProtocol()
+                / LACP()
+            )
+        ]
+        capture = self.send_packets()
+
+        # make sure the one packet we expect actually showed up
+        self.vapi.ipfix_flush()
+        cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+        self.verify_cflow_data_detail(
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 256: 2440, 61: (self.direction == "tx")},
+        )
+
+        self.collector.get_capture(6)
+
+        ipfix.remove_vpp_config()
+
     def test_L4onL2(self):
         """L4 data on L2 datapath"""
         self.logger.info("FFP_TEST_START_0003")