2 from __future__ import print_function
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.lacp import SlowProtocol, LACP
16 from config import config
17 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
18 from framework import is_distro_ubuntu2204, is_distro_debian11
19 from framework import VppTestCase, VppTestRunner
20 from framework import tag_run_solo
21 from vpp_object import VppObject
22 from vpp_pg_interface import CaptureTimeoutError
24 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
25 from vpp_ip_route import VppIpRoute, VppRoutePath
26 from vpp_papi.macaddress import mac_ntop
27 from socket import inet_ntop
28 from vpp_papi import VppEnum
# Per-layer field counts of the flowprobe IPFIX templates; the tests add
# these up to obtain the fieldCount expected in a received template.
TMPL_COMMON_FIELD_COUNT = 6
TMPL_L2_FIELD_COUNT = 3
TMPL_L3_FIELD_COUNT = 4
TMPL_L4_FIELD_COUNT = 3

# Keys used to look up fields in decoded IPFIX data records
# (presumably IANA IPFIX information element IDs -- confirm against registry).
IPFIX_TCP_FLAGS_ID = 6
IPFIX_SRC_TRANS_PORT_ID = 7
IPFIX_DST_TRANS_PORT_ID = 11
50 class VppCFLOW(VppObject):
51 """CFLOW object for IPFIX exporter and Flowprobe feature"""
67 self._intf_obj = getattr(self._test, intf)
69 if passive == 0 or passive < active:
70 self._passive = active + 1
72 self._passive = passive
73 self._datapath = datapath # l2 ip4 ip6
74 self._collect = layer # l2 l3 l4
75 self._direction = direction # rx tx both
76 self._timeout = timeout
78 self._configured = False
80 def add_vpp_config(self):
81 self.enable_exporter()
85 if "l2" in self._collect.lower():
86 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
87 if "l3" in self._collect.lower():
88 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
89 if "l4" in self._collect.lower():
90 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
91 self._test.vapi.flowprobe_set_params(
92 record_flags=(l2_flag | l3_flag | l4_flag),
93 active_timer=self._active,
94 passive_timer=self._passive,
96 self.enable_flowprobe_feature()
97 self._test.vapi.cli("ipfix flush")
98 self._configured = True
def remove_vpp_config(self):
    """Undo add_vpp_config: disable exporter and feature, flush, mark removed."""
    # Order matters: the exporter is disabled before the interface feature.
    for teardown_step in (self.disable_exporter, self.disable_flowprobe_feature):
        teardown_step()
    vpp_api = self._test.vapi
    vpp_api.cli("ipfix flush")
    self._configured = False
106 def enable_exporter(self):
107 self._test.vapi.set_ipfix_exporter(
108 collector_address=self._test.pg0.remote_ip4,
109 src_address=self._test.pg0.local_ip4,
111 template_interval=self._timeout,
114 def _enable_disable_flowprobe_feature(self, is_add):
116 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
117 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
118 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
121 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
122 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
123 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
125 self._test.vapi.flowprobe_interface_add_del(
127 which=which_map[self._datapath],
128 direction=direction_map[self._direction],
129 sw_if_index=self._intf_obj.sw_if_index,
def enable_flowprobe_feature(self):
    """Enable the flowprobe feature via the shared add/del helper."""
    toggle_feature = self._enable_disable_flowprobe_feature
    toggle_feature(is_add=True)
def disable_exporter(self):
    """Disable the IPFIX exporter by setting its collector to 0.0.0.0."""
    vpp_api = self._test.vapi
    vpp_api.cli("set ipfix exporter collector 0.0.0.0")
def disable_flowprobe_feature(self):
    """Disable the flowprobe feature via the shared add/del helper."""
    toggle_feature = self._enable_disable_flowprobe_feature
    toggle_feature(is_add=False)
142 return "ipfix-collector-%s-%s" % (self._src, self.dst)
def query_vpp_config(self):
    """Return the locally tracked configured flag (no VPP query is made)."""
    is_configured = self._configured
    return is_configured
147 def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
149 self._test.assertIn(count, (1, 2, 3))
150 for _ in range(count):
151 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
152 self._test.assertTrue(p.haslayer(IPFIX))
153 self._test.assertTrue(p.haslayer(Template))
154 if decoder is not None:
155 templates.append(p[Template].templateID)
156 decoder.add_template(p.getlayer(Template))
157 if field_count_in is not None:
158 self._test.assertIn(p[Template].fieldCount, field_count_in)
162 class MethodHolder(VppTestCase):
163 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
167 max_number_of_packets = 10
173 Perform standard class setup (defined by class method setUpClass in
174 class VppTestCase) before running the test case, set test case related
175 variables and configure VPP.
177 super(MethodHolder, cls).setUpClass()
178 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
183 # Create pg interfaces
184 cls.create_pg_interfaces(range(9))
187 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
189 # Create BD with MAC learning and unknown unicast flooding enabled
190 # and put the interfaces into this BD
191 cls.vapi.bridge_domain_add_del_v2(
192 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
194 cls.vapi.sw_interface_set_l2_bridge(
195 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
197 cls.vapi.sw_interface_set_l2_bridge(
198 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
201 # Set up all interfaces
202 for i in cls.pg_interfaces:
206 cls.pg0.configure_ipv4_neighbors()
207 cls.collector = cls.pg0
210 cls.pg1.resolve_arp()
212 cls.pg2.resolve_arp()
214 cls.pg3.resolve_arp()
216 cls.pg4.resolve_arp()
219 cls.pg8.configure_ipv4_neighbors()
222 cls.pg5.resolve_ndp()
223 cls.pg5.disable_ipv6_ra()
225 cls.pg6.resolve_ndp()
226 cls.pg6.disable_ipv6_ra()
228 super(MethodHolder, cls).tearDownClass()
def tearDownClass(cls):
    """Run the inherited class-level teardown."""
    super(MethodHolder, cls).tearDownClass()
236 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
238 """Create a packet stream to tickle the plugin
240 :param VppInterface src_if: Source interface for packet stream
241 :param VppInterface dst_if: Dst interface for packet stream
249 packets = random.randint(1, self.max_number_of_packets)
251 for p in range(0, packets):
253 pkt_size = random.choice(self.pg_if_packet_sizes)
254 info = self.create_packet_info(src_if, dst_if)
255 payload = self.info_to_payload(info)
256 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
258 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
260 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
261 p /= UDP(sport=1234, dport=4321)
264 self.extend_packet(p, pkt_size)
267 def verify_cflow_data(self, decoder, capture, cflow):
273 if cflow.haslayer(Data):
274 data = decoder.decode_data_set(cflow.getlayer(Set))
276 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
277 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
279 def send_packets(self, src_if=None, dst_if=None):
284 self.pg_enable_capture([dst_if])
285 src_if.add_stream(self.pkts)
287 return dst_if.get_capture(len(self.pkts))
289 def verify_cflow_data_detail(
294 data_set={1: "octets", 2: "packets"},
299 print(capture[0].show())
300 if cflow.haslayer(Data):
301 data = decoder.decode_data_set(cflow.getlayer(Set))
305 ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
307 ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
308 if data_set is not None:
310 # skip flow if ingress/egress interface is 0
311 if int(binascii.hexlify(record[10]), 16) == 0:
313 if int(binascii.hexlify(record[14]), 16) == 0:
316 for field in data_set:
317 value = data_set[field]
318 if value == "octets":
321 value += 40 # ??? is this correct
322 elif value == "packets":
324 elif value == "src_ip":
326 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
328 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
329 value = int(binascii.hexlify(ip), 16)
330 elif value == "dst_ip":
332 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
334 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
335 value = int(binascii.hexlify(ip), 16)
336 elif value == "sport":
337 value = int(capture[0][UDP].sport)
338 elif value == "dport":
339 value = int(capture[0][UDP].dport)
341 int(binascii.hexlify(record[field]), 16), value
343 if field_count is not None:
345 self.assertEqual(len(record), field_count)
347 def verify_cflow_data_notimer(self, decoder, capture, cflows):
350 if cflow.haslayer(Data):
351 data = decoder.decode_data_set(cflow.getlayer(Set))
353 raise Exception("No CFLOW data")
358 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
359 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
360 self.assertEqual(len(capture), idx)
362 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
363 """wait for CFLOW packet and verify its correctness
365 :param timeout: how long to wait
368 self.logger.info("IPFIX: Waiting for CFLOW packet")
369 # self.logger.debug(self.vapi.ppcli("show flow table"))
370 p = collector_intf.wait_for_packet(timeout=timeout)
371 self.assertEqual(p[Set].setID, set_id)
372 # self.logger.debug(self.vapi.ppcli("show flow table"))
373 self.logger.debug(ppp("IPFIX: Got packet:", p))
378 @tag_fixme_vpp_workers
379 @tag_fixme_ubuntu2204
381 class Flowprobe(MethodHolder):
382 """Template verification, timer tests"""
386 super(Flowprobe, cls).setUpClass()
def tearDownClass(cls):
    """Run the inherited class-level teardown."""
    super(Flowprobe, cls).tearDownClass()
393 """timer less than template timeout"""
394 self.logger.info("FFP_TEST_START_0001")
395 self.pg_enable_capture(self.pg_interfaces)
398 ipfix = VppCFLOW(test=self, active=2)
399 ipfix.add_vpp_config()
401 ipfix_decoder = IPFIXDecoder()
402 # template packet should arrive immediately
403 templates = ipfix.verify_templates(ipfix_decoder)
405 self.create_stream(packets=1)
407 capture = self.pg2.get_capture(1)
409 # make sure the one packet we expect actually showed up
410 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
411 self.verify_cflow_data(ipfix_decoder, capture, cflow)
413 ipfix.remove_vpp_config()
414 self.logger.info("FFP_TEST_FINISH_0001")
417 """timer greater than template timeout"""
418 self.logger.info("FFP_TEST_START_0002")
419 self.pg_enable_capture(self.pg_interfaces)
422 ipfix = VppCFLOW(test=self, timeout=3, active=4)
423 ipfix.add_vpp_config()
425 ipfix_decoder = IPFIXDecoder()
426 # template packet should arrive immediately
427 ipfix.verify_templates()
429 self.create_stream(packets=2)
431 capture = self.pg2.get_capture(2)
433 # the template interval configured above is 3 seconds, so the next
434 # set of template packets should arrive within the 5 s timeout below
435 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
437 # make sure the one packet we expect actually showed up
438 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
439 self.verify_cflow_data(ipfix_decoder, capture, cflow)
441 ipfix.remove_vpp_config()
442 self.logger.info("FFP_TEST_FINISH_0002")
444 def test_cflow_packet(self):
445 """verify cflow packet fields"""
446 self.logger.info("FFP_TEST_START_0000")
447 self.pg_enable_capture(self.pg_interfaces)
451 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
453 ipfix.add_vpp_config()
455 route_9001 = VppIpRoute(
459 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
461 route_9001.add_vpp_config()
463 ipfix_decoder = IPFIXDecoder()
464 templates = ipfix.verify_templates(ipfix_decoder, count=1)
468 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
469 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
470 / TCP(sport=1234, dport=4321, flags=80)
475 nowUTC = int(time.time())
476 nowUNIX = nowUTC + 2208988800
477 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
479 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
480 self.collector.get_capture(2)
482 if cflow[0].haslayer(IPFIX):
483 self.assertEqual(cflow[IPFIX].version, 10)
484 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
485 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
486 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
487 if cflow.haslayer(Data):
488 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
490 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
492 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
494 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
496 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
498 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
500 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
501 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
502 # flow start timestamp
503 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
504 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
506 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
508 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
510 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
512 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
514 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
516 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
518 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
520 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
522 ipfix.remove_vpp_config()
523 self.logger.info("FFP_TEST_FINISH_0000")
525 def test_flow_entry_reuse(self):
526 """Verify flow entry reuse doesn't accumulate meta info"""
527 self.pg_enable_capture(self.pg_interfaces)
530 # enable ip4 datapath for an interface
531 # set active and passive timers
542 ipfix.add_vpp_config()
544 # template packet should arrive immediately
545 ipfix_decoder = IPFIXDecoder()
546 templates = ipfix.verify_templates(ipfix_decoder, count=1)
551 Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
552 / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
553 / TCP(sport=1234, dport=4321)
558 # send the tcp packet two times, each time with new set of flags
560 TCP_F_SYN | TCP_F_ACK,
561 TCP_F_RST | TCP_F_PSH,
564 self.pkts[0][TCP].flags = f
565 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
567 # verify meta info - packet/octet delta and tcp flags
568 cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
569 self.verify_cflow_data(ipfix_decoder, capture, cflow)
570 self.verify_cflow_data_detail(
575 IPFIX_TCP_FLAGS_ID: f,
576 IPFIX_SRC_TRANS_PORT_ID: 1234,
577 IPFIX_DST_TRANS_PORT_ID: 4321,
581 self.collector.get_capture(3)
584 ipfix.remove_vpp_config()
586 def test_interface_dump(self):
587 """Dump interfaces with IPFIX flow record generation enabled"""
588 self.logger.info("FFP_TEST_START_0003")
590 # Enable feature for 3 interfaces
591 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
592 ipfix1.add_vpp_config()
594 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
595 ipfix2.enable_flowprobe_feature()
597 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
598 ipfix3.enable_flowprobe_feature()
600 # When request "all", dump should contain all enabled interfaces
601 dump = self.vapi.flowprobe_interface_dump()
602 self.assertEqual(len(dump), 3)
604 # Verify 1st interface
605 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
607 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
611 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
614 # Verify 2nd interface
615 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
617 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
621 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
624 # Verify 3rd interface
625 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
627 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
631 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
634 # When request 2nd interface, dump should contain only the specified interface
635 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
636 self.assertEqual(len(dump), 1)
638 # Verify 2nd interface
639 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
641 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
645 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
648 # When request 99th interface, dump should be empty
649 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
650 self.assertEqual(len(dump), 0)
652 ipfix1.remove_vpp_config()
653 ipfix2.remove_vpp_config()
654 ipfix3.remove_vpp_config()
655 self.logger.info("FFP_TEST_FINISH_0003")
def test_get_params(self):
    """Get IPFIX flow record generation parameters"""
    self.logger.info("FFP_TEST_START_0004")

    # Configure flowprobe with explicit timers and all three record layers
    cflow = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
    cflow.add_vpp_config()

    # Read the parameters back and compare them with what was configured
    reply = self.vapi.flowprobe_get_params()
    self.assertEqual(reply.active_timer, 20)
    self.assertEqual(reply.passive_timer, 40)
    flag_enum = VppEnum.vl_api_flowprobe_record_flags_t
    expected_flags = (
        flag_enum.FLOWPROBE_RECORD_FLAG_L2
        | flag_enum.FLOWPROBE_RECORD_FLAG_L3
        | flag_enum.FLOWPROBE_RECORD_FLAG_L4
    )
    self.assertEqual(reply.record_flags, expected_flags)

    cflow.remove_vpp_config()
    self.logger.info("FFP_TEST_FINISH_0004")
678 class DatapathTestsHolder(object):
679 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
683 super(DatapathTestsHolder, cls).setUpClass()
def tearDownClass(cls):
    """Run the class-level teardown of the next class in the MRO."""
    super(DatapathTestsHolder, cls).tearDownClass()
689 def test_templatesL2(self):
690 """verify template on L2 datapath"""
691 self.logger.info("FFP_TEST_START_0000")
692 self.pg_enable_capture(self.pg_interfaces)
695 test=self, intf=self.intf1, layer="l2", direction=self.direction
697 ipfix.add_vpp_config()
699 # template packet should arrive immediately
700 self.vapi.ipfix_flush()
701 ipfix.verify_templates(timeout=3, count=1)
702 self.collector.get_capture(1)
704 ipfix.remove_vpp_config()
705 self.logger.info("FFP_TEST_FINISH_0000")
707 def test_L2onL2(self):
708 """L2 data on L2 datapath"""
709 self.logger.info("FFP_TEST_START_0001")
710 self.pg_enable_capture(self.pg_interfaces)
714 test=self, intf=self.intf1, layer="l2", direction=self.direction
716 ipfix.add_vpp_config()
718 ipfix_decoder = IPFIXDecoder()
719 # template packet should arrive immediately
720 templates = ipfix.verify_templates(ipfix_decoder, count=1)
722 self.create_stream(packets=1)
723 capture = self.send_packets()
725 # make sure the one packet we expect actually showed up
726 self.vapi.ipfix_flush()
727 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
728 self.verify_cflow_data_detail(
732 {2: "packets", 256: 8, 61: (self.direction == "tx")},
734 self.collector.get_capture(2)
736 ipfix.remove_vpp_config()
737 self.logger.info("FFP_TEST_FINISH_0001")
739 def test_L3onL2(self):
740 """L3 data on L2 datapath"""
741 self.logger.info("FFP_TEST_START_0002")
742 self.pg_enable_capture(self.pg_interfaces)
746 test=self, intf=self.intf1, layer="l3", direction=self.direction
748 ipfix.add_vpp_config()
750 ipfix_decoder = IPFIXDecoder()
751 # template packet should arrive immediately
752 templates = ipfix.verify_templates(ipfix_decoder, count=2)
754 self.create_stream(packets=1)
755 capture = self.send_packets()
757 # make sure the one packet we expect actually showed up
758 self.vapi.ipfix_flush()
759 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
760 self.verify_cflow_data_detail(
769 61: (self.direction == "tx"),
773 self.collector.get_capture(3)
775 ipfix.remove_vpp_config()
776 self.logger.info("FFP_TEST_FINISH_0002")
778 def test_L234onL2(self):
779 """L2/3/4 data on L2 datapath"""
780 self.pg_enable_capture(self.pg_interfaces)
784 test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
786 ipfix.add_vpp_config()
788 ipfix_decoder = IPFIXDecoder()
789 # template packet should arrive immediately
790 tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
791 tmpl_ip_field_count = (
792 TMPL_COMMON_FIELD_COUNT
793 + TMPL_L2_FIELD_COUNT
794 + TMPL_L3_FIELD_COUNT
795 + TMPL_L4_FIELD_COUNT
797 templates = ipfix.verify_templates(
800 field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
803 # verify IPv4 and IPv6 flows
804 for ip_ver in ("v4", "v6"):
805 self.create_stream(packets=1, ip_ver=ip_ver)
806 capture = self.send_packets()
808 # make sure the one packet we expect actually showed up
809 self.vapi.ipfix_flush()
810 cflow = self.wait_for_cflow_packet(
811 self.collector, templates[1 if ip_ver == "v4" else 2]
813 src_ip_id = 8 if ip_ver == "v4" else 27
814 dst_ip_id = 12 if ip_ver == "v4" else 28
815 self.verify_cflow_data_detail(
821 256: 8 if ip_ver == "v4" else 56710,
827 61: (self.direction == "tx"),
830 field_count=tmpl_ip_field_count,
836 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
841 capture = self.send_packets()
843 # make sure the one packet we expect actually showed up
844 self.vapi.ipfix_flush()
845 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
846 self.verify_cflow_data_detail(
850 {2: "packets", 256: 2440, 61: (self.direction == "tx")},
851 field_count=tmpl_l2_field_count,
854 self.collector.get_capture(6)
856 ipfix.remove_vpp_config()
858 def test_L4onL2(self):
859 """L4 data on L2 datapath"""
860 self.logger.info("FFP_TEST_START_0003")
861 self.pg_enable_capture(self.pg_interfaces)
865 test=self, intf=self.intf1, layer="l4", direction=self.direction
867 ipfix.add_vpp_config()
869 ipfix_decoder = IPFIXDecoder()
870 # template packet should arrive immediately
871 templates = ipfix.verify_templates(ipfix_decoder, count=2)
873 self.create_stream(packets=1)
874 capture = self.send_packets()
876 # make sure the one packet we expect actually showed up
877 self.vapi.ipfix_flush()
878 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
879 self.verify_cflow_data_detail(
883 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
886 self.collector.get_capture(3)
888 ipfix.remove_vpp_config()
889 self.logger.info("FFP_TEST_FINISH_0003")
891 def test_templatesIp4(self):
892 """verify templates on IP4 datapath"""
893 self.logger.info("FFP_TEST_START_0000")
895 self.pg_enable_capture(self.pg_interfaces)
898 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
900 ipfix.add_vpp_config()
902 # template packet should arrive immediately
903 self.vapi.ipfix_flush()
904 ipfix.verify_templates(timeout=3, count=1)
905 self.collector.get_capture(1)
907 ipfix.remove_vpp_config()
909 self.logger.info("FFP_TEST_FINISH_0000")
911 def test_L2onIP4(self):
912 """L2 data on IP4 datapath"""
913 self.logger.info("FFP_TEST_START_0001")
914 self.pg_enable_capture(self.pg_interfaces)
922 direction=self.direction,
924 ipfix.add_vpp_config()
926 ipfix_decoder = IPFIXDecoder()
927 # template packet should arrive immediately
928 templates = ipfix.verify_templates(ipfix_decoder, count=1)
930 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
931 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
933 # make sure the one packet we expect actually showed up
934 self.vapi.ipfix_flush()
935 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
936 self.verify_cflow_data_detail(
940 {2: "packets", 256: 8, 61: (self.direction == "tx")},
943 # expected two packets: one template packet and one cflow packet
944 self.collector.get_capture(2)
946 ipfix.remove_vpp_config()
947 self.logger.info("FFP_TEST_FINISH_0001")
949 def test_L3onIP4(self):
950 """L3 data on IP4 datapath"""
951 self.logger.info("FFP_TEST_START_0002")
952 self.pg_enable_capture(self.pg_interfaces)
960 direction=self.direction,
962 ipfix.add_vpp_config()
964 ipfix_decoder = IPFIXDecoder()
965 # template packet should arrive immediately
966 templates = ipfix.verify_templates(ipfix_decoder, count=1)
968 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
969 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
971 # make sure the one packet we expect actually showed up
972 self.vapi.ipfix_flush()
973 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
974 self.verify_cflow_data_detail(
983 61: (self.direction == "tx"),
987 # expected two packets: one template packet and one cflow packet
988 self.collector.get_capture(2)
990 ipfix.remove_vpp_config()
991 self.logger.info("FFP_TEST_FINISH_0002")
993 def test_L4onIP4(self):
994 """L4 data on IP4 datapath"""
995 self.logger.info("FFP_TEST_START_0003")
996 self.pg_enable_capture(self.pg_interfaces)
1004 direction=self.direction,
1006 ipfix.add_vpp_config()
1008 ipfix_decoder = IPFIXDecoder()
1009 # template packet should arrive immediately
1010 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1012 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
1013 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1015 # make sure the one packet we expect actually showed up
1016 self.vapi.ipfix_flush()
1017 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1018 self.verify_cflow_data_detail(
1022 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1025 # expected two packets: one template packet and one cflow packet
1026 self.collector.get_capture(2)
1028 ipfix.remove_vpp_config()
1029 self.logger.info("FFP_TEST_FINISH_0003")
1031 def test_templatesIP6(self):
1032 """verify templates on IP6 datapath"""
1033 self.logger.info("FFP_TEST_START_0000")
1034 self.pg_enable_capture(self.pg_interfaces)
1037 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
1039 ipfix.add_vpp_config()
1041 # template packet should arrive immediately
1042 ipfix.verify_templates(count=1)
1043 self.collector.get_capture(1)
1045 ipfix.remove_vpp_config()
1047 self.logger.info("FFP_TEST_FINISH_0000")
1049 def test_L2onIP6(self):
1050 """L2 data on IP6 datapath"""
1051 self.logger.info("FFP_TEST_START_0001")
1052 self.pg_enable_capture(self.pg_interfaces)
1060 direction=self.direction,
1062 ipfix.add_vpp_config()
1064 ipfix_decoder = IPFIXDecoder()
1065 # template packet should arrive immediately
1066 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1068 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1069 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1071 # make sure the one packet we expect actually showed up
1072 self.vapi.ipfix_flush()
1073 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1074 self.verify_cflow_data_detail(
1078 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1082 # expected two packets: one template packet and one cflow packet
1083 self.collector.get_capture(2)
1085 ipfix.remove_vpp_config()
1086 self.logger.info("FFP_TEST_FINISH_0001")
1088 def test_L3onIP6(self):
1089 """L3 data on IP6 datapath"""
1090 self.logger.info("FFP_TEST_START_0002")
1091 self.pg_enable_capture(self.pg_interfaces)
1099 direction=self.direction,
1101 ipfix.add_vpp_config()
1103 ipfix_decoder = IPFIXDecoder()
1104 # template packet should arrive immediately
1105 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1107 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1108 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1110 # make sure the one packet we expect actually showed up
1111 self.vapi.ipfix_flush()
1112 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1113 self.verify_cflow_data_detail(
1117 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1121 # expected two packets: one template packet and one cflow packet
1122 self.collector.get_capture(2)
1124 ipfix.remove_vpp_config()
1125 self.logger.info("FFP_TEST_FINISH_0002")
1127 def test_L4onIP6(self):
1128 """L4 data on IP6 datapath"""
1129 self.logger.info("FFP_TEST_START_0003")
1130 self.pg_enable_capture(self.pg_interfaces)
1138 direction=self.direction,
1140 ipfix.add_vpp_config()
1142 ipfix_decoder = IPFIXDecoder()
1143 # template packet should arrive immediately
1144 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1146 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1147 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1149 # make sure the one packet we expect actually showed up
1150 self.vapi.ipfix_flush()
1151 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1152 self.verify_cflow_data_detail(
1156 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1160 # expected two packets: one template packet and one cflow packet
1161 self.collector.get_capture(2)
1163 ipfix.remove_vpp_config()
1164 self.logger.info("FFP_TEST_FINISH_0003")
1166 def test_0001(self):
1167 """no timers, one CFLOW packet, 9 Flows inside"""
1168 self.logger.info("FFP_TEST_START_0001")
1169 self.pg_enable_capture(self.pg_interfaces)
1172 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1173 ipfix.add_vpp_config()
1175 ipfix_decoder = IPFIXDecoder()
1176 # template packet should arrive immediately
1177 templates = ipfix.verify_templates(ipfix_decoder)
1179 self.create_stream(packets=9)
1180 capture = self.send_packets()
1182 # make sure the one packet we expect actually showed up
1183 self.vapi.ipfix_flush()
1184 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1185 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1186 self.collector.get_capture(4)
1188 ipfix.remove_vpp_config()
1189 self.logger.info("FFP_TEST_FINISH_0001")
1191 def test_0002(self):
1192 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1193 self.logger.info("FFP_TEST_START_0002")
1194 self.pg_enable_capture(self.pg_interfaces)
1197 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1198 ipfix.add_vpp_config()
1200 ipfix_decoder = IPFIXDecoder()
1201 # template packet should arrive immediately
1202 self.vapi.ipfix_flush()
1203 templates = ipfix.verify_templates(ipfix_decoder)
1205 self.create_stream(packets=6)
1206 capture = self.send_packets()
1208 # make sure the one packet we expect actually showed up
1210 self.vapi.ipfix_flush()
1211 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1212 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1213 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1214 self.collector.get_capture(5)
1216 ipfix.remove_vpp_config()
1217 self.logger.info("FFP_TEST_FINISH_0002")
1220 @tag_fixme_vpp_workers
1221 class DatapathTx(MethodHolder, DatapathTestsHolder):
1222 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1230 @tag_fixme_vpp_workers
1231 class DatapathRx(MethodHolder, DatapathTestsHolder):
1232 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1240 @unittest.skipUnless(config.extended, "part of extended tests")
1241 class DisableIPFIX(MethodHolder):
def setUpClass(cls):
    """Run the inherited class-level setup."""
    super(DisableIPFIX, cls).setUpClass()
def tearDownClass(cls):
    """Run the inherited class-level teardown."""
    super(DisableIPFIX, cls).tearDownClass()
1252 def test_0001(self):
1253 """disable IPFIX after first packets"""
1254 self.logger.info("FFP_TEST_START_0001")
1255 self.pg_enable_capture(self.pg_interfaces)
1258 ipfix = VppCFLOW(test=self)
1259 ipfix.add_vpp_config()
1261 ipfix_decoder = IPFIXDecoder()
1262 # template packet should arrive immediately
1263 templates = ipfix.verify_templates(ipfix_decoder)
1265 self.create_stream()
1268 # make sure the one packet we expect actually showed up
1269 self.vapi.ipfix_flush()
1270 self.wait_for_cflow_packet(self.collector, templates[1])
1271 self.collector.get_capture(4)
1274 ipfix.disable_exporter()
1275 self.pg_enable_capture([self.collector])
1279 # make sure no packet arrives during the wait below
1280 self.vapi.ipfix_flush()
1281 self.sleep(1, "wait before verifying no packets sent")
1282 self.collector.assert_nothing_captured()
1284 ipfix.remove_vpp_config()
1285 self.logger.info("FFP_TEST_FINISH_0001")
1288 @unittest.skipUnless(config.extended, "part of extended tests")
1289 class ReenableIPFIX(MethodHolder):
1290 """Re-enable IPFIX"""
def setUpClass(cls):
    """Run the inherited class-level setup."""
    super(ReenableIPFIX, cls).setUpClass()
def tearDownClass(cls):
    """Run the inherited class-level teardown."""
    super(ReenableIPFIX, cls).tearDownClass()
1300 def test_0011(self):
1301 """disable IPFIX after first packets and re-enable after few packets"""
1302 self.logger.info("FFP_TEST_START_0001")
1303 self.pg_enable_capture(self.pg_interfaces)
1306 ipfix = VppCFLOW(test=self)
1307 ipfix.add_vpp_config()
1309 ipfix_decoder = IPFIXDecoder()
1310 # template packet should arrive immediately
1311 templates = ipfix.verify_templates(ipfix_decoder)
1313 self.create_stream(packets=5)
1316 # make sure the one packet we expect actually showed up
1317 self.vapi.ipfix_flush()
1318 self.wait_for_cflow_packet(self.collector, templates[1])
1319 self.collector.get_capture(4)
1322 ipfix.disable_exporter()
1323 self.vapi.ipfix_flush()
1324 self.pg_enable_capture([self.collector])
1328 # make sure no one packet arrived in active timer span
1329 self.vapi.ipfix_flush()
1330 self.sleep(1, "wait before verifying no packets sent")
1331 self.collector.assert_nothing_captured()
1332 self.pg2.get_capture(5)
1335 ipfix.enable_exporter()
1337 capture = self.collector.get_capture(4)
1341 self.assertTrue(p.haslayer(IPFIX))
1342 if p.haslayer(Template):
1344 self.assertTrue(nr_templates, 3)
1346 self.assertTrue(p.haslayer(IPFIX))
1347 if p.haslayer(Data):
1349 self.assertTrue(nr_templates, 1)
1351 ipfix.remove_vpp_config()
1352 self.logger.info("FFP_TEST_FINISH_0001")
# Extended-only test case: after the flowprobe feature is disabled on the
# interface, no further flow records may reach the collector.
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableFP(MethodHolder):
    """Disable Flowprobe feature"""

    # unittest invokes the class-level fixtures on the class itself, so they
    # have to be classmethods.
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to MethodHolder."""
        super(DisableFP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to MethodHolder."""
        super(DisableFP, cls).tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)

        # configure exporter + flowprobe with the VppCFLOW defaults
        ipfix = VppCFLOW(test=self)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1])
        self.collector.get_capture(4)

        # disable the flowprobe feature and start a fresh capture on the
        # collector
        ipfix.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        # make sure no one packet arrived in active timer span
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_no_leftover_flows_after_disabling(self):
        """disable flowprobe feature and expect no leftover flows"""
        self.pg_enable_capture(self.pg_interfaces)

        # enable ip4 datapath for an interface
        # set active and passive timers
        # NOTE(review): the statement that binds `ipfix` is not visible in
        # this listing - confirm it is constructed before this point.
        ipfix.add_vpp_config()

        # template packet should arrive immediately
        ipfix.verify_templates(count=1)

        # send some ip4 packets
        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
        self.send_packets(src_if=self.pg3, dst_if=self.pg4)

        # disable feature for the interface
        # currently stored ip4 flows should be removed
        ipfix.disable_flowprobe_feature()

        # no leftover ip4 flows are expected
        self.pg_enable_capture([self.collector])
        self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
        self.collector.assert_nothing_captured()

        # only the exporter is torn down here; the flowprobe feature was
        # already disabled above
        ipfix.disable_exporter()
# Extended-only test case: the flowprobe feature is disabled, verified to be
# silent, then re-enabled and verified to export templates and data again.
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableFP(MethodHolder):
    """Re-enable Flowprobe feature"""

    # unittest invokes the class-level fixtures on the class itself, so they
    # have to be classmethods.
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to MethodHolder."""
        super(ReenableFP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to MethodHolder."""
        super(ReenableFP, cls).tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets and re-enable
        after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)

        # configure exporter + flowprobe with the VppCFLOW defaults
        ipfix = VppCFLOW(test=self)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        self.vapi.ipfix_flush()
        templates = ipfix.verify_templates(ipfix_decoder, timeout=3)

        self.create_stream()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1], 5)
        self.collector.get_capture(4)

        # disable the flowprobe feature
        ipfix.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        # make sure no one packet arrived in active timer span
        self.vapi.ipfix_flush()
        self.sleep(5, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        # re-enable the flowprobe feature; templates are verified again
        ipfix.enable_flowprobe_feature()
        self.vapi.ipfix_flush()
        templates = ipfix.verify_templates(ipfix_decoder, timeout=3)

        # make sure the next packets (templates and data) we expect actually
        # showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1], 5)
        self.collector.get_capture(4)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
# When executed as a script, run this module's tests through unittest using
# the project's VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)