2 from __future__ import print_function
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
15 from config import config
16 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
17 from framework import is_distro_ubuntu2204, is_distro_debian11
18 from framework import VppTestCase, VppTestRunner
19 from framework import tag_run_solo
20 from vpp_object import VppObject
21 from vpp_pg_interface import CaptureTimeoutError
23 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
24 from vpp_ip_route import VppIpRoute, VppRoutePath
25 from vpp_papi.macaddress import mac_ntop
26 from socket import inet_ntop
27 from vpp_papi import VppEnum
30 class VppCFLOW(VppObject):
31 """CFLOW object for IPFIX exporter and Flowprobe feature"""
47 self._intf_obj = getattr(self._test, intf)
49 if passive == 0 or passive < active:
50 self._passive = active + 1
52 self._passive = passive
53 self._datapath = datapath # l2 ip4 ip6
54 self._collect = layer # l2 l3 l4
55 self._direction = direction # rx tx both
56 self._timeout = timeout
58 self._configured = False
60 def add_vpp_config(self):
61 self.enable_exporter()
65 if "l2" in self._collect.lower():
66 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
67 if "l3" in self._collect.lower():
68 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
69 if "l4" in self._collect.lower():
70 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
71 self._test.vapi.flowprobe_set_params(
72 record_flags=(l2_flag | l3_flag | l4_flag),
73 active_timer=self._active,
74 passive_timer=self._passive,
76 self.enable_flowprobe_feature()
77 self._test.vapi.cli("ipfix flush")
78 self._configured = True
def remove_vpp_config(self):
    """Undo the configuration and mark this object as not configured.

    Disables the exporter, disables the flowprobe feature, issues the
    "ipfix flush" CLI command and finally clears the ``_configured`` flag.
    """
    # Same call order as before: exporter first, then the feature.
    self.disable_exporter()
    self.disable_flowprobe_feature()

    vapi = self._test.vapi
    vapi.cli("ipfix flush")

    self._configured = False
86 def enable_exporter(self):
87 self._test.vapi.set_ipfix_exporter(
88 collector_address=self._test.pg0.remote_ip4,
89 src_address=self._test.pg0.local_ip4,
91 template_interval=self._timeout,
94 def _enable_disable_flowprobe_feature(self, is_add):
96 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
97 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
98 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
101 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
102 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
103 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
105 self._test.vapi.flowprobe_interface_add_del(
107 which=which_map[self._datapath],
108 direction=direction_map[self._direction],
109 sw_if_index=self._intf_obj.sw_if_index,
def enable_flowprobe_feature(self):
    """Enable the flowprobe feature by delegating with ``is_add=True``."""
    toggle = self._enable_disable_flowprobe_feature
    toggle(is_add=True)
def disable_exporter(self):
    """Run the CLI command that sets the IPFIX exporter collector to 0.0.0.0."""
    command = "set ipfix exporter collector 0.0.0.0"
    self._test.vapi.cli(command)
def disable_flowprobe_feature(self):
    """Disable the flowprobe feature by delegating with ``is_add=False``."""
    toggle = self._enable_disable_flowprobe_feature
    toggle(is_add=False)
122 return "ipfix-collector-%s-%s" % (self._src, self.dst)
def query_vpp_config(self):
    """Return the locally tracked ``_configured`` flag.

    The value is whatever this object last stored in ``_configured``;
    nothing is queried from VPP here.
    """
    configured = self._configured
    return configured
127 def verify_templates(self, decoder=None, timeout=1, count=3):
129 self._test.assertIn(count, (1, 2, 3))
130 for _ in range(count):
131 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
132 self._test.assertTrue(p.haslayer(IPFIX))
133 if decoder is not None and p.haslayer(Template):
134 templates.append(p[Template].templateID)
135 decoder.add_template(p.getlayer(Template))
139 class MethodHolder(VppTestCase):
140 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
144 max_number_of_packets = 10
150 Perform standard class setup (defined by class method setUpClass in
151 class VppTestCase) before running the test case, set test case related
152 variables and configure VPP.
154 super(MethodHolder, cls).setUpClass()
155 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
160 # Create pg interfaces
161 cls.create_pg_interfaces(range(9))
164 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
# Create BD with MAC learning and unknown unicast flooding enabled
# and put interfaces to this BD
168 cls.vapi.bridge_domain_add_del_v2(
169 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
171 cls.vapi.sw_interface_set_l2_bridge(
172 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
174 cls.vapi.sw_interface_set_l2_bridge(
175 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
178 # Set up all interfaces
179 for i in cls.pg_interfaces:
183 cls.pg0.configure_ipv4_neighbors()
184 cls.collector = cls.pg0
187 cls.pg1.resolve_arp()
189 cls.pg2.resolve_arp()
191 cls.pg3.resolve_arp()
193 cls.pg4.resolve_arp()
196 cls.pg8.configure_ipv4_neighbors()
199 cls.pg5.resolve_ndp()
200 cls.pg5.disable_ipv6_ra()
202 cls.pg6.resolve_ndp()
203 cls.pg6.disable_ipv6_ra()
205 super(MethodHolder, cls).tearDownClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent of ``MethodHolder``."""
    super(MethodHolder, cls).tearDownClass()
213 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
215 """Create a packet stream to tickle the plugin
217 :param VppInterface src_if: Source interface for packet stream
:param VppInterface dst_if: Dst interface for packet stream
226 packets = random.randint(1, self.max_number_of_packets)
228 for p in range(0, packets):
230 pkt_size = random.choice(self.pg_if_packet_sizes)
231 info = self.create_packet_info(src_if, dst_if)
232 payload = self.info_to_payload(info)
233 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
235 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
237 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
238 p /= UDP(sport=1234, dport=4321)
241 self.extend_packet(p, pkt_size)
244 def verify_cflow_data(self, decoder, capture, cflow):
250 if cflow.haslayer(Data):
251 data = decoder.decode_data_set(cflow.getlayer(Set))
253 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
254 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
256 def send_packets(self, src_if=None, dst_if=None):
261 self.pg_enable_capture([dst_if])
262 src_if.add_stream(self.pkts)
264 return dst_if.get_capture(len(self.pkts))
266 def verify_cflow_data_detail(
267 self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
270 print(capture[0].show())
271 if cflow.haslayer(Data):
272 data = decoder.decode_data_set(cflow.getlayer(Set))
276 ip_layer = capture[0][IP]
278 ip_layer = capture[0][IPv6]
279 if data_set is not None:
281 # skip flow if ingress/egress interface is 0
282 if int(binascii.hexlify(record[10]), 16) == 0:
284 if int(binascii.hexlify(record[14]), 16) == 0:
287 for field in data_set:
288 value = data_set[field]
289 if value == "octets":
292 value += 40 # ??? is this correct
293 elif value == "packets":
295 elif value == "src_ip":
297 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
299 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
300 value = int(binascii.hexlify(ip), 16)
301 elif value == "dst_ip":
303 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
305 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
306 value = int(binascii.hexlify(ip), 16)
307 elif value == "sport":
308 value = int(capture[0][UDP].sport)
309 elif value == "dport":
310 value = int(capture[0][UDP].dport)
312 int(binascii.hexlify(record[field]), 16), value
315 def verify_cflow_data_notimer(self, decoder, capture, cflows):
318 if cflow.haslayer(Data):
319 data = decoder.decode_data_set(cflow.getlayer(Set))
321 raise Exception("No CFLOW data")
326 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
327 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
328 self.assertEqual(len(capture), idx)
330 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
331 """wait for CFLOW packet and verify its correctness
333 :param timeout: how long to wait
336 self.logger.info("IPFIX: Waiting for CFLOW packet")
337 # self.logger.debug(self.vapi.ppcli("show flow table"))
338 p = collector_intf.wait_for_packet(timeout=timeout)
339 self.assertEqual(p[Set].setID, set_id)
340 # self.logger.debug(self.vapi.ppcli("show flow table"))
341 self.logger.debug(ppp("IPFIX: Got packet:", p))
346 @tag_fixme_vpp_workers
347 @tag_fixme_ubuntu2204
349 class Flowprobe(MethodHolder):
350 """Template verification, timer tests"""
354 super(Flowprobe, cls).setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent of ``Flowprobe``."""
    super(Flowprobe, cls).tearDownClass()
361 """timer less than template timeout"""
362 self.logger.info("FFP_TEST_START_0001")
363 self.pg_enable_capture(self.pg_interfaces)
366 ipfix = VppCFLOW(test=self, active=2)
367 ipfix.add_vpp_config()
369 ipfix_decoder = IPFIXDecoder()
370 # template packet should arrive immediately
371 templates = ipfix.verify_templates(ipfix_decoder)
373 self.create_stream(packets=1)
375 capture = self.pg2.get_capture(1)
377 # make sure the one packet we expect actually showed up
378 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
379 self.verify_cflow_data(ipfix_decoder, capture, cflow)
381 ipfix.remove_vpp_config()
382 self.logger.info("FFP_TEST_FINISH_0001")
385 """timer greater than template timeout"""
386 self.logger.info("FFP_TEST_START_0002")
387 self.pg_enable_capture(self.pg_interfaces)
390 ipfix = VppCFLOW(test=self, timeout=3, active=4)
391 ipfix.add_vpp_config()
393 ipfix_decoder = IPFIXDecoder()
394 # template packet should arrive immediately
395 ipfix.verify_templates()
397 self.create_stream(packets=2)
399 capture = self.pg2.get_capture(2)
# the next set of template packets is expected once the template
# interval (timeout=3) elapses; wait up to 5 s for each of them
403 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
405 # make sure the one packet we expect actually showed up
406 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
407 self.verify_cflow_data(ipfix_decoder, capture, cflow)
409 ipfix.remove_vpp_config()
410 self.logger.info("FFP_TEST_FINISH_0002")
412 def test_cflow_packet(self):
413 """verify cflow packet fields"""
414 self.logger.info("FFP_TEST_START_0000")
415 self.pg_enable_capture(self.pg_interfaces)
419 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
421 ipfix.add_vpp_config()
423 route_9001 = VppIpRoute(
427 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
429 route_9001.add_vpp_config()
431 ipfix_decoder = IPFIXDecoder()
432 templates = ipfix.verify_templates(ipfix_decoder, count=1)
436 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
437 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
438 / TCP(sport=1234, dport=4321, flags=80)
443 nowUTC = int(time.time())
444 nowUNIX = nowUTC + 2208988800
445 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
447 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
448 self.collector.get_capture(2)
450 if cflow[0].haslayer(IPFIX):
451 self.assertEqual(cflow[IPFIX].version, 10)
452 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
453 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
454 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
455 if cflow.haslayer(Data):
456 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
458 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
460 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
462 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
464 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
466 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
468 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
469 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
470 # flow start timestamp
471 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
472 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
474 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
476 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
478 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
480 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
482 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
484 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
486 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
488 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
490 ipfix.remove_vpp_config()
491 self.logger.info("FFP_TEST_FINISH_0000")
493 def test_interface_dump(self):
494 """Dump interfaces with IPFIX flow record generation enabled"""
495 self.logger.info("FFP_TEST_START_0003")
497 # Enable feature for 3 interfaces
498 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
499 ipfix1.add_vpp_config()
501 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
502 ipfix2.enable_flowprobe_feature()
504 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
505 ipfix3.enable_flowprobe_feature()
507 # When request "all", dump should contain all enabled interfaces
508 dump = self.vapi.flowprobe_interface_dump()
509 self.assertEqual(len(dump), 3)
511 # Verify 1st interface
512 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
514 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
518 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
521 # Verify 2nd interface
522 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
524 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
528 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
531 # Verify 3rd interface
532 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
534 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
538 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
541 # When request 2nd interface, dump should contain only the specified interface
542 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
543 self.assertEqual(len(dump), 1)
545 # Verify 2nd interface
546 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
548 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
552 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
555 # When request 99th interface, dump should be empty
556 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
557 self.assertEqual(len(dump), 0)
559 ipfix1.remove_vpp_config()
560 ipfix2.remove_vpp_config()
561 ipfix3.remove_vpp_config()
562 self.logger.info("FFP_TEST_FINISH_0003")
def test_get_params(self):
    """Get IPFIX flow record generation parameters

    Configures flowprobe with active=20, passive=40 and the "l2 l3 l4"
    layers, then checks that ``flowprobe_get_params`` reports those timers
    and the combined L2 | L3 | L4 record flags.
    """
    self.logger.info("FFP_TEST_START_0004")

    # Configure the feature with custom timers and all three record layers
    cflow = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
    cflow.add_vpp_config()

    # Read the parameters back and compare the timers first
    reply = self.vapi.flowprobe_get_params()
    self.assertEqual(reply.active_timer, 20)
    self.assertEqual(reply.passive_timer, 40)

    # Expected mask: the L2, L3 and L4 record flags OR-ed together
    flag_enum = VppEnum.vl_api_flowprobe_record_flags_t
    expected_flags = (
        flag_enum.FLOWPROBE_RECORD_FLAG_L2
        | flag_enum.FLOWPROBE_RECORD_FLAG_L3
        | flag_enum.FLOWPROBE_RECORD_FLAG_L4
    )
    self.assertEqual(reply.record_flags, expected_flags)

    cflow.remove_vpp_config()
    self.logger.info("FFP_TEST_FINISH_0004")
585 class DatapathTestsHolder(object):
586 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
590 super(DatapathTestsHolder, cls).setUpClass()
def tearDownClass(cls):
    """Pass class-level teardown on to the next class after ``DatapathTestsHolder``."""
    super(DatapathTestsHolder, cls).tearDownClass()
596 def test_templatesL2(self):
597 """verify template on L2 datapath"""
598 self.logger.info("FFP_TEST_START_0000")
599 self.pg_enable_capture(self.pg_interfaces)
602 test=self, intf=self.intf1, layer="l2", direction=self.direction
604 ipfix.add_vpp_config()
606 # template packet should arrive immediately
607 self.vapi.ipfix_flush()
608 ipfix.verify_templates(timeout=3, count=1)
609 self.collector.get_capture(1)
611 ipfix.remove_vpp_config()
612 self.logger.info("FFP_TEST_FINISH_0000")
614 def test_L2onL2(self):
615 """L2 data on L2 datapath"""
616 self.logger.info("FFP_TEST_START_0001")
617 self.pg_enable_capture(self.pg_interfaces)
621 test=self, intf=self.intf1, layer="l2", direction=self.direction
623 ipfix.add_vpp_config()
625 ipfix_decoder = IPFIXDecoder()
626 # template packet should arrive immediately
627 templates = ipfix.verify_templates(ipfix_decoder, count=1)
629 self.create_stream(packets=1)
630 capture = self.send_packets()
632 # make sure the one packet we expect actually showed up
633 self.vapi.ipfix_flush()
634 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
635 self.verify_cflow_data_detail(
639 {2: "packets", 256: 8, 61: (self.direction == "tx")},
641 self.collector.get_capture(2)
643 ipfix.remove_vpp_config()
644 self.logger.info("FFP_TEST_FINISH_0001")
646 def test_L3onL2(self):
647 """L3 data on L2 datapath"""
648 self.logger.info("FFP_TEST_START_0002")
649 self.pg_enable_capture(self.pg_interfaces)
653 test=self, intf=self.intf1, layer="l3", direction=self.direction
655 ipfix.add_vpp_config()
657 ipfix_decoder = IPFIXDecoder()
658 # template packet should arrive immediately
659 templates = ipfix.verify_templates(ipfix_decoder, count=2)
661 self.create_stream(packets=1)
662 capture = self.send_packets()
664 # make sure the one packet we expect actually showed up
665 self.vapi.ipfix_flush()
666 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
667 self.verify_cflow_data_detail(
676 61: (self.direction == "tx"),
680 self.collector.get_capture(3)
682 ipfix.remove_vpp_config()
683 self.logger.info("FFP_TEST_FINISH_0002")
685 def test_L4onL2(self):
686 """L4 data on L2 datapath"""
687 self.logger.info("FFP_TEST_START_0003")
688 self.pg_enable_capture(self.pg_interfaces)
692 test=self, intf=self.intf1, layer="l4", direction=self.direction
694 ipfix.add_vpp_config()
696 ipfix_decoder = IPFIXDecoder()
697 # template packet should arrive immediately
698 templates = ipfix.verify_templates(ipfix_decoder, count=2)
700 self.create_stream(packets=1)
701 capture = self.send_packets()
703 # make sure the one packet we expect actually showed up
704 self.vapi.ipfix_flush()
705 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
706 self.verify_cflow_data_detail(
710 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
713 self.collector.get_capture(3)
715 ipfix.remove_vpp_config()
716 self.logger.info("FFP_TEST_FINISH_0003")
718 def test_templatesIp4(self):
719 """verify templates on IP4 datapath"""
720 self.logger.info("FFP_TEST_START_0000")
722 self.pg_enable_capture(self.pg_interfaces)
725 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
727 ipfix.add_vpp_config()
729 # template packet should arrive immediately
730 self.vapi.ipfix_flush()
731 ipfix.verify_templates(timeout=3, count=1)
732 self.collector.get_capture(1)
734 ipfix.remove_vpp_config()
736 self.logger.info("FFP_TEST_FINISH_0000")
738 def test_L2onIP4(self):
739 """L2 data on IP4 datapath"""
740 self.logger.info("FFP_TEST_START_0001")
741 self.pg_enable_capture(self.pg_interfaces)
749 direction=self.direction,
751 ipfix.add_vpp_config()
753 ipfix_decoder = IPFIXDecoder()
754 # template packet should arrive immediately
755 templates = ipfix.verify_templates(ipfix_decoder, count=1)
757 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
758 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
760 # make sure the one packet we expect actually showed up
761 self.vapi.ipfix_flush()
762 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
763 self.verify_cflow_data_detail(
767 {2: "packets", 256: 8, 61: (self.direction == "tx")},
# expected one template and one cflow packet
771 self.collector.get_capture(2)
773 ipfix.remove_vpp_config()
774 self.logger.info("FFP_TEST_FINISH_0001")
776 def test_L3onIP4(self):
777 """L3 data on IP4 datapath"""
778 self.logger.info("FFP_TEST_START_0002")
779 self.pg_enable_capture(self.pg_interfaces)
787 direction=self.direction,
789 ipfix.add_vpp_config()
791 ipfix_decoder = IPFIXDecoder()
792 # template packet should arrive immediately
793 templates = ipfix.verify_templates(ipfix_decoder, count=1)
795 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
796 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
798 # make sure the one packet we expect actually showed up
799 self.vapi.ipfix_flush()
800 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
801 self.verify_cflow_data_detail(
810 61: (self.direction == "tx"),
# expected one template and one cflow packet
815 self.collector.get_capture(2)
817 ipfix.remove_vpp_config()
818 self.logger.info("FFP_TEST_FINISH_0002")
820 def test_L4onIP4(self):
821 """L4 data on IP4 datapath"""
822 self.logger.info("FFP_TEST_START_0003")
823 self.pg_enable_capture(self.pg_interfaces)
831 direction=self.direction,
833 ipfix.add_vpp_config()
835 ipfix_decoder = IPFIXDecoder()
836 # template packet should arrive immediately
837 templates = ipfix.verify_templates(ipfix_decoder, count=1)
839 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
840 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
842 # make sure the one packet we expect actually showed up
843 self.vapi.ipfix_flush()
844 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
845 self.verify_cflow_data_detail(
849 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
# expected one template and one cflow packet
853 self.collector.get_capture(2)
855 ipfix.remove_vpp_config()
856 self.logger.info("FFP_TEST_FINISH_0003")
858 def test_templatesIP6(self):
859 """verify templates on IP6 datapath"""
860 self.logger.info("FFP_TEST_START_0000")
861 self.pg_enable_capture(self.pg_interfaces)
864 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
866 ipfix.add_vpp_config()
868 # template packet should arrive immediately
869 ipfix.verify_templates(count=1)
870 self.collector.get_capture(1)
872 ipfix.remove_vpp_config()
874 self.logger.info("FFP_TEST_FINISH_0000")
876 def test_L2onIP6(self):
877 """L2 data on IP6 datapath"""
878 self.logger.info("FFP_TEST_START_0001")
879 self.pg_enable_capture(self.pg_interfaces)
887 direction=self.direction,
889 ipfix.add_vpp_config()
891 ipfix_decoder = IPFIXDecoder()
892 # template packet should arrive immediately
893 templates = ipfix.verify_templates(ipfix_decoder, count=1)
895 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
896 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
898 # make sure the one packet we expect actually showed up
899 self.vapi.ipfix_flush()
900 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
901 self.verify_cflow_data_detail(
905 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
# expected one template and one cflow packet
910 self.collector.get_capture(2)
912 ipfix.remove_vpp_config()
913 self.logger.info("FFP_TEST_FINISH_0001")
915 def test_L3onIP6(self):
916 """L3 data on IP6 datapath"""
917 self.logger.info("FFP_TEST_START_0002")
918 self.pg_enable_capture(self.pg_interfaces)
926 direction=self.direction,
928 ipfix.add_vpp_config()
930 ipfix_decoder = IPFIXDecoder()
931 # template packet should arrive immediately
932 templates = ipfix.verify_templates(ipfix_decoder, count=1)
934 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
935 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
937 # make sure the one packet we expect actually showed up
938 self.vapi.ipfix_flush()
939 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
940 self.verify_cflow_data_detail(
944 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
# expected one template and one cflow packet
949 self.collector.get_capture(2)
951 ipfix.remove_vpp_config()
952 self.logger.info("FFP_TEST_FINISH_0002")
954 def test_L4onIP6(self):
955 """L4 data on IP6 datapath"""
956 self.logger.info("FFP_TEST_START_0003")
957 self.pg_enable_capture(self.pg_interfaces)
965 direction=self.direction,
967 ipfix.add_vpp_config()
969 ipfix_decoder = IPFIXDecoder()
970 # template packet should arrive immediately
971 templates = ipfix.verify_templates(ipfix_decoder, count=1)
973 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
974 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
976 # make sure the one packet we expect actually showed up
977 self.vapi.ipfix_flush()
978 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
979 self.verify_cflow_data_detail(
983 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
# expected one template and one cflow packet
988 self.collector.get_capture(2)
990 ipfix.remove_vpp_config()
991 self.logger.info("FFP_TEST_FINISH_0003")
994 """no timers, one CFLOW packet, 9 Flows inside"""
995 self.logger.info("FFP_TEST_START_0001")
996 self.pg_enable_capture(self.pg_interfaces)
999 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1000 ipfix.add_vpp_config()
1002 ipfix_decoder = IPFIXDecoder()
1003 # template packet should arrive immediately
1004 templates = ipfix.verify_templates(ipfix_decoder)
1006 self.create_stream(packets=9)
1007 capture = self.send_packets()
1009 # make sure the one packet we expect actually showed up
1010 self.vapi.ipfix_flush()
1011 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1012 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1013 self.collector.get_capture(4)
1015 ipfix.remove_vpp_config()
1016 self.logger.info("FFP_TEST_FINISH_0001")
1018 def test_0002(self):
1019 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1020 self.logger.info("FFP_TEST_START_0002")
1021 self.pg_enable_capture(self.pg_interfaces)
1024 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1025 ipfix.add_vpp_config()
1027 ipfix_decoder = IPFIXDecoder()
1028 # template packet should arrive immediately
1029 self.vapi.ipfix_flush()
1030 templates = ipfix.verify_templates(ipfix_decoder)
1032 self.create_stream(packets=6)
1033 capture = self.send_packets()
1035 # make sure the one packet we expect actually showed up
1037 self.vapi.ipfix_flush()
1038 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1039 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1040 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1041 self.collector.get_capture(5)
1043 ipfix.remove_vpp_config()
1044 self.logger.info("FFP_TEST_FINISH_0002")
1047 @tag_fixme_vpp_workers
1048 class DatapathTx(MethodHolder, DatapathTestsHolder):
1049 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1057 @tag_fixme_vpp_workers
1058 class DatapathRx(MethodHolder, DatapathTestsHolder):
1059 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1067 @unittest.skipUnless(config.extended, "part of extended tests")
1068 class DisableIPFIX(MethodHolder):
def setUpClass(cls):
    """Delegate class-level setup to the parent of ``DisableIPFIX``."""
    super(DisableIPFIX, cls).setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent of ``DisableIPFIX``."""
    super(DisableIPFIX, cls).tearDownClass()
1079 def test_0001(self):
1080 """disable IPFIX after first packets"""
1081 self.logger.info("FFP_TEST_START_0001")
1082 self.pg_enable_capture(self.pg_interfaces)
1085 ipfix = VppCFLOW(test=self)
1086 ipfix.add_vpp_config()
1088 ipfix_decoder = IPFIXDecoder()
1089 # template packet should arrive immediately
1090 templates = ipfix.verify_templates(ipfix_decoder)
1092 self.create_stream()
1095 # make sure the one packet we expect actually showed up
1096 self.vapi.ipfix_flush()
1097 self.wait_for_cflow_packet(self.collector, templates[1])
1098 self.collector.get_capture(4)
1101 ipfix.disable_exporter()
1102 self.pg_enable_capture([self.collector])
# make sure no packet arrived during the wait below
1107 self.vapi.ipfix_flush()
1108 self.sleep(1, "wait before verifying no packets sent")
1109 self.collector.assert_nothing_captured()
1111 ipfix.remove_vpp_config()
1112 self.logger.info("FFP_TEST_FINISH_0001")
1115 @unittest.skipUnless(config.extended, "part of extended tests")
1116 class ReenableIPFIX(MethodHolder):
1117 """Re-enable IPFIX"""
def setUpClass(cls):
    """Delegate class-level setup to the parent of ``ReenableIPFIX``."""
    super(ReenableIPFIX, cls).setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent of ``ReenableIPFIX``."""
    super(ReenableIPFIX, cls).tearDownClass()
1127 def test_0011(self):
1128 """disable IPFIX after first packets and re-enable after few packets"""
1129 self.logger.info("FFP_TEST_START_0001")
1130 self.pg_enable_capture(self.pg_interfaces)
1133 ipfix = VppCFLOW(test=self)
1134 ipfix.add_vpp_config()
1136 ipfix_decoder = IPFIXDecoder()
1137 # template packet should arrive immediately
1138 templates = ipfix.verify_templates(ipfix_decoder)
1140 self.create_stream(packets=5)
1143 # make sure the one packet we expect actually showed up
1144 self.vapi.ipfix_flush()
1145 self.wait_for_cflow_packet(self.collector, templates[1])
1146 self.collector.get_capture(4)
1149 ipfix.disable_exporter()
1150 self.vapi.ipfix_flush()
1151 self.pg_enable_capture([self.collector])
1155 # make sure no one packet arrived in active timer span
1156 self.vapi.ipfix_flush()
1157 self.sleep(1, "wait before verifying no packets sent")
1158 self.collector.assert_nothing_captured()
1159 self.pg2.get_capture(5)
1162 ipfix.enable_exporter()
1164 capture = self.collector.get_capture(4)
1168 self.assertTrue(p.haslayer(IPFIX))
1169 if p.haslayer(Template):
1171 self.assertTrue(nr_templates, 3)
1173 self.assertTrue(p.haslayer(IPFIX))
1174 if p.haslayer(Data):
1176 self.assertTrue(nr_templates, 1)
1178 ipfix.remove_vpp_config()
1179 self.logger.info("FFP_TEST_FINISH_0001")
1182 @unittest.skipUnless(config.extended, "part of extended tests")
1183 class DisableFP(MethodHolder):
1184 """Disable Flowprobe feature"""
def setUpClass(cls):
    """Delegate class-level setup to the parent of ``DisableFP``."""
    super(DisableFP, cls).setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent of ``DisableFP``."""
    super(DisableFP, cls).tearDownClass()
1194 def test_0001(self):
1195 """disable flowprobe feature after first packets"""
1196 self.logger.info("FFP_TEST_START_0001")
1197 self.pg_enable_capture(self.pg_interfaces)
1199 ipfix = VppCFLOW(test=self)
1200 ipfix.add_vpp_config()
1202 ipfix_decoder = IPFIXDecoder()
1203 # template packet should arrive immediately
1204 templates = ipfix.verify_templates(ipfix_decoder)
1206 self.create_stream()
1209 # make sure the one packet we expect actually showed up
1210 self.vapi.ipfix_flush()
1211 self.wait_for_cflow_packet(self.collector, templates[1])
1212 self.collector.get_capture(4)
1215 ipfix.disable_flowprobe_feature()
1216 self.pg_enable_capture([self.collector])
1220 # make sure no one packet arrived in active timer span
1221 self.vapi.ipfix_flush()
1222 self.sleep(1, "wait before verifying no packets sent")
1223 self.collector.assert_nothing_captured()
1225 ipfix.remove_vpp_config()
1226 self.logger.info("FFP_TEST_FINISH_0001")
1229 @unittest.skipUnless(config.extended, "part of extended tests")
1230 class ReenableFP(MethodHolder):
1231 """Re-enable Flowprobe feature"""
def setUpClass(cls):
    """Delegate class-level setup to the parent of ``ReenableFP``."""
    super(ReenableFP, cls).setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent of ``ReenableFP``."""
    super(ReenableFP, cls).tearDownClass()
1241 def test_0001(self):
1242 """disable flowprobe feature after first packets and re-enable
1243 after few packets"""
1244 self.logger.info("FFP_TEST_START_0001")
1245 self.pg_enable_capture(self.pg_interfaces)
1248 ipfix = VppCFLOW(test=self)
1249 ipfix.add_vpp_config()
1251 ipfix_decoder = IPFIXDecoder()
1252 # template packet should arrive immediately
1253 self.vapi.ipfix_flush()
1254 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1256 self.create_stream()
1259 # make sure the one packet we expect actually showed up
1260 self.vapi.ipfix_flush()
1261 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1262 self.collector.get_capture(4)
1264 # disable FPP feature
1265 ipfix.disable_flowprobe_feature()
1266 self.pg_enable_capture([self.collector])
1270 # make sure no one packet arrived in active timer span
1271 self.vapi.ipfix_flush()
1272 self.sleep(5, "wait before verifying no packets sent")
1273 self.collector.assert_nothing_captured()
1275 # enable FPP feature
1276 ipfix.enable_flowprobe_feature()
1277 self.vapi.ipfix_flush()
1278 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1282 # make sure the next packets (templates and data) we expect actually
1284 self.vapi.ipfix_flush()
1285 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1286 self.collector.get_capture(4)
1288 ipfix.remove_vpp_config()
1289 self.logger.info("FFP_TEST_FINISH_0001")
# When executed as a script, run this module's tests through unittest
# using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)