wireguard: notify key changes to crypto engine
[vpp.git] / test / test_flowprobe.py
index 6b27179..6090999 100644 (file)
@@ -5,25 +5,54 @@ import random
 import socket
 import unittest
 import time
 import socket
 import unittest
 import time
-import re
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP
 from scapy.layers.inet6 import IPv6
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, TCP, UDP
 from scapy.layers.inet6 import IPv6
+from scapy.contrib.lacp import SlowProtocol, LACP
 
 from config import config
 
 from config import config
-from framework import tag_fixme_vpp_workers
-from framework import VppTestCase, VppTestRunner
-from framework import tag_run_solo
+from framework import VppTestCase
+from asfframework import (
+    tag_fixme_vpp_workers,
+    tag_fixme_ubuntu2204,
+    tag_fixme_debian11,
+    tag_run_solo,
+    is_distro_ubuntu2204,
+    is_distro_debian11,
+    VppTestRunner,
+)
 from vpp_object import VppObject
 from vpp_object import VppObject
-from vpp_pg_interface import CaptureTimeoutError
 from util import ppp
 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
 from vpp_ip_route import VppIpRoute, VppRoutePath
 from vpp_papi.macaddress import mac_ntop
 from socket import inet_ntop
 from vpp_papi import VppEnum
 from util import ppp
 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
 from vpp_ip_route import VppIpRoute, VppRoutePath
 from vpp_papi.macaddress import mac_ntop
 from socket import inet_ntop
 from vpp_papi import VppEnum
+from vpp_sub_interface import VppDot1ADSubint
+
+
+TMPL_COMMON_FIELD_COUNT = 6
+TMPL_L2_FIELD_COUNT = 3
+TMPL_L3_FIELD_COUNT = 4
+TMPL_L4_FIELD_COUNT = 3
+
+IPFIX_TCP_FLAGS_ID = 6
+IPFIX_SRC_TRANS_PORT_ID = 7
+IPFIX_DST_TRANS_PORT_ID = 11
+IPFIX_SRC_IP4_ADDR_ID = 8
+IPFIX_DST_IP4_ADDR_ID = 12
+IPFIX_FLOW_DIRECTION_ID = 61
+
+TCP_F_FIN = 0x01
+TCP_F_SYN = 0x02
+TCP_F_RST = 0x04
+TCP_F_PSH = 0x08
+TCP_F_ACK = 0x10
+TCP_F_URG = 0x20
+TCP_F_ECE = 0x40
+TCP_F_CWR = 0x80
 
 
 class VppCFLOW(VppObject):
 
 
 class VppCFLOW(VppObject):
@@ -39,9 +68,11 @@ class VppCFLOW(VppObject):
         mtu=1024,
         datapath="l2",
         layer="l2 l3 l4",
         mtu=1024,
         datapath="l2",
         layer="l2 l3 l4",
+        direction="tx",
     ):
         self._test = test
         self._intf = intf
     ):
         self._test = test
         self._intf = intf
+        self._intf_obj = getattr(self._test, intf)
         self._active = active
         if passive == 0 or passive < active:
             self._passive = active + 1
         self._active = active
         if passive == 0 or passive < active:
             self._passive = active + 1
@@ -49,6 +80,7 @@ class VppCFLOW(VppObject):
             self._passive = passive
         self._datapath = datapath  # l2 ip4 ip6
         self._collect = layer  # l2 l3 l4
             self._passive = passive
         self._datapath = datapath  # l2 ip4 ip6
         self._collect = layer  # l2 l3 l4
+        self._direction = direction  # rx tx both
         self._timeout = timeout
         self._mtu = mtu
         self._configured = False
         self._timeout = timeout
         self._mtu = mtu
         self._configured = False
@@ -64,7 +96,7 @@ class VppCFLOW(VppObject):
             l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
         if "l4" in self._collect.lower():
             l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
             l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
         if "l4" in self._collect.lower():
             l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
-        self._test.vapi.flowprobe_params(
+        self._test.vapi.flowprobe_set_params(
             record_flags=(l2_flag | l3_flag | l4_flag),
             active_timer=self._active,
             passive_timer=self._passive,
             record_flags=(l2_flag | l3_flag | l4_flag),
             active_timer=self._active,
             passive_timer=self._passive,
@@ -87,18 +119,32 @@ class VppCFLOW(VppObject):
             template_interval=self._timeout,
         )
 
             template_interval=self._timeout,
         )
 
-    def enable_flowprobe_feature(self):
-        self._test.vapi.ppcli(
-            "flowprobe feature add-del %s %s" % (self._intf, self._datapath)
+    def _enable_disable_flowprobe_feature(self, is_add):
+        which_map = {
+            "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
+            "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
+            "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
+        }
+        direction_map = {
+            "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
+            "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
+            "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
+        }
+        self._test.vapi.flowprobe_interface_add_del(
+            is_add=is_add,
+            which=which_map[self._datapath],
+            direction=direction_map[self._direction],
+            sw_if_index=self._intf_obj.sw_if_index,
         )
 
         )
 
+    def enable_flowprobe_feature(self):
+        self._enable_disable_flowprobe_feature(is_add=True)
+
     def disable_exporter(self):
         self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
 
     def disable_flowprobe_feature(self):
     def disable_exporter(self):
         self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
 
     def disable_flowprobe_feature(self):
-        self._test.vapi.cli(
-            "flowprobe feature add-del %s %s disable" % (self._intf, self._datapath)
-        )
+        self._enable_disable_flowprobe_feature(is_add=False)
 
     def object_id(self):
         return "ipfix-collector-%s-%s" % (self._src, self.dst)
 
     def object_id(self):
         return "ipfix-collector-%s-%s" % (self._src, self.dst)
@@ -106,15 +152,18 @@ class VppCFLOW(VppObject):
     def query_vpp_config(self):
         return self._configured
 
     def query_vpp_config(self):
         return self._configured
 
-    def verify_templates(self, decoder=None, timeout=1, count=3):
+    def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
         templates = []
         self._test.assertIn(count, (1, 2, 3))
         for _ in range(count):
             p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
             self._test.assertTrue(p.haslayer(IPFIX))
         templates = []
         self._test.assertIn(count, (1, 2, 3))
         for _ in range(count):
             p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
             self._test.assertTrue(p.haslayer(IPFIX))
-            if decoder is not None and p.haslayer(Template):
+            self._test.assertTrue(p.haslayer(Template))
+            if decoder is not None:
                 templates.append(p[Template].templateID)
                 decoder.add_template(p.getlayer(Template))
                 templates.append(p[Template].templateID)
                 decoder.add_template(p.getlayer(Template))
+            if field_count_in is not None:
+                self._test.assertIn(p[Template].fieldCount, field_count_in)
         return templates
 
 
         return templates
 
 
@@ -134,6 +183,10 @@ class MethodHolder(VppTestCase):
         variables and configure VPP.
         """
         super(MethodHolder, cls).setUpClass()
         variables and configure VPP.
         """
         super(MethodHolder, cls).setUpClass()
+        if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
+            cls, "vpp"
+        ):
+            return
         try:
             # Create pg interfaces
             cls.create_pg_interfaces(range(9))
         try:
             # Create pg interfaces
             cls.create_pg_interfaces(range(9))
@@ -143,7 +196,9 @@ class MethodHolder(VppTestCase):
 
             # Create BD with MAC learning and unknown unicast flooding disabled
             # and put interfaces to this BD
 
             # Create BD with MAC learning and unknown unicast flooding disabled
             # and put interfaces to this BD
-            cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
+            cls.vapi.bridge_domain_add_del_v2(
+                bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
+            )
             cls.vapi.sw_interface_set_l2_bridge(
                 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
             )
             cls.vapi.sw_interface_set_l2_bridge(
                 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
             )
@@ -240,7 +295,13 @@ class MethodHolder(VppTestCase):
         return dst_if.get_capture(len(self.pkts))
 
     def verify_cflow_data_detail(
         return dst_if.get_capture(len(self.pkts))
 
     def verify_cflow_data_detail(
-        self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
+        self,
+        decoder,
+        capture,
+        cflow,
+        data_set={1: "octets", 2: "packets"},
+        ip_ver="v4",
+        field_count=None,
     ):
         if self.debug_print:
             print(capture[0].show())
     ):
         if self.debug_print:
             print(capture[0].show())
@@ -249,9 +310,9 @@ class MethodHolder(VppTestCase):
             if self.debug_print:
                 print(data)
             if ip_ver == "v4":
             if self.debug_print:
                 print(data)
             if ip_ver == "v4":
-                ip_layer = capture[0][IP]
+                ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
             else:
             else:
-                ip_layer = capture[0][IPv6]
+                ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
             if data_set is not None:
                 for record in data:
                     # skip flow if ingress/egress interface is 0
             if data_set is not None:
                 for record in data:
                     # skip flow if ingress/egress interface is 0
@@ -261,8 +322,6 @@ class MethodHolder(VppTestCase):
                         continue
 
                     for field in data_set:
                         continue
 
                     for field in data_set:
-                        if field not in record.keys():
-                            continue
                         value = data_set[field]
                         if value == "octets":
                             value = ip_layer.len
                         value = data_set[field]
                         if value == "octets":
                             value = ip_layer.len
@@ -289,6 +348,9 @@ class MethodHolder(VppTestCase):
                         self.assertEqual(
                             int(binascii.hexlify(record[field]), 16), value
                         )
                         self.assertEqual(
                             int(binascii.hexlify(record[field]), 16), value
                         )
+            if field_count is not None:
+                for record in data:
+                    self.assertEqual(len(record), field_count)
 
     def verify_cflow_data_notimer(self, decoder, capture, cflows):
         idx = 0
 
     def verify_cflow_data_notimer(self, decoder, capture, cflows):
         idx = 0
@@ -322,6 +384,8 @@ class MethodHolder(VppTestCase):
 
 @tag_run_solo
 @tag_fixme_vpp_workers
 
 @tag_run_solo
 @tag_fixme_vpp_workers
+@tag_fixme_ubuntu2204
+@tag_fixme_debian11
 class Flowprobe(MethodHolder):
     """Template verification, timer tests"""
 
 class Flowprobe(MethodHolder):
     """Template verification, timer tests"""
 
@@ -357,8 +421,11 @@ class Flowprobe(MethodHolder):
         ipfix.remove_vpp_config()
         self.logger.info("FFP_TEST_FINISH_0001")
 
         ipfix.remove_vpp_config()
         self.logger.info("FFP_TEST_FINISH_0001")
 
+    @unittest.skipUnless(
+        config.extended, "Test is unstable (assertion error, needs to be fixed"
+    )
     def test_0002(self):
     def test_0002(self):
-        """timer greater than template timeout"""
+        """timer greater than template timeout [UNSTABLE, FIX ME]"""
         self.logger.info("FFP_TEST_START_0002")
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
         self.logger.info("FFP_TEST_START_0002")
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
@@ -434,6 +501,8 @@ class Flowprobe(MethodHolder):
             self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
             # egress interface
             self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
             self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
             # egress interface
             self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
+            # direction
+            self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
             # packets
             self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
             # src mac
             # packets
             self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
             # src mac
@@ -464,25 +533,178 @@ class Flowprobe(MethodHolder):
         ipfix.remove_vpp_config()
         self.logger.info("FFP_TEST_FINISH_0000")
 
         ipfix.remove_vpp_config()
         self.logger.info("FFP_TEST_FINISH_0000")
 
+    def test_flow_entry_reuse(self):
+        """Verify flow entry reuse doesn't accumulate meta info"""
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pkts = []
 
 
-@tag_fixme_vpp_workers
-class Datapath(MethodHolder):
+        # enable ip4 datapath for an interface
+        # set active and passive timers
+        ipfix = VppCFLOW(
+            test=self,
+            active=2,
+            passive=3,
+            intf="pg3",
+            layer="l3 l4",
+            datapath="ip4",
+            direction="rx",
+            mtu=100,
+        )
+        ipfix.add_vpp_config()
+
+        # template packet should arrive immediately
+        ipfix_decoder = IPFIXDecoder()
+        templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+        # make a tcp packet
+        self.pkts = [
+            (
+                Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
+                / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
+                / TCP(sport=1234, dport=4321)
+                / Raw(b"\xa5" * 50)
+            )
+        ]
+
+        # send the tcp packet two times, each time with new set of flags
+        tcp_flags = (
+            TCP_F_SYN | TCP_F_ACK,
+            TCP_F_RST | TCP_F_PSH,
+        )
+        for f in tcp_flags:
+            self.pkts[0][TCP].flags = f
+            capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
+
+            # verify meta info - packet/octet delta and tcp flags
+            cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
+            self.verify_cflow_data(ipfix_decoder, capture, cflow)
+            self.verify_cflow_data_detail(
+                ipfix_decoder,
+                capture,
+                cflow,
+                {
+                    IPFIX_TCP_FLAGS_ID: f,
+                    IPFIX_SRC_TRANS_PORT_ID: 1234,
+                    IPFIX_DST_TRANS_PORT_ID: 4321,
+                },
+            )
+
+        self.collector.get_capture(3)
+
+        # cleanup
+        ipfix.remove_vpp_config()
+
+    def test_interface_dump(self):
+        """Dump interfaces with IPFIX flow record generation enabled"""
+        self.logger.info("FFP_TEST_START_0003")
+
+        # Enable feature for 3 interfaces
+        ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
+        ipfix1.add_vpp_config()
+
+        ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
+        ipfix2.enable_flowprobe_feature()
+
+        ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
+        ipfix3.enable_flowprobe_feature()
+
+        # When request "all", dump should contain all enabled interfaces
+        dump = self.vapi.flowprobe_interface_dump()
+        self.assertEqual(len(dump), 3)
+
+        # Verify 1st interface
+        self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
+        self.assertEqual(
+            dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
+        )
+        self.assertEqual(
+            dump[0].direction,
+            VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
+        )
+
+        # Verify 2nd interface
+        self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
+        self.assertEqual(
+            dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
+        )
+        self.assertEqual(
+            dump[1].direction,
+            VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
+        )
+
+        # Verify 3rd interface
+        self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
+        self.assertEqual(
+            dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
+        )
+        self.assertEqual(
+            dump[2].direction,
+            VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
+        )
+
+        # When request 2nd interface, dump should contain only the specified interface
+        dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
+        self.assertEqual(len(dump), 1)
+
+        # Verify 2nd interface
+        self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
+        self.assertEqual(
+            dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
+        )
+        self.assertEqual(
+            dump[0].direction,
+            VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
+        )
+
+        # When request 99th interface, dump should be empty
+        dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
+        self.assertEqual(len(dump), 0)
+
+        ipfix1.remove_vpp_config()
+        ipfix2.remove_vpp_config()
+        ipfix3.remove_vpp_config()
+        self.logger.info("FFP_TEST_FINISH_0003")
+
+    def test_get_params(self):
+        """Get IPFIX flow record generation parameters"""
+        self.logger.info("FFP_TEST_START_0004")
+
+        # Enable feature for an interface with custom parameters
+        ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
+        ipfix.add_vpp_config()
+
+        # Get and verify parameters
+        params = self.vapi.flowprobe_get_params()
+        self.assertEqual(params.active_timer, 20)
+        self.assertEqual(params.passive_timer, 40)
+        record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
+        record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
+        record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
+        self.assertEqual(params.record_flags, record_flags)
+
+        ipfix.remove_vpp_config()
+        self.logger.info("FFP_TEST_FINISH_0004")
+
+
+class DatapathTestsHolder(object):
     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
 
     @classmethod
     def setUpClass(cls):
     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
 
     @classmethod
     def setUpClass(cls):
-        super(Datapath, cls).setUpClass()
+        super(DatapathTestsHolder, cls).setUpClass()
 
     @classmethod
     def tearDownClass(cls):
 
     @classmethod
     def tearDownClass(cls):
-        super(Datapath, cls).tearDownClass()
+        super(DatapathTestsHolder, cls).tearDownClass()
 
     def test_templatesL2(self):
         """verify template on L2 datapath"""
         self.logger.info("FFP_TEST_START_0000")
         self.pg_enable_capture(self.pg_interfaces)
 
 
     def test_templatesL2(self):
         """verify template on L2 datapath"""
         self.logger.info("FFP_TEST_START_0000")
         self.pg_enable_capture(self.pg_interfaces)
 
-        ipfix = VppCFLOW(test=self, layer="l2")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l2", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         # template packet should arrive immediately
         ipfix.add_vpp_config()
 
         # template packet should arrive immediately
@@ -499,7 +721,9 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, layer="l2")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l2", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -513,7 +737,10 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 256: 8, 61: (self.direction == "tx")},
         )
         self.collector.get_capture(2)
 
         )
         self.collector.get_capture(2)
 
@@ -526,7 +753,9 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, layer="l3")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l3", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -543,7 +772,13 @@ class Datapath(MethodHolder):
             ipfix_decoder,
             capture,
             cflow,
             ipfix_decoder,
             capture,
             cflow,
-            {2: "packets", 4: 17, 8: "src_ip", 12: "dst_ip"},
+            {
+                2: "packets",
+                4: 17,
+                8: "src_ip",
+                12: "dst_ip",
+                61: (self.direction == "tx"),
+            },
         )
 
         self.collector.get_capture(3)
         )
 
         self.collector.get_capture(3)
@@ -551,13 +786,95 @@ class Datapath(MethodHolder):
         ipfix.remove_vpp_config()
         self.logger.info("FFP_TEST_FINISH_0002")
 
         ipfix.remove_vpp_config()
         self.logger.info("FFP_TEST_FINISH_0002")
 
+    def test_L234onL2(self):
+        """L2/3/4 data on L2 datapath"""
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pkts = []
+
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
+        )
+        ipfix.add_vpp_config()
+
+        ipfix_decoder = IPFIXDecoder()
+        # template packet should arrive immediately
+        tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
+        tmpl_ip_field_count = (
+            TMPL_COMMON_FIELD_COUNT
+            + TMPL_L2_FIELD_COUNT
+            + TMPL_L3_FIELD_COUNT
+            + TMPL_L4_FIELD_COUNT
+        )
+        templates = ipfix.verify_templates(
+            ipfix_decoder,
+            count=3,
+            field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
+        )
+
+        # verify IPv4 and IPv6 flows
+        for ip_ver in ("v4", "v6"):
+            self.create_stream(packets=1, ip_ver=ip_ver)
+            capture = self.send_packets()
+
+            # make sure the one packet we expect actually showed up
+            self.vapi.ipfix_flush()
+            cflow = self.wait_for_cflow_packet(
+                self.collector, templates[1 if ip_ver == "v4" else 2]
+            )
+            src_ip_id = 8 if ip_ver == "v4" else 27
+            dst_ip_id = 12 if ip_ver == "v4" else 28
+            self.verify_cflow_data_detail(
+                ipfix_decoder,
+                capture,
+                cflow,
+                {
+                    2: "packets",
+                    256: 8 if ip_ver == "v4" else 56710,
+                    4: 17,
+                    7: "sport",
+                    11: "dport",
+                    src_ip_id: "src_ip",
+                    dst_ip_id: "dst_ip",
+                    61: (self.direction == "tx"),
+                },
+                ip_ver=ip_ver,
+                field_count=tmpl_ip_field_count,
+            )
+
+        # verify non-IP flow
+        self.pkts = [
+            (
+                Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
+                / SlowProtocol()
+                / LACP()
+            )
+        ]
+        capture = self.send_packets()
+
+        # make sure the one packet we expect actually showed up
+        self.vapi.ipfix_flush()
+        cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+        self.verify_cflow_data_detail(
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 256: 2440, 61: (self.direction == "tx")},
+            field_count=tmpl_l2_field_count,
+        )
+
+        self.collector.get_capture(6)
+
+        ipfix.remove_vpp_config()
+
     def test_L4onL2(self):
         """L4 data on L2 datapath"""
         self.logger.info("FFP_TEST_START_0003")
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
     def test_L4onL2(self):
         """L4 data on L2 datapath"""
         self.logger.info("FFP_TEST_START_0003")
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, layer="l4")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, layer="l4", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -571,7 +888,10 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
         )
 
         self.collector.get_capture(3)
         )
 
         self.collector.get_capture(3)
@@ -585,7 +905,9 @@ class Datapath(MethodHolder):
 
         self.pg_enable_capture(self.pg_interfaces)
 
 
         self.pg_enable_capture(self.pg_interfaces)
 
-        ipfix = VppCFLOW(test=self, datapath="ip4")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, datapath="ip4", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         # template packet should arrive immediately
         ipfix.add_vpp_config()
 
         # template packet should arrive immediately
@@ -603,7 +925,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg4", layer="l2", datapath="ip4")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf2,
+            layer="l2",
+            datapath="ip4",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -617,7 +945,10 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 256: 8, 61: (self.direction == "tx")},
         )
 
         # expected two templates and one cflow packet
         )
 
         # expected two templates and one cflow packet
@@ -632,7 +963,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg4", layer="l3", datapath="ip4")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf2,
+            layer="l3",
+            datapath="ip4",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -649,7 +986,13 @@ class Datapath(MethodHolder):
             ipfix_decoder,
             capture,
             cflow,
             ipfix_decoder,
             capture,
             cflow,
-            {1: "octets", 2: "packets", 8: "src_ip", 12: "dst_ip"},
+            {
+                1: "octets",
+                2: "packets",
+                8: "src_ip",
+                12: "dst_ip",
+                61: (self.direction == "tx"),
+            },
         )
 
         # expected two templates and one cflow packet
         )
 
         # expected two templates and one cflow packet
@@ -664,7 +1007,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg4", layer="l4", datapath="ip4")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf2,
+            layer="l4",
+            datapath="ip4",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -678,7 +1027,10 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
         )
 
         # expected two templates and one cflow packet
         )
 
         # expected two templates and one cflow packet
@@ -692,7 +1044,9 @@ class Datapath(MethodHolder):
         self.logger.info("FFP_TEST_START_0000")
         self.pg_enable_capture(self.pg_interfaces)
 
         self.logger.info("FFP_TEST_START_0000")
         self.pg_enable_capture(self.pg_interfaces)
 
-        ipfix = VppCFLOW(test=self, datapath="ip6")
+        ipfix = VppCFLOW(
+            test=self, intf=self.intf1, datapath="ip6", direction=self.direction
+        )
         ipfix.add_vpp_config()
 
         # template packet should arrive immediately
         ipfix.add_vpp_config()
 
         # template packet should arrive immediately
@@ -709,7 +1063,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg6", layer="l2", datapath="ip6")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf3,
+            layer="l2",
+            datapath="ip6",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -723,7 +1083,11 @@ class Datapath(MethodHolder):
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
         self.vapi.ipfix_flush()
         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
         self.verify_cflow_data_detail(
-            ipfix_decoder, capture, cflow, {2: "packets", 256: 56710}, ip_ver="v6"
+            ipfix_decoder,
+            capture,
+            cflow,
+            {2: "packets", 256: 56710, 61: (self.direction == "tx")},
+            ip_ver="v6",
         )
 
         # expected two templates and one cflow packet
         )
 
         # expected two templates and one cflow packet
@@ -738,7 +1102,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg6", layer="l3", datapath="ip6")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf3,
+            layer="l3",
+            datapath="ip6",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -755,7 +1125,7 @@ class Datapath(MethodHolder):
             ipfix_decoder,
             capture,
             cflow,
             ipfix_decoder,
             capture,
             cflow,
-            {2: "packets", 27: "src_ip", 28: "dst_ip"},
+            {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
             ip_ver="v6",
         )
 
             ip_ver="v6",
         )
 
@@ -771,7 +1141,13 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, intf="pg6", layer="l4", datapath="ip6")
+        ipfix = VppCFLOW(
+            test=self,
+            intf=self.intf3,
+            layer="l4",
+            datapath="ip6",
+            direction=self.direction,
+        )
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -788,7 +1164,7 @@ class Datapath(MethodHolder):
             ipfix_decoder,
             capture,
             cflow,
             ipfix_decoder,
             capture,
             cflow,
-            {2: "packets", 7: "sport", 11: "dport"},
+            {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
             ip_ver="v6",
         )
 
             ip_ver="v6",
         )
 
@@ -804,7 +1180,7 @@ class Datapath(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self)
+        ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -824,12 +1200,12 @@ class Datapath(MethodHolder):
         self.logger.info("FFP_TEST_FINISH_0001")
 
     def test_0002(self):
         self.logger.info("FFP_TEST_FINISH_0001")
 
     def test_0002(self):
-        """no timers, two CFLOW packets (mtu=256), 3 Flows in each"""
+        """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
         self.logger.info("FFP_TEST_START_0002")
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
         self.logger.info("FFP_TEST_START_0002")
         self.pg_enable_capture(self.pg_interfaces)
         self.pkts = []
 
-        ipfix = VppCFLOW(test=self, mtu=256)
+        ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
         ipfix.add_vpp_config()
 
         ipfix_decoder = IPFIXDecoder()
@@ -852,6 +1228,97 @@ class Datapath(MethodHolder):
         self.logger.info("FFP_TEST_FINISH_0002")
 
 
         self.logger.info("FFP_TEST_FINISH_0002")
 
 
+@tag_fixme_vpp_workers
+class DatapathTx(MethodHolder, DatapathTestsHolder):
+    """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
+
+    intf1 = "pg2"
+    intf2 = "pg4"
+    intf3 = "pg6"
+    direction = "tx"
+
+    def test_rewritten_traffic(self):
+        """Rewritten traffic (from subif to ipfix if)"""
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pkts = []
+
+        # prepare a sub-interface
+        subif = VppDot1ADSubint(self, self.pg7, 0, 300, 400)
+        subif.admin_up()
+        subif.config_ip4()
+
+        # enable ip4 datapath for an interface
+        ipfix = VppCFLOW(
+            test=self,
+            intf="pg8",
+            datapath="ip4",
+            layer="l2 l3 l4",
+            direction=self.direction,
+        )
+        ipfix.add_vpp_config()
+
+        # template packet should arrive immediately
+        ipfix_decoder = IPFIXDecoder()
+        templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+        # forward some traffic through the ipfix interface
+        route = VppIpRoute(
+            self,
+            "9.0.0.0",
+            24,
+            [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
+        )
+        route.add_vpp_config()
+
+        # prepare an IPv4 packet (subif => ipfix interface)
+        pkt = (
+            Ether(src=subif.remote_mac, dst=self.pg7.local_mac)
+            / IP(src=subif.remote_ip4, dst="9.0.0.1")
+            / UDP(sport=1234, dport=4321)
+            / Raw(b"\xa5" * 123)
+        )
+        self.pkts = [
+            subif.add_dot1ad_layer(pkt, 300, 400),
+        ]
+
+        # send the packet
+        capture = self.send_packets(self.pg7, self.pg8)
+
+        # wait for a flow and verify it
+        self.vapi.ipfix_flush()
+        cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+        self.verify_cflow_data(ipfix_decoder, capture, cflow)
+        self.verify_cflow_data_detail(
+            ipfix_decoder,
+            capture,
+            cflow,
+            {
+                IPFIX_SRC_IP4_ADDR_ID: "src_ip",
+                IPFIX_DST_IP4_ADDR_ID: "dst_ip",
+                IPFIX_SRC_TRANS_PORT_ID: "sport",
+                IPFIX_DST_TRANS_PORT_ID: "dport",
+                IPFIX_FLOW_DIRECTION_ID: (self.direction == "tx"),
+            },
+        )
+
+        self.collector.get_capture(2)
+
+        # cleanup
+        route.remove_vpp_config()
+        subif.remove_vpp_config()
+        ipfix.remove_vpp_config()
+
+
+@tag_fixme_vpp_workers
+class DatapathRx(MethodHolder, DatapathTestsHolder):
+    """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
+
+    intf1 = "pg1"
+    intf2 = "pg3"
+    intf3 = "pg5"
+    direction = "rx"
+
+
 @unittest.skipUnless(config.extended, "part of extended tests")
 class DisableIPFIX(MethodHolder):
     """Disable IPFIX"""
 @unittest.skipUnless(config.extended, "part of extended tests")
 class DisableIPFIX(MethodHolder):
     """Disable IPFIX"""
@@ -1010,9 +1477,70 @@ class DisableFP(MethodHolder):
         self.sleep(1, "wait before verifying no packets sent")
         self.collector.assert_nothing_captured()
 
         self.sleep(1, "wait before verifying no packets sent")
         self.collector.assert_nothing_captured()
 
+        # enable FPP feature so the remove_vpp_config() doesn't fail
+        # due to missing feature on interface.
+        ipfix.enable_flowprobe_feature()
+
         ipfix.remove_vpp_config()
         self.logger.info("FFP_TEST_FINISH_0001")
 
         ipfix.remove_vpp_config()
         self.logger.info("FFP_TEST_FINISH_0001")
 
+    def test_no_leftover_flows_after_disabling(self):
+        """disable flowprobe feature and expect no leftover flows"""
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pkts = []
+
+        # enable ip4 datapath for an interface
+        # set active and passive timers
+        ipfix = VppCFLOW(
+            test=self,
+            active=3,
+            passive=4,
+            intf="pg3",
+            layer="l3",
+            datapath="ip4",
+            direction="rx",
+            mtu=100,
+        )
+        ipfix.add_vpp_config()
+
+        # template packet should arrive immediately
+        ipfix.verify_templates(count=1)
+
+        # send some ip4 packets
+        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
+        self.send_packets(src_if=self.pg3, dst_if=self.pg4)
+
+        # disable feature for the interface
+        # currently stored ip4 flows should be removed
+        ipfix.disable_flowprobe_feature()
+
+        # no leftover ip4 flows are expected
+        self.pg_enable_capture([self.collector])
+        self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
+        self.collector.assert_nothing_captured()
+
+        # re-enable feature for the interface
+        ipfix.enable_flowprobe_feature()
+
+        # template packet should arrive immediately
+        ipfix_decoder = IPFIXDecoder()
+        self.vapi.ipfix_flush()
+        templates = ipfix.verify_templates(ipfix_decoder, count=1)
+
+        # send some ip4 packets
+        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
+        capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
+
+        # verify meta info - packet/octet delta
+        self.vapi.ipfix_flush()
+        cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=8)
+        self.verify_cflow_data(ipfix_decoder, capture, cflow)
+
+        self.collector.get_capture(2)
+
+        # cleanup
+        ipfix.remove_vpp_config()
+
 
 @unittest.skipUnless(config.extended, "part of extended tests")
 class ReenableFP(MethodHolder):
 
 @unittest.skipUnless(config.extended, "part of extended tests")
 class ReenableFP(MethodHolder):