X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_flowprobe.py;h=ca2bbb53cbcb319ba9425f694b4b0d2b78bf0f5f;hb=535364e90459566b603661c3dbe360c72f59ad71;hp=1d86d19ac5aaa808cc9b51f341b60f0ed6db33bc;hpb=6f5ddf3461906bbc88f679882744afc74a81cae1;p=vpp.git diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py index 1d86d19ac5a..ca2bbb53cbc 100644 --- a/test/test_flowprobe.py +++ b/test/test_flowprobe.py @@ -5,25 +5,54 @@ import random import socket import unittest import time -import re from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, TCP, UDP from scapy.layers.inet6 import IPv6 +from scapy.contrib.lacp import SlowProtocol, LACP from config import config -from framework import tag_fixme_vpp_workers -from framework import VppTestCase, VppTestRunner -from framework import tag_run_solo +from framework import VppTestCase +from asfframework import ( + tag_fixme_vpp_workers, + tag_fixme_ubuntu2204, + tag_fixme_debian11, + tag_run_solo, + is_distro_ubuntu2204, + is_distro_debian11, + VppTestRunner, +) from vpp_object import VppObject -from vpp_pg_interface import CaptureTimeoutError from util import ppp from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_papi.macaddress import mac_ntop from socket import inet_ntop from vpp_papi import VppEnum +from vpp_sub_interface import VppDot1ADSubint + + +TMPL_COMMON_FIELD_COUNT = 6 +TMPL_L2_FIELD_COUNT = 3 +TMPL_L3_FIELD_COUNT = 4 +TMPL_L4_FIELD_COUNT = 3 + +IPFIX_TCP_FLAGS_ID = 6 +IPFIX_SRC_TRANS_PORT_ID = 7 +IPFIX_DST_TRANS_PORT_ID = 11 +IPFIX_SRC_IP4_ADDR_ID = 8 +IPFIX_DST_IP4_ADDR_ID = 12 +IPFIX_FLOW_DIRECTION_ID = 61 + +TCP_F_FIN = 0x01 +TCP_F_SYN = 0x02 +TCP_F_RST = 0x04 +TCP_F_PSH = 0x08 +TCP_F_ACK = 0x10 +TCP_F_URG = 0x20 +TCP_F_ECE = 0x40 +TCP_F_CWR = 0x80 class VppCFLOW(VppObject): @@ -67,7 +96,7 @@ class VppCFLOW(VppObject): l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3 if "l4" in self._collect.lower(): l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4 - self._test.vapi.flowprobe_params( + self._test.vapi.flowprobe_set_params( record_flags=(l2_flag | l3_flag | l4_flag), active_timer=self._active, passive_timer=self._passive, @@ -123,15 +152,18 @@ class VppCFLOW(VppObject): def query_vpp_config(self): return self._configured - def verify_templates(self, decoder=None, timeout=1, count=3): + def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None): templates = [] self._test.assertIn(count, (1, 2, 3)) for _ in range(count): p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout) self._test.assertTrue(p.haslayer(IPFIX)) - if decoder is not None and p.haslayer(Template): + self._test.assertTrue(p.haslayer(Template)) + if decoder is not None: templates.append(p[Template].templateID) decoder.add_template(p.getlayer(Template)) + if field_count_in is not None: + self._test.assertIn(p[Template].fieldCount, field_count_in) return templates @@ -151,6 +183,10 @@ class MethodHolder(VppTestCase): variables and configure VPP. """ super(MethodHolder, cls).setUpClass() + if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr( + cls, "vpp" + ): + return try: # Create pg interfaces cls.create_pg_interfaces(range(9)) @@ -160,7 +196,9 @@ class MethodHolder(VppTestCase): # Create BD with MAC learning and unknown unicast flooding disabled # and put interfaces to this BD - cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1) + cls.vapi.bridge_domain_add_del_v2( + bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1 + ) cls.vapi.sw_interface_set_l2_bridge( rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1 ) @@ -257,7 +295,13 @@ class MethodHolder(VppTestCase): return dst_if.get_capture(len(self.pkts)) def verify_cflow_data_detail( - self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4" + self, + decoder, + capture, + cflow, + data_set={1: "octets", 2: "packets"}, + ip_ver="v4", + field_count=None, ): if self.debug_print: print(capture[0].show()) @@ -266,9 +310,9 @@ class MethodHolder(VppTestCase): if self.debug_print: print(data) if ip_ver == "v4": - ip_layer = capture[0][IP] + ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None else: - ip_layer = capture[0][IPv6] + ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None if data_set is not None: for record in data: # skip flow if ingress/egress interface is 0 @@ -304,6 +348,9 @@ class MethodHolder(VppTestCase): self.assertEqual( int(binascii.hexlify(record[field]), 16), value ) + if field_count is not None: + for record in data: + self.assertEqual(len(record), field_count) def verify_cflow_data_notimer(self, decoder, capture, cflows): idx = 0 @@ -337,6 +384,8 @@ class MethodHolder(VppTestCase): @tag_run_solo @tag_fixme_vpp_workers +@tag_fixme_ubuntu2204 +@tag_fixme_debian11 class Flowprobe(MethodHolder): """Template verification, timer tests""" @@ -481,6 +530,158 @@ class Flowprobe(MethodHolder): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") + def test_flow_entry_reuse(self): + """Verify flow entry reuse doesn't accumulate meta info""" + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + # enable ip4 datapath for an interface + # set active and passive timers + ipfix = VppCFLOW( + test=self, + active=2, + passive=3, + intf="pg3", + layer="l3 l4", + datapath="ip4", + direction="rx", + mtu=100, + ) + ipfix.add_vpp_config() + + # template packet should arrive immediately + ipfix_decoder = IPFIXDecoder() + templates = ipfix.verify_templates(ipfix_decoder, count=1) + + # make a tcp packet + self.pkts = [ + ( + Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4) + / TCP(sport=1234, dport=4321) + / Raw(b"\xa5" * 50) + ) + ] + + # send the tcp packet two times, each time with new set of flags + tcp_flags = ( + TCP_F_SYN | TCP_F_ACK, + TCP_F_RST | TCP_F_PSH, + ) + for f in tcp_flags: + self.pkts[0][TCP].flags = f + capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) + + # verify meta info - packet/octet delta and tcp flags + cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6) + self.verify_cflow_data(ipfix_decoder, capture, cflow) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + IPFIX_TCP_FLAGS_ID: f, + IPFIX_SRC_TRANS_PORT_ID: 1234, + IPFIX_DST_TRANS_PORT_ID: 4321, + }, + ) + + self.collector.get_capture(3) + + # cleanup + ipfix.remove_vpp_config() + + def test_interface_dump(self): + """Dump interfaces with IPFIX flow record generation enabled""" + self.logger.info("FFP_TEST_START_0003") + + # Enable feature for 3 interfaces + ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx") + ipfix1.add_vpp_config() + + ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx") + ipfix2.enable_flowprobe_feature() + + ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both") + ipfix3.enable_flowprobe_feature() + + # When request "all", dump should contain all enabled interfaces + dump = self.vapi.flowprobe_interface_dump() + self.assertEqual(len(dump), 3) + + # Verify 1st interface + self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index) + self.assertEqual( + dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2 + ) + self.assertEqual( + dump[0].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX, + ) + + # Verify 2nd interface + self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index) + self.assertEqual( + dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4 + ) + self.assertEqual( + dump[1].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX, + ) + + # Verify 3rd interface + self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index) + self.assertEqual( + dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6 + ) + self.assertEqual( + dump[2].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH, + ) + + # When request 2nd interface, dump should contain only the specified interface + dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index) + self.assertEqual(len(dump), 1) + + # Verify 2nd interface + self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index) + self.assertEqual( + dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4 + ) + self.assertEqual( + dump[0].direction, + VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX, + ) + + # When request 99th interface, dump should be empty + dump = self.vapi.flowprobe_interface_dump(sw_if_index=99) + self.assertEqual(len(dump), 0) + + ipfix1.remove_vpp_config() + ipfix2.remove_vpp_config() + ipfix3.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0003") + + def test_get_params(self): + """Get IPFIX flow record generation parameters""" + self.logger.info("FFP_TEST_START_0004") + + # Enable feature for an interface with custom parameters + ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4") + ipfix.add_vpp_config() + + # Get and verify parameters + params = self.vapi.flowprobe_get_params() + self.assertEqual(params.active_timer, 20) + self.assertEqual(params.passive_timer, 40) + record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2 + record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3 + record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4 + self.assertEqual(params.record_flags, record_flags) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0004") + class DatapathTestsHolder(object): """collect information on Ethernet, IP4 and IP6 datapath (no timers)""" @@ -582,6 +783,86 @@ class DatapathTestsHolder(object): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") + def test_L234onL2(self): + """L2/3/4 data on L2 datapath""" + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction + ) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT + tmpl_ip_field_count = ( + TMPL_COMMON_FIELD_COUNT + + TMPL_L2_FIELD_COUNT + + TMPL_L3_FIELD_COUNT + + TMPL_L4_FIELD_COUNT + ) + templates = ipfix.verify_templates( + ipfix_decoder, + count=3, + field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count), + ) + + # verify IPv4 and IPv6 flows + for ip_ver in ("v4", "v6"): + self.create_stream(packets=1, ip_ver=ip_ver) + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet( + self.collector, templates[1 if ip_ver == "v4" else 2] + ) + src_ip_id = 8 if ip_ver == "v4" else 27 + dst_ip_id = 12 if ip_ver == "v4" else 28 + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + 2: "packets", + 256: 8 if ip_ver == "v4" else 56710, + 4: 17, + 7: "sport", + 11: "dport", + src_ip_id: "src_ip", + dst_ip_id: "dst_ip", + 61: (self.direction == "tx"), + }, + ip_ver=ip_ver, + field_count=tmpl_ip_field_count, + ) + + # verify non-IP flow + self.pkts = [ + ( + Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac) + / SlowProtocol() + / LACP() + ) + ] + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 256: 2440, 61: (self.direction == "tx")}, + field_count=tmpl_l2_field_count, + ) + + self.collector.get_capture(6) + + ipfix.remove_vpp_config() + def test_L4onL2(self): """L4 data on L2 datapath""" self.logger.info("FFP_TEST_START_0003") @@ -953,6 +1234,77 @@ class DatapathTx(MethodHolder, DatapathTestsHolder): intf3 = "pg6" direction = "tx" + def test_rewritten_traffic(self): + """Rewritten traffic (from subif to ipfix if)""" + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + # prepare a sub-interface + subif = VppDot1ADSubint(self, self.pg7, 0, 300, 400) + subif.admin_up() + subif.config_ip4() + + # enable ip4 datapath for an interface + ipfix = VppCFLOW( + test=self, + intf="pg8", + datapath="ip4", + layer="l2 l3 l4", + direction=self.direction, + ) + ipfix.add_vpp_config() + + # template packet should arrive immediately + ipfix_decoder = IPFIXDecoder() + templates = ipfix.verify_templates(ipfix_decoder, count=1) + + # forward some traffic through the ipfix interface + route = VppIpRoute( + self, + "9.0.0.0", + 24, + [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)], + ) + route.add_vpp_config() + + # prepare an IPv4 packet (subif => ipfix interface) + pkt = ( + Ether(src=subif.remote_mac, dst=self.pg7.local_mac) + / IP(src=subif.remote_ip4, dst="9.0.0.1") + / UDP(sport=1234, dport=4321) + / Raw(b"\xa5" * 123) + ) + self.pkts = [ + subif.add_dot1ad_layer(pkt, 300, 400), + ] + + # send the packet + capture = self.send_packets(self.pg7, self.pg8) + + # wait for a flow and verify it + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data(ipfix_decoder, capture, cflow) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + IPFIX_SRC_IP4_ADDR_ID: "src_ip", + IPFIX_DST_IP4_ADDR_ID: "dst_ip", + IPFIX_SRC_TRANS_PORT_ID: "sport", + IPFIX_DST_TRANS_PORT_ID: "dport", + IPFIX_FLOW_DIRECTION_ID: (self.direction == "tx"), + }, + ) + + self.collector.get_capture(2) + + # cleanup + route.remove_vpp_config() + subif.remove_vpp_config() + ipfix.remove_vpp_config() + @tag_fixme_vpp_workers class DatapathRx(MethodHolder, DatapathTestsHolder): @@ -1122,9 +1474,70 @@ class DisableFP(MethodHolder): self.sleep(1, "wait before verifying no packets sent") self.collector.assert_nothing_captured() + # enable FPP feature so the remove_vpp_config() doesn't fail + # due to missing feature on interface. + ipfix.enable_flowprobe_feature() + ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") + def test_no_leftover_flows_after_disabling(self): + """disable flowprobe feature and expect no leftover flows""" + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + # enable ip4 datapath for an interface + # set active and passive timers + ipfix = VppCFLOW( + test=self, + active=3, + passive=4, + intf="pg3", + layer="l3", + datapath="ip4", + direction="rx", + mtu=100, + ) + ipfix.add_vpp_config() + + # template packet should arrive immediately + ipfix.verify_templates(count=1) + + # send some ip4 packets + self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5) + self.send_packets(src_if=self.pg3, dst_if=self.pg4) + + # disable feature for the interface + # currently stored ip4 flows should be removed + ipfix.disable_flowprobe_feature() + + # no leftover ip4 flows are expected + self.pg_enable_capture([self.collector]) + self.sleep(12, "wait for leftover ip4 flows during three passive intervals") + self.collector.assert_nothing_captured() + + # re-enable feature for the interface + ipfix.enable_flowprobe_feature() + + # template packet should arrive immediately + ipfix_decoder = IPFIXDecoder() + self.vapi.ipfix_flush() + templates = ipfix.verify_templates(ipfix_decoder, count=1) + + # send some ip4 packets + self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5) + capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) + + # verify meta info - packet/octet delta + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=8) + self.verify_cflow_data(ipfix_decoder, capture, cflow) + + self.collector.get_capture(2) + + # cleanup + ipfix.remove_vpp_config() + @unittest.skipUnless(config.extended, "part of extended tests") class ReenableFP(MethodHolder):