X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=test%2Ftest_flowprobe.py;h=ac0433abc006dbc591db4d6a1437a81b5615c6c4;hb=853cc9f2ad3ee52cbdd891fb09d51c25678baed0;hp=560b44cc991f9cb656527eb0d095b9107f3dfd1e;hpb=5c749734b14c2d3be8689b0c5b72ae8d1ddec099;p=vpp.git diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py index 560b44cc991..ac0433abc00 100644 --- a/test/test_flowprobe.py +++ b/test/test_flowprobe.py @@ -1,4 +1,6 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 +from __future__ import print_function +import binascii import random import socket import unittest @@ -6,39 +8,99 @@ import time from scapy.packet import Raw from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP +from scapy.layers.inet import IP, TCP, UDP from scapy.layers.inet6 import IPv6 - -from framework import VppTestCase, VppTestRunner +from scapy.contrib.lacp import SlowProtocol, LACP + +from config import config +from framework import VppTestCase +from asfframework import ( + tag_fixme_vpp_workers, + tag_fixme_ubuntu2204, + tag_fixme_debian11, + tag_run_solo, + is_distro_ubuntu2204, + is_distro_debian11, + VppTestRunner, +) from vpp_object import VppObject -from vpp_pg_interface import CaptureTimeoutError from util import ppp from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder +from vpp_ip_route import VppIpRoute, VppRoutePath +from vpp_papi.macaddress import mac_ntop +from socket import inet_ntop +from vpp_papi import VppEnum +from vpp_sub_interface import VppDot1ADSubint + + +TMPL_COMMON_FIELD_COUNT = 6 +TMPL_L2_FIELD_COUNT = 3 +TMPL_L3_FIELD_COUNT = 4 +TMPL_L4_FIELD_COUNT = 3 + +IPFIX_TCP_FLAGS_ID = 6 +IPFIX_SRC_TRANS_PORT_ID = 7 +IPFIX_DST_TRANS_PORT_ID = 11 +IPFIX_SRC_IP4_ADDR_ID = 8 +IPFIX_DST_IP4_ADDR_ID = 12 +IPFIX_FLOW_DIRECTION_ID = 61 + +TCP_F_FIN = 0x01 +TCP_F_SYN = 0x02 +TCP_F_RST = 0x04 +TCP_F_PSH = 0x08 +TCP_F_ACK = 0x10 +TCP_F_URG = 0x20 +TCP_F_ECE = 0x40 +TCP_F_CWR = 0x80 class VppCFLOW(VppObject): """CFLOW object for IPFIX exporter and Flowprobe feature""" - def __init__(self, test, intf='pg2', active=0, passive=0, timeout=300, - mtu=512, datapath='l2', layer='l2 l3 l4'): + def __init__( + self, + test, + intf="pg2", + active=0, + passive=0, + timeout=100, + mtu=1024, + datapath="l2", + layer="l2 l3 l4", + direction="tx", + ): self._test = test self._intf = intf + self._intf_obj = getattr(self._test, intf) self._active = active if passive == 0 or passive < active: - self._passive = active+5 + self._passive = active + 1 else: self._passive = passive - self._datapath = datapath # l2 ip4 ip6 - self._collect = layer # l2 l3 l4 + self._datapath = datapath # l2 ip4 ip6 + self._collect = layer # l2 l3 l4 + self._direction = direction # rx tx both self._timeout = timeout self._mtu = mtu self._configured = False def add_vpp_config(self): self.enable_exporter() - self._test.vapi.ppcli("flowprobe params record %s active %s " - "passive %s" % (self._collect, self._active, - self._passive)) + l2_flag = 0 + l3_flag = 0 + l4_flag = 0 + if "l2" in self._collect.lower(): + l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2 + if "l3" in self._collect.lower(): + l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3 + if "l4" in self._collect.lower(): + l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4 + self._test.vapi.flowprobe_set_params( + record_flags=(l2_flag | l3_flag | l4_flag), + active_timer=self._active, + passive_timer=self._passive, + ) self.enable_flowprobe_feature() self._test.vapi.cli("ipfix flush") self._configured = True @@ -51,56 +113,66 @@ class VppCFLOW(VppObject): def enable_exporter(self): self._test.vapi.set_ipfix_exporter( - collector_address=self._test.pg0.remote_ip4n, - src_address=self._test.pg0.local_ip4n, + collector_address=self._test.pg0.remote_ip4, + src_address=self._test.pg0.local_ip4, path_mtu=self._mtu, - template_interval=self._timeout) + template_interval=self._timeout, + ) + + def _enable_disable_flowprobe_feature(self, is_add): + which_map = { + "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2, + "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4, + "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6, + } + direction_map = { + "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX, + "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX, + "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH, + } + self._test.vapi.flowprobe_interface_add_del( + is_add=is_add, + which=which_map[self._datapath], + direction=direction_map[self._direction], + sw_if_index=self._intf_obj.sw_if_index, + ) def enable_flowprobe_feature(self): - self._test.vapi.ppcli("flowprobe feature add-del %s %s" % - (self._intf, self._datapath)) + self._enable_disable_flowprobe_feature(is_add=True) def disable_exporter(self): self._test.vapi.cli("set ipfix exporter collector 0.0.0.0") def disable_flowprobe_feature(self): - self._test.vapi.cli("flowprobe feature add-del %s %s disable" % - (self._intf, self._datapath)) + self._enable_disable_flowprobe_feature(is_add=False) def object_id(self): - return "ipfix-collector-%s" % (self._src, self.dst) + return "ipfix-collector-%s-%s" % (self._src, self.dst) def query_vpp_config(self): return self._configured - def verify_templates(self, decoder=None, timeout=1, count=3): + def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None): templates = [] - p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout) - self._test.assertTrue(p.haslayer(IPFIX)) - if decoder is not None and p.haslayer(Template): - templates.append(p[Template].templateID) - decoder.add_template(p.getlayer(Template)) - if count > 1: - p = self._test.wait_for_cflow_packet(self._test.collector, 2) - self._test.assertTrue(p.haslayer(IPFIX)) - if decoder is not None and p.haslayer(Template): - templates.append(p[Template].templateID) - decoder.add_template(p.getlayer(Template)) - if count > 2: - p = self._test.wait_for_cflow_packet(self._test.collector, 2) + self._test.assertIn(count, (1, 2, 3)) + for _ in range(count): + p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout) self._test.assertTrue(p.haslayer(IPFIX)) - if decoder is not None and p.haslayer(Template): + self._test.assertTrue(p.haslayer(Template)) + if decoder is not None: templates.append(p[Template].templateID) decoder.add_template(p.getlayer(Template)) + if field_count_in is not None: + self._test.assertIn(p[Template].fieldCount, field_count_in) return templates class MethodHolder(VppTestCase): - """ Flow-per-packet plugin: test L2, IP4, IP6 reporting """ + """Flow-per-packet plugin: test L2, IP4, IP6 reporting""" # Test variables debug_print = False - max_number_of_packets = 16 + max_number_of_packets = 10 pkts = [] @classmethod @@ -111,18 +183,28 @@ class MethodHolder(VppTestCase): variables and configure VPP. """ super(MethodHolder, cls).setUpClass() + if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr( + cls, "vpp" + ): + return try: # Create pg interfaces - cls.create_pg_interfaces(range(7)) + cls.create_pg_interfaces(range(9)) # Packet sizes cls.pg_if_packet_sizes = [64, 512, 1518, 9018] # Create BD with MAC learning and unknown unicast flooding disabled # and put interfaces to this BD - cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1) - cls.vapi.sw_interface_set_l2_bridge(cls.pg1._sw_if_index, bd_id=1) - cls.vapi.sw_interface_set_l2_bridge(cls.pg2._sw_if_index, bd_id=1) + cls.vapi.bridge_domain_add_del_v2( + bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1 + ) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1 + ) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1 + ) # Set up all interfaces for i in cls.pg_interfaces: @@ -140,6 +222,9 @@ class MethodHolder(VppTestCase): cls.pg3.resolve_arp() cls.pg4.config_ip4() cls.pg4.resolve_arp() + cls.pg7.config_ip4() + cls.pg8.config_ip4() + cls.pg8.configure_ipv4_neighbors() cls.pg5.config_ip6() cls.pg5.resolve_ndp() @@ -151,8 +236,13 @@ class MethodHolder(VppTestCase): super(MethodHolder, cls).tearDownClass() raise - def create_stream(self, src_if=None, dst_if=None, packets=None, - size=None, ip_ver='v4'): + @classmethod + def tearDownClass(cls): + super(MethodHolder, cls).tearDownClass() + + def create_stream( + self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4" + ): """Create a packet stream to tickle the plugin :param VppInterface src_if: Source interface for packet stream @@ -172,11 +262,12 @@ class MethodHolder(VppTestCase): info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) p = Ether(src=src_if.remote_mac, dst=src_if.local_mac) - if ip_ver == 'v4': + if ip_ver == "v4": p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) else: p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) - p /= (UDP(sport=1234, dport=4321) / Raw(payload)) + p /= UDP(sport=1234, dport=4321) + p /= Raw(payload) info.data = p.copy() self.extend_packet(p, pkt_size) self.pkts.append(p) @@ -190,62 +281,76 @@ class MethodHolder(VppTestCase): if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) for record in data: - self.assertEqual(int(record[1].encode('hex'), 16), octets) - self.assertEqual(int(record[2].encode('hex'), 16), packets) + self.assertEqual(int(binascii.hexlify(record[1]), 16), octets) + self.assertEqual(int(binascii.hexlify(record[2]), 16), packets) - def verify_cflow_data_detail(self, decoder, capture, cflow, - data_set={1: 'octets', 2: 'packets'}, - ip_ver='v4'): + def send_packets(self, src_if=None, dst_if=None): + if src_if is None: + src_if = self.pg1 + if dst_if is None: + dst_if = self.pg2 + self.pg_enable_capture([dst_if]) + src_if.add_stream(self.pkts) + self.pg_start() + return dst_if.get_capture(len(self.pkts)) + + def verify_cflow_data_detail( + self, + decoder, + capture, + cflow, + data_set={1: "octets", 2: "packets"}, + ip_ver="v4", + field_count=None, + ): if self.debug_print: - print capture[0].show() + print(capture[0].show()) if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) if self.debug_print: - print data - if ip_ver == 'v4': - ip_layer = capture[0][IP] + print(data) + if ip_ver == "v4": + ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None else: - ip_layer = capture[0][IPv6] + ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None if data_set is not None: for record in data: - # skip flow if in/out gress interface is 0 - if int(record[10].encode('hex'), 16) == 0: + # skip flow if ingress/egress interface is 0 + if int(binascii.hexlify(record[10]), 16) == 0: continue - if int(record[14].encode('hex'), 16) == 0: + if int(binascii.hexlify(record[14]), 16) == 0: continue for field in data_set: - if field not in record.keys(): - continue value = data_set[field] - if value == 'octets': + if value == "octets": value = ip_layer.len - if ip_ver == 'v6': - value += 40 # ??? is this correct - elif value == 'packets': + if ip_ver == "v6": + value += 40 # ??? is this correct + elif value == "packets": value = 1 - elif value == 'src_ip': - if ip_ver == 'v4': - ip = socket.inet_pton(socket.AF_INET, - ip_layer.src) + elif value == "src_ip": + if ip_ver == "v4": + ip = socket.inet_pton(socket.AF_INET, ip_layer.src) else: - ip = socket.inet_pton(socket.AF_INET6, - ip_layer.src) - value = int(ip.encode('hex'), 16) - elif value == 'dst_ip': - if ip_ver == 'v4': - ip = socket.inet_pton(socket.AF_INET, - ip_layer.dst) + ip = socket.inet_pton(socket.AF_INET6, ip_layer.src) + value = int(binascii.hexlify(ip), 16) + elif value == "dst_ip": + if ip_ver == "v4": + ip = socket.inet_pton(socket.AF_INET, ip_layer.dst) else: - ip = socket.inet_pton(socket.AF_INET6, - ip_layer.dst) - value = int(ip.encode('hex'), 16) - elif value == 'sport': + ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst) + value = int(binascii.hexlify(ip), 16) + elif value == "sport": value = int(capture[0][UDP].sport) - elif value == 'dport': + elif value == "dport": value = int(capture[0][UDP].dport) - self.assertEqual(int(record[field].encode('hex'), 16), - value) + self.assertEqual( + int(binascii.hexlify(record[field]), 16), value + ) + if field_count is not None: + for record in data: + self.assertEqual(len(record), field_count) def verify_cflow_data_notimer(self, decoder, capture, cflows): idx = 0 @@ -258,272 +363,555 @@ class MethodHolder(VppTestCase): for rec in data: p = capture[idx] idx += 1 - self.assertEqual(p[IP].len, int(rec[1].encode('hex'), 16)) - self.assertEqual(1, int(rec[2].encode('hex'), 16)) + self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16)) + self.assertEqual(1, int(binascii.hexlify(rec[2]), 16)) self.assertEqual(len(capture), idx) - def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1, - expected=True): - """ wait for CFLOW packet and verify its correctness + def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1): + """wait for CFLOW packet and verify its correctness :param timeout: how long to wait - :returns: tuple (packet, time spent waiting for packet) """ self.logger.info("IPFIX: Waiting for CFLOW packet") - deadline = time.time() + timeout - counter = 0 # self.logger.debug(self.vapi.ppcli("show flow table")) - while True: - counter += 1 - # sanity check - self.assert_in_range(counter, 0, 100, "number of packets ignored") - time_left = deadline - time.time() - try: - if time_left < 0 and expected: - # self.logger.debug(self.vapi.ppcli("show flow table")) - raise CaptureTimeoutError( - "Packet did not arrive within timeout") - p = collector_intf.wait_for_packet(timeout=time_left) - except CaptureTimeoutError: - if expected: - # self.logger.debug(self.vapi.ppcli("show flow table")) - raise CaptureTimeoutError( - "Packet did not arrive within timeout") - else: - return - if not expected: - raise CaptureTimeoutError("Packet arrived even not expected") - self.assertEqual(p[Set].setID, set_id) - # self.logger.debug(self.vapi.ppcli("show flow table")) - self.logger.debug(ppp("IPFIX: Got packet:", p)) - break + p = collector_intf.wait_for_packet(timeout=timeout) + self.assertEqual(p[Set].setID, set_id) + # self.logger.debug(self.vapi.ppcli("show flow table")) + self.logger.debug(ppp("IPFIX: Got packet:", p)) return p - def send_packets(self, src_if=None, dst_if=None): - self.sleep(3) - if src_if is None: - src_if = self.pg1 - if dst_if is None: - dst_if = self.pg2 - self.pg_enable_capture([dst_if]) - src_if.add_stream(self.pkts) - self.pg_start() - return dst_if.get_capture(len(self.pkts)) - -class TestFFP_Timers(MethodHolder): +@tag_run_solo +@tag_fixme_vpp_workers +@tag_fixme_ubuntu2204 +@tag_fixme_debian11 +class Flowprobe(MethodHolder): """Template verification, timer tests""" + @classmethod + def setUpClass(cls): + super(Flowprobe, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(Flowprobe, cls).tearDownClass() + def test_0001(self): - """ receive template data packets""" + """timer less than template timeout""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] - ipfix = VppCFLOW(test=self, active=10) + ipfix = VppCFLOW(test=self, active=2) ipfix.add_vpp_config() + ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - ipfix.verify_templates(timeout=3) + templates = ipfix.verify_templates(ipfix_decoder) + + self.create_stream(packets=1) + self.send_packets() + capture = self.pg2.get_capture(1) + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15) + self.verify_cflow_data(ipfix_decoder, capture, cflow) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") + @unittest.skipUnless( + config.extended, "Test is unstable (assertion error, needs to be fixed" + ) def test_0002(self): - """ timer=10s, less than template timeout""" + """timer greater than template timeout [UNSTABLE, FIX ME]""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=20) + ipfix = VppCFLOW(test=self, timeout=3, active=4) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + ipfix.verify_templates() - self.create_stream() - capture = self.send_packets() + self.create_stream(packets=2) + self.send_packets() + capture = self.pg2.get_capture(2) + + # next set of template packet should arrive after 20 seconds + # template packet should arrive within 20 s + templates = ipfix.verify_templates(ipfix_decoder, timeout=5) # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[1], 39) + cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15) self.verify_cflow_data(ipfix_decoder, capture, cflow) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") - def test_0003(self): - """ timer=30s, greater than template timeout""" - self.logger.info("FFP_TEST_START_0003") + def test_cflow_packet(self): + """verify cflow packet fields""" + self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, timeout=20, active=25) + ipfix = VppCFLOW( + test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2 + ) ipfix.add_vpp_config() - ipfix_decoder = IPFIXDecoder() - # template packet should arrive immediately - ipfix.verify_templates(timeout=3) + route_9001 = VppIpRoute( + self, + "9.0.0.0", + 24, + [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)], + ) + route_9001.add_vpp_config() - self.create_stream() - capture = self.send_packets() - - # next set of template packet should arrive after 20 seconds - # template packet should arrive within 20 s - templates = ipfix.verify_templates(ipfix_decoder, timeout=25) + ipfix_decoder = IPFIXDecoder() + templates = ipfix.verify_templates(ipfix_decoder, count=1) + + self.pkts = [ + ( + Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac) + / IP(src=self.pg7.remote_ip4, dst="9.0.0.100") + / TCP(sport=1234, dport=4321, flags=80) + / Raw(b"\xa5" * 100) + ) + ] + + nowUTC = int(time.time()) + nowUNIX = nowUTC + 2208988800 + self.send_packets(src_if=self.pg7, dst_if=self.pg8) + + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10) + self.collector.get_capture(2) - # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[1], 55) - self.verify_cflow_data(ipfix_decoder, capture, cflow) + if cflow[0].haslayer(IPFIX): + self.assertEqual(cflow[IPFIX].version, 10) + self.assertEqual(cflow[IPFIX].observationDomainID, 1) + self.assertEqual(cflow[IPFIX].sequenceNumber, 0) + self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5) + if cflow.haslayer(Data): + record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0] + # ingress interface + self.assertEqual(int(binascii.hexlify(record[10]), 16), 8) + # egress interface + self.assertEqual(int(binascii.hexlify(record[14]), 16), 9) + # direction + self.assertEqual(int(binascii.hexlify(record[61]), 16), 1) + # packets + self.assertEqual(int(binascii.hexlify(record[2]), 16), 1) + # src mac + self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac) + # dst mac + self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac) + flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32 + # flow start timestamp + self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1) + flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32 + # flow end timestamp + self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1) + # ethernet type + self.assertEqual(int(binascii.hexlify(record[256]), 16), 8) + # src ip + self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4) + # dst ip + self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100") + # protocol (TCP) + self.assertEqual(int(binascii.hexlify(record[4]), 16), 6) + # src port + self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234) + # dst port + self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321) + # tcp flags + self.assertEqual(int(binascii.hexlify(record[6]), 16), 80) ipfix.remove_vpp_config() - self.logger.info("FFP_TEST_FINISH_0003") + self.logger.info("FFP_TEST_FINISH_0000") - def test_0004(self): - """ sent packet after first cflow packet arrived""" - self.logger.info("FFP_TEST_START_0004") + def test_flow_entry_reuse(self): + """Verify flow entry reuse doesn't accumulate meta info""" self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=20) + # enable ip4 datapath for an interface + # set active and passive timers + ipfix = VppCFLOW( + test=self, + active=2, + passive=3, + intf="pg3", + layer="l3 l4", + datapath="ip4", + direction="rx", + mtu=100, + ) ipfix.add_vpp_config() - ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + ipfix_decoder = IPFIXDecoder() + templates = ipfix.verify_templates(ipfix_decoder, count=1) + + # make a tcp packet + self.pkts = [ + ( + Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4) + / TCP(sport=1234, dport=4321) + / Raw(b"\xa5" * 50) + ) + ] + + # send the tcp packet two times, each time with new set of flags + tcp_flags = ( + TCP_F_SYN | TCP_F_ACK, + TCP_F_RST | TCP_F_PSH, + ) + for f in tcp_flags: + self.pkts[0][TCP].flags = f + capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) + + # verify meta info - packet/octet delta and tcp flags + cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6) + self.verify_cflow_data(ipfix_decoder, capture, cflow) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + IPFIX_TCP_FLAGS_ID: f, + IPFIX_SRC_TRANS_PORT_ID: 1234, + IPFIX_DST_TRANS_PORT_ID: 4321, + }, + ) - self.create_stream() - self.send_packets() + self.collector.get_capture(3) - # make sure the one packet we expect actually showed up - self.wait_for_cflow_packet(self.collector, templates[1], 39) + # cleanup + ipfix.remove_vpp_config() - self.pg_enable_capture([self.pg2]) + def test_interface_dump(self): + """Dump interfaces with IPFIX flow record generation enabled""" + self.logger.info("FFP_TEST_START_0003") - capture = self.send_packets() + # Enable feature for 3 interfaces + ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx") + ipfix1.add_vpp_config() + + ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx") + ipfix2.enable_flowprobe_feature() + + ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both") + ipfix3.enable_flowprobe_feature() + + # When request "all", dump should contain all enabled interfaces + dump = self.vapi.flowprobe_interface_dump() + self.assertEqual(len(dump), 3) + + # Verify 1st interface + self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index) + self.assertEqual( + dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2 + ) + self.assertEqual( + dump[0].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX, + ) + + # Verify 2nd interface + self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index) + self.assertEqual( + dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4 + ) + self.assertEqual( + dump[1].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX, + ) + + # Verify 3rd interface + self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index) + self.assertEqual( + dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6 + ) + self.assertEqual( + dump[2].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH, + ) + + # When request 2nd interface, dump should contain only the specified interface + dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index) + self.assertEqual(len(dump), 1) + + # Verify 2nd interface + self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index) + self.assertEqual( + dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4 + ) + self.assertEqual( + dump[0].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX, + ) + + # When request 99th interface, dump should be empty + dump = self.vapi.flowprobe_interface_dump(sw_if_index=99) + self.assertEqual(len(dump), 0) + + ipfix1.remove_vpp_config() + ipfix2.remove_vpp_config() + ipfix3.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0003") - # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[1], 30) - self.verify_cflow_data(ipfix_decoder, capture, cflow) + def test_get_params(self): + """Get IPFIX flow record generation parameters""" + self.logger.info("FFP_TEST_START_0004") + + # Enable feature for an interface with custom parameters + ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4") + ipfix.add_vpp_config() + + # Get and verify parameters + params = self.vapi.flowprobe_get_params() + self.assertEqual(params.active_timer, 20) + self.assertEqual(params.passive_timer, 40) + record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2 + record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3 + record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4 + self.assertEqual(params.record_flags, record_flags) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0004") -class TestFFP_DatapathL2(MethodHolder): - """collect information on Ethernet datapath""" +class DatapathTestsHolder(object): + """collect information on Ethernet, IP4 and IP6 datapath (no timers)""" - def test_0000(self): - """ verify template on L2 datapath""" + @classmethod + def setUpClass(cls): + super(DatapathTestsHolder, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(DatapathTestsHolder, cls).tearDownClass() + + def test_templatesL2(self): + """verify template on L2 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, active=10) + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l2", direction=self.direction + ) ipfix.add_vpp_config() # template packet should arrive immediately - ipfix.verify_templates(timeout=3, count=3) - self.collector.get_capture(3) + self.vapi.ipfix_flush() + ipfix.verify_templates(timeout=3, count=1) + self.collector.get_capture(1) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") - def test_0001(self): - """ L2 data on L2 datapath""" + def test_L2onL2(self): + """L2 data on L2 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=10, layer='l2') + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l2", direction=self.direction + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(packets=1) capture = self.send_packets() # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 8}) - - # expected two templates and one cflow packet + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 256: 8, 61: (self.direction == "tx")}, + ) self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") - def test_0002(self): - """ L3 data on L2 datapath""" + def test_L3onL2(self): + """L3 data on L2 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=10, layer='l3') + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l3", direction=self.direction + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=2) + templates = ipfix.verify_templates(ipfix_decoder, count=2) self.create_stream(packets=1) capture = self.send_packets() # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 4: 17, - 8: 'src_ip', 12: 'dst_ip'}) + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + 2: "packets", + 4: 17, + 8: "src_ip", + 12: "dst_ip", + 61: (self.direction == "tx"), + }, + ) - # expected one template and one cflow packet self.collector.get_capture(3) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") - def test_0003(self): - """ L4 data on L2 datapath""" + def test_L234onL2(self): + """L2/3/4 data on L2 datapath""" + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction + ) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT + tmpl_ip_field_count = ( + TMPL_COMMON_FIELD_COUNT + + TMPL_L2_FIELD_COUNT + + TMPL_L3_FIELD_COUNT + + TMPL_L4_FIELD_COUNT + ) + templates = ipfix.verify_templates( + ipfix_decoder, + count=3, + field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count), + ) + + # verify IPv4 and IPv6 flows + for ip_ver in ("v4", "v6"): + self.create_stream(packets=1, ip_ver=ip_ver) + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet( + self.collector, templates[1 if ip_ver == "v4" else 2] + ) + src_ip_id = 8 if ip_ver == "v4" else 27 + dst_ip_id = 12 if ip_ver == "v4" else 28 + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + 2: "packets", + 256: 8 if ip_ver == "v4" else 56710, + 4: 17, + 7: "sport", + 11: "dport", + src_ip_id: "src_ip", + dst_ip_id: "dst_ip", + 61: (self.direction == "tx"), + }, + ip_ver=ip_ver, + field_count=tmpl_ip_field_count, + ) + + # verify non-IP flow + self.pkts = [ + ( + Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac) + / SlowProtocol() + / LACP() + ) + ] + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 256: 2440, 61: (self.direction == "tx")}, + field_count=tmpl_l2_field_count, + ) + + self.collector.get_capture(6) + + ipfix.remove_vpp_config() + + def test_L4onL2(self): + """L4 data on L2 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=10, layer='l4') + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l4", direction=self.direction + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=2) + templates = ipfix.verify_templates(ipfix_decoder, count=2) self.create_stream(packets=1) capture = self.send_packets() # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}) + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")}, + ) - # expected one template and one cflow packet self.collector.get_capture(3) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0003") - -class TestFFP_DatapathIP4(MethodHolder): - """collect information on IP4 datapath""" - - def test_0000(self): - """ verify templates on IP4 datapath""" + def test_templatesIp4(self): + """verify templates on IP4 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, active=10, datapath='ip4') + ipfix = VppCFLOW( + test=self, intf=self.intf1, datapath="ip4", direction=self.direction + ) ipfix.add_vpp_config() # template packet should arrive immediately + self.vapi.ipfix_flush() ipfix.verify_templates(timeout=3, count=1) self.collector.get_capture(1) @@ -531,27 +919,37 @@ class TestFFP_DatapathIP4(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") - def test_0001(self): - """ L2 data on IP4 datapath""" + def test_L2onIP4(self): + """L2 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', active=10, - layer='l2', datapath='ip4') + ipfix = VppCFLOW( + test=self, + intf=self.intf2, + layer="l2", + datapath="ip4", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1) capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 8}) + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 256: 8, 61: (self.direction == "tx")}, + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -559,28 +957,43 @@ class TestFFP_DatapathIP4(MethodHolder): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") - def test_0002(self): - """ L3 data on IP4 datapath""" + def test_L3onIP4(self): + """L3 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', active=10, - layer='l3', datapath='ip4') + ipfix = VppCFLOW( + test=self, + intf=self.intf2, + layer="l3", + datapath="ip4", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1) capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {1: 'octets', 2: 'packets', - 8: 'src_ip', 12: 'dst_ip'}) + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + 1: "octets", + 2: "packets", + 8: "src_ip", + 12: "dst_ip", + 61: (self.direction == "tx"), + }, + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -588,27 +1001,37 @@ class TestFFP_DatapathIP4(MethodHolder): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") - def test_0003(self): - """ L4 data on IP4 datapath""" + def test_L4onIP4(self): + """L4 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', active=10, - layer='l4', datapath='ip4') + ipfix = VppCFLOW( + test=self, + intf=self.intf2, + layer="l4", + datapath="ip4", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + templates = ipfix.verify_templates(ipfix_decoder, count=1) self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1) capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}) + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")}, + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -616,49 +1039,56 @@ class TestFFP_DatapathIP4(MethodHolder): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0003") - -class TestFFP_DatapathIP6(MethodHolder): - """collect information on IP6 datapath""" - - def test_0000(self): - """ verify templates on IP6 datapath""" + def test_templatesIP6(self): + """verify templates on IP6 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, active=10, datapath='ip6') + ipfix = VppCFLOW( + test=self, intf=self.intf1, datapath="ip6", direction=self.direction + ) ipfix.add_vpp_config() # template packet should arrive immediately - ipfix.verify_templates(timeout=3, count=1) + ipfix.verify_templates(count=1) self.collector.get_capture(1) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") - def test_0001(self): - """ L2 data on IP6 datapath""" + def test_L2onIP6(self): + """L2 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', active=20, - layer='l2', datapath='ip6') + ipfix = VppCFLOW( + test=self, + intf=self.intf3, + layer="l2", + datapath="ip6", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 56710}, - ip_ver='v6') + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 256: 56710, 61: (self.direction == "tx")}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -666,30 +1096,38 @@ class TestFFP_DatapathIP6(MethodHolder): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") - def test_0002(self): - """ L3 data on IP6 datapath""" + def test_L3onIP6(self): + """L3 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', active=10, - layer='l3', datapath='ip6') + ipfix = VppCFLOW( + test=self, + intf=self.intf3, + layer="l3", + datapath="ip6", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', - 27: 'src_ip', 28: 'dst_ip'}, - ip_ver='v6') + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -697,29 +1135,38 @@ class TestFFP_DatapathIP6(MethodHolder): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") - def test_0003(self): - """ L4 data on IP6 datapath""" + def test_L4onIP6(self): + """L4 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', active=10, - layer='l4', datapath='ip6') + ipfix = VppCFLOW( + test=self, + intf=self.intf3, + layer="l4", + datapath="ip6", + direction=self.direction, + ) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}, - ip_ver='v6') + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -727,58 +1174,53 @@ class TestFFP_DatapathIP6(MethodHolder): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0003") - -class TestFFP_NoTimers(MethodHolder): - """No timers""" - def test_0001(self): - """ no timers, one CFLOW packet, 9 Flows inside""" + """no timers, one CFLOW packet, 9 Flows inside""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=0) + ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + templates = ipfix.verify_templates(ipfix_decoder) self.create_stream(packets=9) capture = self.send_packets() # make sure the one packet we expect actually showed up - cflow = self.wait_for_cflow_packet(self.collector, templates[1], 10) + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[1]) self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow]) - self.wait_for_cflow_packet(self.collector, templates[1], 10, - expected=False) self.collector.get_capture(4) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") def test_0002(self): - """ no timers, two CFLOW packets (mtu=256), 3 Flows in each""" + """no timers, two CFLOW packets (mtu=260), 3 Flows in each""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=0, mtu=256) + ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + self.vapi.ipfix_flush() + templates = ipfix.verify_templates(ipfix_decoder) self.create_stream(packets=6) capture = self.send_packets() # make sure the one packet we expect actually showed up cflows = [] - cflows.append(self.wait_for_cflow_packet(self.collector, - templates[1], 10)) - cflows.append(self.wait_for_cflow_packet(self.collector, - templates[1], 10)) + self.vapi.ipfix_flush() + cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) + cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows) self.collector.get_capture(5) @@ -786,182 +1228,380 @@ class TestFFP_NoTimers(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") -class TestFFP_DisableIPFIX(MethodHolder): +class DatapathTx(MethodHolder, DatapathTestsHolder): + """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)""" + + intf1 = "pg2" + intf2 = "pg4" + intf3 = "pg6" + direction = "tx" + + def test_rewritten_traffic(self): + """Rewritten traffic (from subif to ipfix if)""" + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + # prepare a sub-interface + subif = VppDot1ADSubint(self, self.pg7, 0, 300, 400) + subif.admin_up() + subif.config_ip4() + + # enable ip4 datapath for an interface + ipfix = VppCFLOW( + test=self, + intf="pg8", + datapath="ip4", + layer="l2 l3 l4", + direction=self.direction, + ) + ipfix.add_vpp_config() + + # template packet should arrive immediately + ipfix_decoder = IPFIXDecoder() + templates = ipfix.verify_templates(ipfix_decoder, count=1) + + # forward some traffic through the ipfix interface + route = VppIpRoute( + self, + "9.0.0.0", + 24, + [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)], + ) + route.add_vpp_config() + + # prepare an IPv4 packet (subif => ipfix interface) + pkt = ( + Ether(src=subif.remote_mac, dst=self.pg7.local_mac) + / IP(src=subif.remote_ip4, dst="9.0.0.1") + / UDP(sport=1234, dport=4321) + / Raw(b"\xa5" * 123) + ) + self.pkts = [ + subif.add_dot1ad_layer(pkt, 300, 400), + ] + + # send the packet + capture = self.send_packets(self.pg7, self.pg8) + + # wait for a flow and verify it + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data(ipfix_decoder, capture, cflow) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + IPFIX_SRC_IP4_ADDR_ID: "src_ip", + IPFIX_DST_IP4_ADDR_ID: "dst_ip", + IPFIX_SRC_TRANS_PORT_ID: "sport", + IPFIX_DST_TRANS_PORT_ID: "dport", + IPFIX_FLOW_DIRECTION_ID: (self.direction == "tx"), + }, + ) + + self.collector.get_capture(2) + + # cleanup + route.remove_vpp_config() + subif.remove_vpp_config() + ipfix.remove_vpp_config() + + +class DatapathRx(MethodHolder, DatapathTestsHolder): + """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)""" + + intf1 = "pg1" + intf2 = "pg3" + intf3 = "pg5" + direction = "rx" + + +@unittest.skipUnless(config.extended, "part of extended tests") +class DisableIPFIX(MethodHolder): """Disable IPFIX""" + @classmethod + def setUpClass(cls): + super(DisableIPFIX, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(DisableIPFIX, cls).tearDownClass() + def test_0001(self): - """ disable IPFIX after first packets""" + """disable IPFIX after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=10) + ipfix = VppCFLOW(test=self) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=30) + templates = ipfix.verify_templates(ipfix_decoder) self.create_stream() self.send_packets() # make sure the one packet we expect actually showed up - self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.vapi.ipfix_flush() + self.wait_for_cflow_packet(self.collector, templates[1]) self.collector.get_capture(4) - # disble IPFIX + # disable IPFIX ipfix.disable_exporter() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in 1 minute - self.wait_for_cflow_packet(self.collector, templates[1], 30, - expected=False) - self.collector.get_capture(0) + self.vapi.ipfix_flush() + self.sleep(1, "wait before verifying no packets sent") + self.collector.assert_nothing_captured() ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") -class TestFFP_ReenableIPFIX(MethodHolder): +@unittest.skipUnless(config.extended, "part of extended tests") +class ReenableIPFIX(MethodHolder): """Re-enable IPFIX""" - def test_0001(self): - """ disable IPFIX after first packets and re-enable after few packets - """ + @classmethod + def setUpClass(cls): + super(ReenableIPFIX, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(ReenableIPFIX, cls).tearDownClass() + + def test_0011(self): + """disable IPFIX after first packets and re-enable after few packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=10) + ipfix = VppCFLOW(test=self) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + templates = ipfix.verify_templates(ipfix_decoder) - self.create_stream() + self.create_stream(packets=5) self.send_packets() # make sure the one packet we expect actually showed up - self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.vapi.ipfix_flush() + self.wait_for_cflow_packet(self.collector, templates[1]) self.collector.get_capture(4) - # disble IPFIX + # disable IPFIX ipfix.disable_exporter() + self.vapi.ipfix_flush() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in active timer span - self.wait_for_cflow_packet(self.collector, templates[1], 30, - expected=False) - self.collector.get_capture(0) + self.vapi.ipfix_flush() + self.sleep(1, "wait before verifying no packets sent") + self.collector.assert_nothing_captured() + self.pg2.get_capture(5) # enable IPFIX ipfix.enable_exporter() - self.vapi.cli("ipfix flush") - templates = ipfix.verify_templates(ipfix_decoder, timeout=3) - self.send_packets() - - # make sure the next packets (templates and data) we expect actually - # showed up - self.wait_for_cflow_packet(self.collector, templates[1], 30) - self.collector.get_capture(4) + capture = self.collector.get_capture(4) + nr_templates = 0 + nr_data = 0 + for p in capture: + self.assertTrue(p.haslayer(IPFIX)) + if p.haslayer(Template): + nr_templates += 1 + self.assertTrue(nr_templates, 3) + for p in capture: + self.assertTrue(p.haslayer(IPFIX)) + if p.haslayer(Data): + nr_data += 1 + self.assertTrue(nr_templates, 1) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") -class TestFFP_DisableFFP(MethodHolder): +@unittest.skipUnless(config.extended, "part of extended tests") +class DisableFP(MethodHolder): """Disable Flowprobe feature""" + @classmethod + def setUpClass(cls): + super(DisableFP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(DisableFP, cls).tearDownClass() + def test_0001(self): - """ disable flowprobe feature after first packets""" + """disable flowprobe feature after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=10) + ipfix = VppCFLOW(test=self) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately - templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + templates = ipfix.verify_templates(ipfix_decoder) self.create_stream() self.send_packets() # make sure the one packet we expect actually showed up - self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.vapi.ipfix_flush() + self.wait_for_cflow_packet(self.collector, templates[1]) self.collector.get_capture(4) - # disble IPFIX + # disable IPFIX ipfix.disable_flowprobe_feature() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in active timer span - self.wait_for_cflow_packet(self.collector, templates[1], 30, - expected=False) - self.collector.get_capture(0) + self.vapi.ipfix_flush() + self.sleep(1, "wait before verifying no packets sent") + self.collector.assert_nothing_captured() + + # enable FPP feature so the remove_vpp_config() doesn't fail + # due to missing feature on interface. + ipfix.enable_flowprobe_feature() ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") + def test_no_leftover_flows_after_disabling(self): + """disable flowprobe feature and expect no leftover flows""" + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + # enable ip4 datapath for an interface + # set active and passive timers + ipfix = VppCFLOW( + test=self, + active=3, + passive=4, + intf="pg3", + layer="l3", + datapath="ip4", + direction="rx", + mtu=100, + ) + ipfix.add_vpp_config() + + # template packet should arrive immediately + ipfix.verify_templates(count=1) -class TestFFP_ReenableFFP(MethodHolder): + # send some ip4 packets + self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5) + self.send_packets(src_if=self.pg3, dst_if=self.pg4) + + # disable feature for the interface + # currently stored ip4 flows should be removed + ipfix.disable_flowprobe_feature() + + # no leftover ip4 flows are expected + self.pg_enable_capture([self.collector]) + self.sleep(12, "wait for leftover ip4 flows during three passive intervals") + self.collector.assert_nothing_captured() + + # re-enable feature for the interface + ipfix.enable_flowprobe_feature() + + # template packet should arrive immediately + ipfix_decoder = IPFIXDecoder() + self.vapi.ipfix_flush() + templates = ipfix.verify_templates(ipfix_decoder, count=1) + + # send some ip4 packets + self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5) + capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) + + # verify meta info - packet/octet delta + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=8) + self.verify_cflow_data(ipfix_decoder, capture, cflow) + + self.collector.get_capture(2) + + # cleanup + ipfix.remove_vpp_config() + + +@unittest.skipUnless(config.extended, "part of extended tests") +class ReenableFP(MethodHolder): """Re-enable Flowprobe feature""" + @classmethod + def setUpClass(cls): + super(ReenableFP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(ReenableFP, cls).tearDownClass() + def test_0001(self): - """ disable flowprobe feature after first packets and re-enable - after few packets """ + """disable flowprobe feature after first packets and re-enable + after few packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, active=10) + ipfix = VppCFLOW(test=self) ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately + self.vapi.ipfix_flush() templates = ipfix.verify_templates(ipfix_decoder, timeout=3) self.create_stream() self.send_packets() # make sure the one packet we expect actually showed up - self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.vapi.ipfix_flush() + self.wait_for_cflow_packet(self.collector, templates[1], 5) self.collector.get_capture(4) - # disble FPP feature + # disable FPP feature ipfix.disable_flowprobe_feature() self.pg_enable_capture([self.collector]) self.send_packets() # make sure no one packet arrived in active timer span - self.wait_for_cflow_packet(self.collector, templates[1], 30, - expected=False) - self.collector.get_capture(0) + self.vapi.ipfix_flush() + self.sleep(5, "wait before verifying no packets sent") + self.collector.assert_nothing_captured() # enable FPP feature ipfix.enable_flowprobe_feature() - self.vapi.cli("ipfix flush") + self.vapi.ipfix_flush() templates = ipfix.verify_templates(ipfix_decoder, timeout=3) self.send_packets() # make sure the next packets (templates and data) we expect actually # showed up - self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.vapi.ipfix_flush() + self.wait_for_cflow_packet(self.collector, templates[1], 5) self.collector.get_capture(4) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)