X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_flowprobe.py;h=1d565dcf0a41deae7b9315beb522e51bf43d2ce3;hb=4141ded3ec876313a5c7f74a93dec3e18940180a;hp=cd6fd1c1947a8a7367dacf002538ea8e27302850;hpb=661f91fe0a6bd87040408d45d116b63c0811f4f9;p=vpp.git diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py index cd6fd1c1947..1d565dcf0a4 100644 --- a/test/test_flowprobe.py +++ b/test/test_flowprobe.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 from __future__ import print_function import binascii import random @@ -12,37 +12,67 @@ from scapy.layers.l2 import Ether from scapy.layers.inet import IP, TCP, UDP from scapy.layers.inet6 import IPv6 -from framework import VppTestCase, VppTestRunner, running_extended_tests +from config import config +from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11 +from framework import is_distro_ubuntu2204, is_distro_debian11 +from framework import VppTestCase, VppTestRunner +from framework import tag_run_solo from vpp_object import VppObject from vpp_pg_interface import CaptureTimeoutError from util import ppp from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder from vpp_ip_route import VppIpRoute, VppRoutePath +from vpp_papi.macaddress import mac_ntop +from socket import inet_ntop +from vpp_papi import VppEnum class VppCFLOW(VppObject): """CFLOW object for IPFIX exporter and Flowprobe feature""" - def __init__(self, test, intf='pg2', active=0, passive=0, timeout=100, - mtu=1024, datapath='l2', layer='l2 l3 l4'): + def __init__( + self, + test, + intf="pg2", + active=0, + passive=0, + timeout=100, + mtu=1024, + datapath="l2", + layer="l2 l3 l4", + direction="tx", + ): self._test = test self._intf = intf + self._intf_obj = getattr(self._test, intf) self._active = active if passive == 0 or passive < active: - self._passive = active+1 + self._passive = active + 1 else: self._passive = passive - self._datapath = datapath # l2 ip4 ip6 - self._collect = layer # l2 l3 l4 + self._datapath = datapath # l2 ip4 ip6 + self._collect = layer # l2 l3 l4 + self._direction = direction # rx tx both self._timeout = timeout self._mtu = mtu self._configured = False def add_vpp_config(self): self.enable_exporter() - self._test.vapi.ppcli("flowprobe params record %s active %s " - "passive %s" % (self._collect, self._active, - self._passive)) + l2_flag = 0 + l3_flag = 0 + l4_flag = 0 + if "l2" in self._collect.lower(): + l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2 + if "l3" in self._collect.lower(): + l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3 + if "l4" in self._collect.lower(): + l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4 + self._test.vapi.flowprobe_set_params( + record_flags=(l2_flag | l3_flag | l4_flag), + active_timer=self._active, + passive_timer=self._passive, + ) self.enable_flowprobe_feature() self._test.vapi.cli("ipfix flush") self._configured = True @@ -55,43 +85,50 @@ class VppCFLOW(VppObject): def enable_exporter(self): self._test.vapi.set_ipfix_exporter( - collector_address=self._test.pg0.remote_ip4n, - src_address=self._test.pg0.local_ip4n, + collector_address=self._test.pg0.remote_ip4, + src_address=self._test.pg0.local_ip4, path_mtu=self._mtu, - template_interval=self._timeout) + template_interval=self._timeout, + ) + + def _enable_disable_flowprobe_feature(self, is_add): + which_map = { + "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2, + "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4, + "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6, + } + direction_map = { + "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX, + "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX, + "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH, + } + self._test.vapi.flowprobe_interface_add_del( + is_add=is_add, + which=which_map[self._datapath], + direction=direction_map[self._direction], + sw_if_index=self._intf_obj.sw_if_index, + ) def enable_flowprobe_feature(self): - self._test.vapi.ppcli("flowprobe feature add-del %s %s" % - (self._intf, self._datapath)) + self._enable_disable_flowprobe_feature(is_add=True) def disable_exporter(self): self._test.vapi.cli("set ipfix exporter collector 0.0.0.0") def disable_flowprobe_feature(self): - self._test.vapi.cli("flowprobe feature add-del %s %s disable" % - (self._intf, self._datapath)) + self._enable_disable_flowprobe_feature(is_add=False) def object_id(self): - return "ipfix-collector-%s" % (self._src, self.dst) + return "ipfix-collector-%s-%s" % (self._src, self.dst) def query_vpp_config(self): return self._configured def verify_templates(self, decoder=None, timeout=1, count=3): templates = [] - p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout) - self._test.assertTrue(p.haslayer(IPFIX)) - if decoder is not None and p.haslayer(Template): - templates.append(p[Template].templateID) - decoder.add_template(p.getlayer(Template)) - if count > 1: - p = self._test.wait_for_cflow_packet(self._test.collector, 2) - self._test.assertTrue(p.haslayer(IPFIX)) - if decoder is not None and p.haslayer(Template): - templates.append(p[Template].templateID) - decoder.add_template(p.getlayer(Template)) - if count > 2: - p = self._test.wait_for_cflow_packet(self._test.collector, 2) + self._test.assertIn(count, (1, 2, 3)) + for _ in range(count): + p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout) self._test.assertTrue(p.haslayer(IPFIX)) if decoder is not None and p.haslayer(Template): templates.append(p[Template].templateID) @@ -100,7 +137,7 @@ class VppCFLOW(VppObject): class MethodHolder(VppTestCase): - """ Flow-per-packet plugin: test L2, IP4, IP6 reporting """ + """Flow-per-packet plugin: test L2, IP4, IP6 reporting""" # Test variables debug_print = False @@ -115,6 +152,10 @@ class MethodHolder(VppTestCase): variables and configure VPP. """ super(MethodHolder, cls).setUpClass() + if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr( + cls, "vpp" + ): + return try: # Create pg interfaces cls.create_pg_interfaces(range(9)) @@ -124,9 +165,15 @@ class MethodHolder(VppTestCase): # Create BD with MAC learning and unknown unicast flooding disabled # and put interfaces to this BD - cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1) - cls.vapi.sw_interface_set_l2_bridge(cls.pg1._sw_if_index, bd_id=1) - cls.vapi.sw_interface_set_l2_bridge(cls.pg2._sw_if_index, bd_id=1) + cls.vapi.bridge_domain_add_del_v2( + bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1 + ) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1 + ) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1 + ) # Set up all interfaces for i in cls.pg_interfaces: @@ -158,8 +205,13 @@ class MethodHolder(VppTestCase): super(MethodHolder, cls).tearDownClass() raise - def create_stream(self, src_if=None, dst_if=None, packets=None, - size=None, ip_ver='v4'): + @classmethod + def tearDownClass(cls): + super(MethodHolder, cls).tearDownClass() + + def create_stream( + self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4" + ): """Create a packet stream to tickle the plugin :param VppInterface src_if: Source interface for packet stream @@ -179,7 +231,7 @@ class MethodHolder(VppTestCase): info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) p = Ether(src=src_if.remote_mac, dst=src_if.local_mac) - if ip_ver == 'v4': + if ip_ver == "v4": p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) else: p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) @@ -211,60 +263,54 @@ class MethodHolder(VppTestCase): self.pg_start() return dst_if.get_capture(len(self.pkts)) - def verify_cflow_data_detail(self, decoder, capture, cflow, - data_set={1: 'octets', 2: 'packets'}, - ip_ver='v4'): + def verify_cflow_data_detail( + self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4" + ): if self.debug_print: print(capture[0].show()) if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) if self.debug_print: print(data) - if ip_ver == 'v4': + if ip_ver == "v4": ip_layer = capture[0][IP] else: ip_layer = capture[0][IPv6] if data_set is not None: for record in data: - # skip flow if in/out gress interface is 0 + # skip flow if ingress/egress interface is 0 if int(binascii.hexlify(record[10]), 16) == 0: continue if int(binascii.hexlify(record[14]), 16) == 0: continue for field in data_set: - if field not in record.keys(): - continue value = data_set[field] - if value == 'octets': + if value == "octets": value = ip_layer.len - if ip_ver == 'v6': - value += 40 # ??? is this correct - elif value == 'packets': + if ip_ver == "v6": + value += 40 # ??? is this correct + elif value == "packets": value = 1 - elif value == 'src_ip': - if ip_ver == 'v4': - ip = socket.inet_pton(socket.AF_INET, - ip_layer.src) + elif value == "src_ip": + if ip_ver == "v4": + ip = socket.inet_pton(socket.AF_INET, ip_layer.src) else: - ip = socket.inet_pton(socket.AF_INET6, - ip_layer.src) + ip = socket.inet_pton(socket.AF_INET6, ip_layer.src) value = int(binascii.hexlify(ip), 16) - elif value == 'dst_ip': - if ip_ver == 'v4': - ip = socket.inet_pton(socket.AF_INET, - ip_layer.dst) + elif value == "dst_ip": + if ip_ver == "v4": + ip = socket.inet_pton(socket.AF_INET, ip_layer.dst) else: - ip = socket.inet_pton(socket.AF_INET6, - ip_layer.dst) + ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst) value = int(binascii.hexlify(ip), 16) - elif value == 'sport': + elif value == "sport": value = int(capture[0][UDP].sport) - elif value == 'dport': + elif value == "dport": value = int(capture[0][UDP].dport) - self.assertEqual(int(binascii.hexlify( - record[field]), 16), - value) + self.assertEqual( + int(binascii.hexlify(record[field]), 16), value + ) def verify_cflow_data_notimer(self, decoder, capture, cflows): idx = 0 @@ -277,56 +323,42 @@ class MethodHolder(VppTestCase): for rec in data: p = capture[idx] idx += 1 - self.assertEqual(p[IP].len, int( - binascii.hexlify(rec[1]), 16)) - self.assertEqual(1, int( - binascii.hexlify(rec[2]), 16)) + self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16)) + self.assertEqual(1, int(binascii.hexlify(rec[2]), 16)) self.assertEqual(len(capture), idx) - def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1, - expected=True): - """ wait for CFLOW packet and verify its correctness + def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1): + """wait for CFLOW packet and verify its correctness :param timeout: how long to wait - :returns: tuple (packet, time spent waiting for packet) """ self.logger.info("IPFIX: Waiting for CFLOW packet") - deadline = time.time() + timeout - counter = 0 # self.logger.debug(self.vapi.ppcli("show flow table")) - while True: - counter += 1 - # sanity check - self.assert_in_range(counter, 0, 100, "number of packets ignored") - time_left = deadline - time.time() - try: - if time_left < 0 and expected: - # self.logger.debug(self.vapi.ppcli("show flow table")) - raise CaptureTimeoutError( - "Packet did not arrive within timeout") - p = collector_intf.wait_for_packet(timeout=time_left) - except CaptureTimeoutError: - if expected: - # self.logger.debug(self.vapi.ppcli("show flow table")) - raise CaptureTimeoutError( - "Packet did not arrive within timeout") - else: - return - if not expected: - raise CaptureTimeoutError("Packet arrived even not expected") - self.assertEqual(p[Set].setID, set_id) - # self.logger.debug(self.vapi.ppcli("show flow table")) - self.logger.debug(ppp("IPFIX: Got packet:", p)) - break + p = collector_intf.wait_for_packet(timeout=timeout) + self.assertEqual(p[Set].setID, set_id) + # self.logger.debug(self.vapi.ppcli("show flow table")) + self.logger.debug(ppp("IPFIX: Got packet:", p)) return p +@tag_run_solo +@tag_fixme_vpp_workers +@tag_fixme_ubuntu2204 +@tag_fixme_debian11 class Flowprobe(MethodHolder): """Template verification, timer tests""" + @classmethod + def setUpClass(cls): + super(Flowprobe, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(Flowprobe, cls).tearDownClass() + def test_0001(self): - """ timer less than template timeout""" + """timer less than template timeout""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -350,7 +382,7 @@ class Flowprobe(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_0002(self): - """ timer greater than template timeout""" + """timer greater than template timeout""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -383,26 +415,33 @@ class Flowprobe(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg8', datapath="ip4", - layer='l2 l3 l4', active=2) + ipfix = VppCFLOW( + test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2 + ) ipfix.add_vpp_config() - route_9001 = VppIpRoute(self, "9.0.0.0", 24, - [VppRoutePath(self.pg8._remote_hosts[0].ip4, - self.pg8.sw_if_index)]) + route_9001 = VppIpRoute( + self, + "9.0.0.0", + 24, + [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)], + ) route_9001.add_vpp_config() ipfix_decoder = IPFIXDecoder() templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.pkts = [(Ether(dst=self.pg7.local_mac, - src=self.pg7.remote_mac) / - IP(src=self.pg7.remote_ip4, dst="9.0.0.100") / - TCP(sport=1234, dport=4321, flags=80) / - Raw('\xa5' * 100))] + self.pkts = [ + ( + Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac) + / IP(src=self.pg7.remote_ip4, dst="9.0.0.100") + / TCP(sport=1234, dport=4321, flags=80) + / Raw(b"\xa5" * 100) + ) + ] nowUTC = int(time.time()) - nowUNIX = nowUTC+2208988800 + nowUNIX = nowUTC + 2208988800 self.send_packets(src_if=self.pg7, dst_if=self.pg8) cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10) @@ -419,14 +458,14 @@ class Flowprobe(MethodHolder): self.assertEqual(int(binascii.hexlify(record[10]), 16), 8) # egress interface self.assertEqual(int(binascii.hexlify(record[14]), 16), 9) + # direction + self.assertEqual(int(binascii.hexlify(record[61]), 16), 1) # packets self.assertEqual(int(binascii.hexlify(record[2]), 16), 1) # src mac - self.assertEqual(':'.join(re.findall('..', record[56].encode( - 'hex'))), self.pg8.local_mac) + self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac) # dst mac - self.assertEqual(':'.join(re.findall('..', record[80].encode( - 'hex'))), self.pg8.remote_mac) + self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac) flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32 # flow start timestamp self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1) @@ -436,15 +475,9 @@ class Flowprobe(MethodHolder): # ethernet type self.assertEqual(int(binascii.hexlify(record[256]), 16), 8) # src ip - self.assertEqual('.'.join(re.findall('..', record[8].encode( - 'hex'))), - '.'.join('{:02x}'.format(int(n)) for n in - self.pg7.remote_ip4.split('.'))) + self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4) # dst ip - self.assertEqual('.'.join(re.findall('..', record[12].encode( - 'hex'))), - '.'.join('{:02x}'.format(int(n)) for n in - "9.0.0.100".split('.'))) + self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100") # protocol (TCP) self.assertEqual(int(binascii.hexlify(record[4]), 16), 6) # src port @@ -457,20 +490,121 @@ class Flowprobe(MethodHolder): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") + def test_interface_dump(self): + """Dump interfaces with IPFIX flow record generation enabled""" + self.logger.info("FFP_TEST_START_0003") + + # Enable feature for 3 interfaces + ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx") + ipfix1.add_vpp_config() + + ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx") + ipfix2.enable_flowprobe_feature() + + ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both") + ipfix3.enable_flowprobe_feature() + + # When request "all", dump should contain all enabled interfaces + dump = self.vapi.flowprobe_interface_dump() + self.assertEqual(len(dump), 3) + + # Verify 1st interface + self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index) + self.assertEqual( + dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2 + ) + self.assertEqual( + dump[0].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX, + ) + + # Verify 2nd interface + self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index) + self.assertEqual( + dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4 + ) + self.assertEqual( + dump[1].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX, + ) + + # Verify 3rd interface + self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index) + self.assertEqual( + dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6 + ) + self.assertEqual( + dump[2].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH, + ) + + # When request 2nd interface, dump should contain only the specified interface + dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index) + self.assertEqual(len(dump), 1) + + # Verify 2nd interface + self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index) + self.assertEqual( + dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4 + ) + self.assertEqual( + dump[0].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX, + ) + + # When request 99th interface, dump should be empty + dump = self.vapi.flowprobe_interface_dump(sw_if_index=99) + self.assertEqual(len(dump), 0) + + ipfix1.remove_vpp_config() + ipfix2.remove_vpp_config() + ipfix3.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0003") + + def test_get_params(self): + """Get IPFIX flow record generation parameters""" + self.logger.info("FFP_TEST_START_0004") + + # Enable feature for an interface with custom parameters + ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4") + ipfix.add_vpp_config() + + # Get and verify parameters + params = self.vapi.flowprobe_get_params() + self.assertEqual(params.active_timer, 20) + self.assertEqual(params.passive_timer, 40) + record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2 + record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3 + record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4 + self.assertEqual(params.record_flags, record_flags) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0004") -class Datapath(MethodHolder): + +class DatapathTestsHolder(object): """collect information on Ethernet, IP4 and IP6 datapath (no timers)""" + @classmethod + def setUpClass(cls): + super(DatapathTestsHolder, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(DatapathTestsHolder, cls).tearDownClass() + def test_templatesL2(self): - """ verify template on L2 datapath""" + """verify template on L2 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, layer='l2') + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l2", direction=self.direction + ) ipfix.add_vpp_config() # template packet should arrive immediately - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() ipfix.verify_templates(timeout=3, count=1) self.collector.get_capture(1) @@ -478,12 +612,14 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") def test_L2onL2(self): - """ L2 data on L2 datapath""" + """L2 data on L2 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, layer='l2') + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l2", direction=self.direction + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -494,22 +630,28 @@ class Datapath(MethodHolder): capture = self.send_packets() # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 8}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 256: 8, 61: (self.direction == "tx")}, + ) self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") def test_L3onL2(self): - """ L3 data on L2 datapath""" + """L3 data on L2 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, layer='l3') + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l3", direction=self.direction + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -520,11 +662,20 @@ class Datapath(MethodHolder): capture = self.send_packets() # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 4: 17, - 8: 'src_ip', 12: 'dst_ip'}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + 2: "packets", + 4: 17, + 8: "src_ip", + 12: "dst_ip", + 61: (self.direction == "tx"), + }, + ) self.collector.get_capture(3) @@ -532,12 +683,14 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") def test_L4onL2(self): - """ L4 data on L2 datapath""" + """L4 data on L2 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, layer='l4') + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l4", direction=self.direction + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -548,10 +701,14 @@ class Datapath(MethodHolder): capture = self.send_packets() # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")}, + ) self.collector.get_capture(3) @@ -559,16 +716,18 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0003") def test_templatesIp4(self): - """ verify templates on IP4 datapath""" + """verify templates on IP4 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, datapath='ip4') + ipfix = VppCFLOW( + test=self, intf=self.intf1, datapath="ip4", direction=self.direction + ) ipfix.add_vpp_config() # template packet should arrive immediately - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() ipfix.verify_templates(timeout=3, count=1) self.collector.get_capture(1) @@ -577,12 +736,18 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") def test_L2onIP4(self): - """ L2 data on IP4 datapath""" + """L2 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', layer='l2', datapath='ip4') + ipfix = VppCFLOW( + test=self, + intf=self.intf2, + layer="l2", + datapath="ip4", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -593,10 +758,14 @@ class Datapath(MethodHolder): capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 8}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 256: 8, 61: (self.direction == "tx")}, + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -605,12 +774,18 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_L3onIP4(self): - """ L3 data on IP4 datapath""" + """L3 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', layer='l3', datapath='ip4') + ipfix = VppCFLOW( + test=self, + intf=self.intf2, + layer="l3", + datapath="ip4", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -621,11 +796,20 @@ class Datapath(MethodHolder): capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {1: 'octets', 2: 'packets', - 8: 'src_ip', 12: 'dst_ip'}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + 1: "octets", + 2: "packets", + 8: "src_ip", + 12: "dst_ip", + 61: (self.direction == "tx"), + }, + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -634,12 +818,18 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") def test_L4onIP4(self): - """ L4 data on IP4 datapath""" + """L4 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', layer='l4', datapath='ip4') + ipfix = VppCFLOW( + test=self, + intf=self.intf2, + layer="l4", + datapath="ip4", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -650,10 +840,14 @@ class Datapath(MethodHolder): capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")}, + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -662,11 +856,13 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0003") def test_templatesIP6(self): - """ verify templates on IP6 datapath""" + """verify templates on IP6 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, datapath='ip6') + ipfix = VppCFLOW( + test=self, intf=self.intf1, datapath="ip6", direction=self.direction + ) ipfix.add_vpp_config() # template packet should arrive immediately @@ -678,28 +874,37 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") def test_L2onIP6(self): - """ L2 data on IP6 datapath""" + """L2 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', layer='l2', datapath='ip6') + ipfix = VppCFLOW( + test=self, + intf=self.intf3, + layer="l2", + datapath="ip6", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 56710}, - ip_ver='v6') + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 256: 56710, 61: (self.direction == "tx")}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -708,29 +913,37 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_L3onIP6(self): - """ L3 data on IP6 datapath""" + """L3 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', layer='l3', datapath='ip6') + ipfix = VppCFLOW( + test=self, + intf=self.intf3, + layer="l3", + datapath="ip6", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', - 27: 'src_ip', 28: 'dst_ip'}, - ip_ver='v6') + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -739,28 +952,37 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") def test_L4onIP6(self): - """ L4 data on IP6 datapath""" + """L4 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', layer='l4', datapath='ip6') + ipfix = VppCFLOW( + test=self, + intf=self.intf3, + layer="l4", + datapath="ip6", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}, - ip_ver='v6') + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -769,12 +991,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0003") def test_0001(self): - """ no timers, one CFLOW packet, 9 Flows inside""" + """no timers, one CFLOW packet, 9 Flows inside""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self) + ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -785,7 +1007,7 @@ class Datapath(MethodHolder): capture = self.send_packets() # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[1]) self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow]) self.collector.get_capture(4) @@ -794,17 +1016,17 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_0002(self): - """ no timers, two CFLOW packets (mtu=256), 3 Flows in each""" + """no timers, two CFLOW packets (mtu=260), 3 Flows in each""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, mtu=256) + ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() templates = ipfix.verify_templates(ipfix_decoder) self.create_stream(packets=6) @@ -812,11 +1034,9 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up cflows = [] - self.vapi.cli("ipfix flush") - cflows.append(self.wait_for_cflow_packet(self.collector, - templates[1])) - cflows.append(self.wait_for_cflow_packet(self.collector, - templates[1])) + self.vapi.ipfix_flush() + cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) + cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows) self.collector.get_capture(5) @@ -824,12 +1044,40 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") -@unittest.skipUnless(running_extended_tests(), "part of extended tests") +@tag_fixme_vpp_workers +class DatapathTx(MethodHolder, DatapathTestsHolder): + """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)""" + + intf1 = "pg2" + intf2 = "pg4" + intf3 = "pg6" + direction = "tx" + + +@tag_fixme_vpp_workers +class DatapathRx(MethodHolder, DatapathTestsHolder): + """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)""" + + intf1 = "pg1" + intf2 = "pg3" + intf3 = "pg5" + direction = "rx" + + +@unittest.skipUnless(config.extended, "part of extended tests") class DisableIPFIX(MethodHolder): """Disable IPFIX""" + @classmethod + def setUpClass(cls): + super(DisableIPFIX, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(DisableIPFIX, cls).tearDownClass() + def test_0001(self): - """ disable IPFIX after first packets""" + """disable IPFIX after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -845,33 +1093,39 @@ class DisableIPFIX(MethodHolder): self.send_packets() # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1]) self.collector.get_capture(4) - # disble IPFIX + # disable IPFIX ipfix.disable_exporter() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in 1 minute - self.vapi.cli("ipfix flush") - self.wait_for_cflow_packet(self.collector, templates[1], - expected=False) - self.collector.get_capture(0) + self.vapi.ipfix_flush() + self.sleep(1, "wait before verifying no packets sent") + self.collector.assert_nothing_captured() ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") -@unittest.skipUnless(running_extended_tests(), "part of extended tests") +@unittest.skipUnless(config.extended, "part of extended tests") class ReenableIPFIX(MethodHolder): """Re-enable IPFIX""" + @classmethod + def setUpClass(cls): + super(ReenableIPFIX, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(ReenableIPFIX, cls).tearDownClass() + def test_0011(self): - """ disable IPFIX after first packets and re-enable after few packets - """ + """disable IPFIX after first packets and re-enable after few packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -887,22 +1141,21 @@ class ReenableIPFIX(MethodHolder): self.send_packets() # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1]) self.collector.get_capture(4) - # disble IPFIX + # disable IPFIX ipfix.disable_exporter() - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in active timer span - self.vapi.cli("ipfix flush") - self.wait_for_cflow_packet(self.collector, templates[1], - expected=False) - self.collector.get_capture(0) + self.vapi.ipfix_flush() + self.sleep(1, "wait before verifying no packets sent") + self.collector.assert_nothing_captured() self.pg2.get_capture(5) # enable IPFIX @@ -926,12 +1179,20 @@ class ReenableIPFIX(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") -@unittest.skipUnless(running_extended_tests(), "part of extended tests") +@unittest.skipUnless(config.extended, "part of extended tests") class DisableFP(MethodHolder): """Disable Flowprobe feature""" + @classmethod + def setUpClass(cls): + super(DisableFP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(DisableFP, cls).tearDownClass() + def test_0001(self): - """ disable flowprobe feature after first packets""" + """disable flowprobe feature after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -946,33 +1207,40 @@ class DisableFP(MethodHolder): self.send_packets() # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1]) self.collector.get_capture(4) - # disble IPFIX + # disable IPFIX ipfix.disable_flowprobe_feature() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in active timer span - self.vapi.cli("ipfix flush") - self.wait_for_cflow_packet(self.collector, templates[1], - expected=False) - self.collector.get_capture(0) + self.vapi.ipfix_flush() + self.sleep(1, "wait before verifying no packets sent") + self.collector.assert_nothing_captured() ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") -@unittest.skipUnless(running_extended_tests(), "part of extended tests") +@unittest.skipUnless(config.extended, "part of extended tests") class ReenableFP(MethodHolder): """Re-enable Flowprobe feature""" + @classmethod + def setUpClass(cls): + super(ReenableFP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(ReenableFP, cls).tearDownClass() + def test_0001(self): - """ disable flowprobe feature after first packets and re-enable - after few packets """ + """disable flowprobe feature after first packets and re-enable + after few packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -982,39 +1250,38 @@ class ReenableFP(MethodHolder): ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() templates = ipfix.verify_templates(ipfix_decoder, timeout=3) self.create_stream() self.send_packets() # make sure the one packet we expect actually showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1], 5) self.collector.get_capture(4) - # disble FPP feature + # disable FPP feature ipfix.disable_flowprobe_feature() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in active timer span - self.vapi.cli("ipfix flush") - self.wait_for_cflow_packet(self.collector, templates[1], 5, - expected=False) - self.collector.get_capture(0) + self.vapi.ipfix_flush() + self.sleep(5, "wait before verifying no packets sent") + self.collector.assert_nothing_captured() # enable FPP feature ipfix.enable_flowprobe_feature() - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() templates = ipfix.verify_templates(ipfix_decoder, timeout=3) self.send_packets() # make sure the next packets (templates and data) we expect actually # showed up - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() self.wait_for_cflow_packet(self.collector, templates[1], 5) self.collector.get_capture(4) @@ -1022,5 +1289,5 @@ class ReenableFP(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)