from scapy.layers.inet6 import IPv6
from config import config
-from framework import tag_fixme_vpp_workers
+from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
+from framework import is_distro_ubuntu2204, is_distro_debian11
from framework import VppTestCase, VppTestRunner
from framework import tag_run_solo
from vpp_object import VppObject
mtu=1024,
datapath="l2",
layer="l2 l3 l4",
+ direction="tx",
):
self._test = test
self._intf = intf
+ self._intf_obj = getattr(self._test, intf)
self._active = active
if passive == 0 or passive < active:
self._passive = active + 1
self._passive = passive
self._datapath = datapath # l2 ip4 ip6
self._collect = layer # l2 l3 l4
+ self._direction = direction # rx tx both
self._timeout = timeout
self._mtu = mtu
self._configured = False
l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
if "l4" in self._collect.lower():
l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
- self._test.vapi.flowprobe_params(
+ self._test.vapi.flowprobe_set_params(
record_flags=(l2_flag | l3_flag | l4_flag),
active_timer=self._active,
passive_timer=self._passive,
template_interval=self._timeout,
)
- def enable_flowprobe_feature(self):
- self._test.vapi.ppcli(
- "flowprobe feature add-del %s %s" % (self._intf, self._datapath)
+ def _enable_disable_flowprobe_feature(self, is_add):
+ which_map = {
+ "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
+ "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
+ "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
+ }
+ direction_map = {
+ "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
+ "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
+ "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
+ }
+ self._test.vapi.flowprobe_interface_add_del(
+ is_add=is_add,
+ which=which_map[self._datapath],
+ direction=direction_map[self._direction],
+ sw_if_index=self._intf_obj.sw_if_index,
)
+ def enable_flowprobe_feature(self):
+ self._enable_disable_flowprobe_feature(is_add=True)
+
def disable_exporter(self):
self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
def disable_flowprobe_feature(self):
- self._test.vapi.cli(
- "flowprobe feature add-del %s %s disable" % (self._intf, self._datapath)
- )
+ self._enable_disable_flowprobe_feature(is_add=False)
def object_id(self):
return "ipfix-collector-%s-%s" % (self._src, self.dst)
variables and configure VPP.
"""
super(MethodHolder, cls).setUpClass()
+ if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
+ cls, "vpp"
+ ):
+ return
try:
# Create pg interfaces
cls.create_pg_interfaces(range(9))
# Create BD with MAC learning and unknown unicast flooding disabled
# and put interfaces to this BD
- cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
+ cls.vapi.bridge_domain_add_del_v2(
+ bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
+ )
cls.vapi.sw_interface_set_l2_bridge(
rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
)
continue
for field in data_set:
- if field not in record.keys():
- continue
value = data_set[field]
if value == "octets":
value = ip_layer.len
@tag_run_solo
@tag_fixme_vpp_workers
+@tag_fixme_ubuntu2204
+@tag_fixme_debian11
class Flowprobe(MethodHolder):
"""Template verification, timer tests"""
self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
# egress interface
self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
+ # direction
+ self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
# packets
self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
# src mac
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0000")
+ def test_interface_dump(self):
+ """Dump interfaces with IPFIX flow record generation enabled"""
+ self.logger.info("FFP_TEST_START_0003")
-@tag_fixme_vpp_workers
-class Datapath(MethodHolder):
+ # Enable feature for 3 interfaces
+ ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
+ ipfix1.add_vpp_config()
+
+ ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
+ ipfix2.enable_flowprobe_feature()
+
+ ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
+ ipfix3.enable_flowprobe_feature()
+
+ # When request "all", dump should contain all enabled interfaces
+ dump = self.vapi.flowprobe_interface_dump()
+ self.assertEqual(len(dump), 3)
+
+ # Verify 1st interface
+ self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
+ self.assertEqual(
+ dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
+ )
+ self.assertEqual(
+ dump[0].direction,
+ VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
+ )
+
+ # Verify 2nd interface
+ self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
+ self.assertEqual(
+ dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
+ )
+ self.assertEqual(
+ dump[1].direction,
+ VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
+ )
+
+ # Verify 3rd interface
+ self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
+ self.assertEqual(
+ dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
+ )
+ self.assertEqual(
+ dump[2].direction,
+ VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
+ )
+
+ # When request 2nd interface, dump should contain only the specified interface
+ dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
+ self.assertEqual(len(dump), 1)
+
+ # Verify 2nd interface
+ self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
+ self.assertEqual(
+ dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
+ )
+ self.assertEqual(
+ dump[0].direction,
+ VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
+ )
+
+ # When request 99th interface, dump should be empty
+ dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
+ self.assertEqual(len(dump), 0)
+
+ ipfix1.remove_vpp_config()
+ ipfix2.remove_vpp_config()
+ ipfix3.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0003")
+
+ def test_get_params(self):
+ """Get IPFIX flow record generation parameters"""
+ self.logger.info("FFP_TEST_START_0004")
+
+ # Enable feature for an interface with custom parameters
+ ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
+ ipfix.add_vpp_config()
+
+ # Get and verify parameters
+ params = self.vapi.flowprobe_get_params()
+ self.assertEqual(params.active_timer, 20)
+ self.assertEqual(params.passive_timer, 40)
+ record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
+ record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
+ record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
+ self.assertEqual(params.record_flags, record_flags)
+
+ ipfix.remove_vpp_config()
+ self.logger.info("FFP_TEST_FINISH_0004")
+
+
+class DatapathTestsHolder(object):
"""collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
@classmethod
def setUpClass(cls):
- super(Datapath, cls).setUpClass()
+ super(DatapathTestsHolder, cls).setUpClass()
@classmethod
def tearDownClass(cls):
- super(Datapath, cls).tearDownClass()
+ super(DatapathTestsHolder, cls).tearDownClass()
def test_templatesL2(self):
"""verify template on L2 datapath"""
self.logger.info("FFP_TEST_START_0000")
self.pg_enable_capture(self.pg_interfaces)
- ipfix = VppCFLOW(test=self, layer="l2")
+ ipfix = VppCFLOW(
+ test=self, intf=self.intf1, layer="l2", direction=self.direction
+ )
ipfix.add_vpp_config()
# template packet should arrive immediately
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, layer="l2")
+ ipfix = VppCFLOW(
+ test=self, intf=self.intf1, layer="l2", direction=self.direction
+ )
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(
- ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
+ ipfix_decoder,
+ capture,
+ cflow,
+ {2: "packets", 256: 8, 61: (self.direction == "tx")},
)
self.collector.get_capture(2)
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, layer="l3")
+ ipfix = VppCFLOW(
+ test=self, intf=self.intf1, layer="l3", direction=self.direction
+ )
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
ipfix_decoder,
capture,
cflow,
- {2: "packets", 4: 17, 8: "src_ip", 12: "dst_ip"},
+ {
+ 2: "packets",
+ 4: 17,
+ 8: "src_ip",
+ 12: "dst_ip",
+ 61: (self.direction == "tx"),
+ },
)
self.collector.get_capture(3)
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, layer="l4")
+ ipfix = VppCFLOW(
+ test=self, intf=self.intf1, layer="l4", direction=self.direction
+ )
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(
- ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
+ ipfix_decoder,
+ capture,
+ cflow,
+ {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
)
self.collector.get_capture(3)
self.pg_enable_capture(self.pg_interfaces)
- ipfix = VppCFLOW(test=self, datapath="ip4")
+ ipfix = VppCFLOW(
+ test=self, intf=self.intf1, datapath="ip4", direction=self.direction
+ )
ipfix.add_vpp_config()
# template packet should arrive immediately
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf="pg4", layer="l2", datapath="ip4")
+ ipfix = VppCFLOW(
+ test=self,
+ intf=self.intf2,
+ layer="l2",
+ datapath="ip4",
+ direction=self.direction,
+ )
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(
- ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
+ ipfix_decoder,
+ capture,
+ cflow,
+ {2: "packets", 256: 8, 61: (self.direction == "tx")},
)
# expected two templates and one cflow packet
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf="pg4", layer="l3", datapath="ip4")
+ ipfix = VppCFLOW(
+ test=self,
+ intf=self.intf2,
+ layer="l3",
+ datapath="ip4",
+ direction=self.direction,
+ )
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
ipfix_decoder,
capture,
cflow,
- {1: "octets", 2: "packets", 8: "src_ip", 12: "dst_ip"},
+ {
+ 1: "octets",
+ 2: "packets",
+ 8: "src_ip",
+ 12: "dst_ip",
+ 61: (self.direction == "tx"),
+ },
)
# expected two templates and one cflow packet
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf="pg4", layer="l4", datapath="ip4")
+ ipfix = VppCFLOW(
+ test=self,
+ intf=self.intf2,
+ layer="l4",
+ datapath="ip4",
+ direction=self.direction,
+ )
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(
- ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
+ ipfix_decoder,
+ capture,
+ cflow,
+ {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
)
# expected two templates and one cflow packet
self.logger.info("FFP_TEST_START_0000")
self.pg_enable_capture(self.pg_interfaces)
- ipfix = VppCFLOW(test=self, datapath="ip6")
+ ipfix = VppCFLOW(
+ test=self, intf=self.intf1, datapath="ip6", direction=self.direction
+ )
ipfix.add_vpp_config()
# template packet should arrive immediately
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf="pg6", layer="l2", datapath="ip6")
+ ipfix = VppCFLOW(
+ test=self,
+ intf=self.intf3,
+ layer="l2",
+ datapath="ip6",
+ direction=self.direction,
+ )
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
self.verify_cflow_data_detail(
- ipfix_decoder, capture, cflow, {2: "packets", 256: 56710}, ip_ver="v6"
+ ipfix_decoder,
+ capture,
+ cflow,
+ {2: "packets", 256: 56710, 61: (self.direction == "tx")},
+ ip_ver="v6",
)
# expected two templates and one cflow packet
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf="pg6", layer="l3", datapath="ip6")
+ ipfix = VppCFLOW(
+ test=self,
+ intf=self.intf3,
+ layer="l3",
+ datapath="ip6",
+ direction=self.direction,
+ )
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
ipfix_decoder,
capture,
cflow,
- {2: "packets", 27: "src_ip", 28: "dst_ip"},
+ {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
ip_ver="v6",
)
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf="pg6", layer="l4", datapath="ip6")
+ ipfix = VppCFLOW(
+ test=self,
+ intf=self.intf3,
+ layer="l4",
+ datapath="ip6",
+ direction=self.direction,
+ )
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
ipfix_decoder,
capture,
cflow,
- {2: "packets", 7: "sport", 11: "dport"},
+ {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
ip_ver="v6",
)
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self)
+ ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
self.logger.info("FFP_TEST_FINISH_0001")
def test_0002(self):
- """no timers, two CFLOW packets (mtu=256), 3 Flows in each"""
+ """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, mtu=256)
+ ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
self.logger.info("FFP_TEST_FINISH_0002")
+@tag_fixme_vpp_workers
+class DatapathTx(MethodHolder, DatapathTestsHolder):
+ """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
+
+ intf1 = "pg2"
+ intf2 = "pg4"
+ intf3 = "pg6"
+ direction = "tx"
+
+
+@tag_fixme_vpp_workers
+class DatapathRx(MethodHolder, DatapathTestsHolder):
+ """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
+
+ intf1 = "pg1"
+ intf2 = "pg3"
+ intf3 = "pg5"
+ direction = "rx"
+
+
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableIPFIX(MethodHolder):
"""Disable IPFIX"""