2 from __future__ import print_function
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
15 from config import config
16 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
17 from framework import is_distro_ubuntu2204, is_distro_debian11
18 from framework import VppTestCase, VppTestRunner
19 from framework import tag_run_solo
20 from vpp_object import VppObject
21 from vpp_pg_interface import CaptureTimeoutError
23 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
24 from vpp_ip_route import VppIpRoute, VppRoutePath
25 from vpp_papi.macaddress import mac_ntop
26 from socket import inet_ntop
27 from vpp_papi import VppEnum
30 class VppCFLOW(VppObject):
31 """CFLOW object for IPFIX exporter and Flowprobe feature"""
47 self._intf_obj = getattr(self._test, intf)
49 if passive == 0 or passive < active:
50 self._passive = active + 1
52 self._passive = passive
53 self._datapath = datapath # l2 ip4 ip6
54 self._collect = layer # l2 l3 l4
55 self._direction = direction # rx tx both
56 self._timeout = timeout
58 self._configured = False
60 def add_vpp_config(self):
61 self.enable_exporter()
65 if "l2" in self._collect.lower():
66 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
67 if "l3" in self._collect.lower():
68 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
69 if "l4" in self._collect.lower():
70 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
71 self._test.vapi.flowprobe_set_params(
72 record_flags=(l2_flag | l3_flag | l4_flag),
73 active_timer=self._active,
74 passive_timer=self._passive,
76 self.enable_flowprobe_feature()
77 self._test.vapi.cli("ipfix flush")
78 self._configured = True
def remove_vpp_config(self):
    """Undo the configuration applied by this object.

    Disables the exporter, detaches the flowprobe feature from the
    interface, flushes IPFIX through the CLI and finally records that the
    object is no longer configured.
    """
    # Exporter goes first, then the per-interface feature.
    self.disable_exporter()
    self.disable_flowprobe_feature()

    # Flush IPFIX once both pieces have been disabled.
    run_cli = self._test.vapi.cli
    run_cli("ipfix flush")

    self._configured = False
86 def enable_exporter(self):
87 self._test.vapi.set_ipfix_exporter(
88 collector_address=self._test.pg0.remote_ip4,
89 src_address=self._test.pg0.local_ip4,
91 template_interval=self._timeout,
94 def _enable_disable_flowprobe_feature(self, is_add):
96 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
97 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
98 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
101 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
102 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
103 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
105 self._test.vapi.flowprobe_interface_add_del(
107 which=which_map[self._datapath],
108 direction=direction_map[self._direction],
109 sw_if_index=self._intf_obj.sw_if_index,
def enable_flowprobe_feature(self):
    """Enable the flowprobe feature via the shared add/del helper."""
    toggle = self._enable_disable_flowprobe_feature
    toggle(is_add=True)
def disable_exporter(self):
    """Disable the exporter by setting its collector to 0.0.0.0 via the CLI."""
    command = "set ipfix exporter collector 0.0.0.0"
    self._test.vapi.cli(command)
def disable_flowprobe_feature(self):
    """Disable the flowprobe feature via the shared add/del helper."""
    toggle = self._enable_disable_flowprobe_feature
    toggle(is_add=False)
122 return "ipfix-collector-%s-%s" % (self._src, self.dst)
def query_vpp_config(self):
    """Report whether this object currently counts as configured.

    The answer is the locally tracked ``_configured`` flag; VPP itself is
    not queried here.
    """
    is_configured = self._configured
    return is_configured
127 def verify_templates(self, decoder=None, timeout=1, count=3):
129 self._test.assertIn(count, (1, 2, 3))
130 for _ in range(count):
131 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
132 self._test.assertTrue(p.haslayer(IPFIX))
133 if decoder is not None and p.haslayer(Template):
134 templates.append(p[Template].templateID)
135 decoder.add_template(p.getlayer(Template))
139 class MethodHolder(VppTestCase):
140 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
144 max_number_of_packets = 10
150 Perform standard class setup (defined by class method setUpClass in
151 class VppTestCase) before running the test case, set test case related
152 variables and configure VPP.
154 super(MethodHolder, cls).setUpClass()
155 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
160 # Create pg interfaces
161 cls.create_pg_interfaces(range(9))
164 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
166 # Create BD with MAC learning and unknown unicast flooding enabled
167 # and put the pg1 and pg2 interfaces into this BD
168 cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
169 cls.vapi.sw_interface_set_l2_bridge(
170 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
172 cls.vapi.sw_interface_set_l2_bridge(
173 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
176 # Set up all interfaces
177 for i in cls.pg_interfaces:
181 cls.pg0.configure_ipv4_neighbors()
182 cls.collector = cls.pg0
185 cls.pg1.resolve_arp()
187 cls.pg2.resolve_arp()
189 cls.pg3.resolve_arp()
191 cls.pg4.resolve_arp()
194 cls.pg8.configure_ipv4_neighbors()
197 cls.pg5.resolve_ndp()
198 cls.pg5.disable_ipv6_ra()
200 cls.pg6.resolve_ndp()
201 cls.pg6.disable_ipv6_ra()
203 super(MethodHolder, cls).tearDownClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the next class in the MRO."""
    parent = super(MethodHolder, cls)
    parent.tearDownClass()
211 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
213 """Create a packet stream to tickle the plugin
215 :param VppInterface src_if: Source interface for packet stream
216 :param VppInterface dst_if: Dst interface for packet stream
224 packets = random.randint(1, self.max_number_of_packets)
226 for p in range(0, packets):
228 pkt_size = random.choice(self.pg_if_packet_sizes)
229 info = self.create_packet_info(src_if, dst_if)
230 payload = self.info_to_payload(info)
231 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
233 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
235 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
236 p /= UDP(sport=1234, dport=4321)
239 self.extend_packet(p, pkt_size)
242 def verify_cflow_data(self, decoder, capture, cflow):
248 if cflow.haslayer(Data):
249 data = decoder.decode_data_set(cflow.getlayer(Set))
251 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
252 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
254 def send_packets(self, src_if=None, dst_if=None):
259 self.pg_enable_capture([dst_if])
260 src_if.add_stream(self.pkts)
262 return dst_if.get_capture(len(self.pkts))
264 def verify_cflow_data_detail(
265 self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
268 print(capture[0].show())
269 if cflow.haslayer(Data):
270 data = decoder.decode_data_set(cflow.getlayer(Set))
274 ip_layer = capture[0][IP]
276 ip_layer = capture[0][IPv6]
277 if data_set is not None:
279 # skip flow if ingress/egress interface is 0
280 if int(binascii.hexlify(record[10]), 16) == 0:
282 if int(binascii.hexlify(record[14]), 16) == 0:
285 for field in data_set:
286 value = data_set[field]
287 if value == "octets":
290 value += 40 # ??? is this correct
291 elif value == "packets":
293 elif value == "src_ip":
295 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
297 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
298 value = int(binascii.hexlify(ip), 16)
299 elif value == "dst_ip":
301 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
303 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
304 value = int(binascii.hexlify(ip), 16)
305 elif value == "sport":
306 value = int(capture[0][UDP].sport)
307 elif value == "dport":
308 value = int(capture[0][UDP].dport)
310 int(binascii.hexlify(record[field]), 16), value
313 def verify_cflow_data_notimer(self, decoder, capture, cflows):
316 if cflow.haslayer(Data):
317 data = decoder.decode_data_set(cflow.getlayer(Set))
319 raise Exception("No CFLOW data")
324 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
325 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
326 self.assertEqual(len(capture), idx)
328 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
329 """wait for CFLOW packet and verify its correctness
331 :param timeout: how long to wait
334 self.logger.info("IPFIX: Waiting for CFLOW packet")
335 # self.logger.debug(self.vapi.ppcli("show flow table"))
336 p = collector_intf.wait_for_packet(timeout=timeout)
337 self.assertEqual(p[Set].setID, set_id)
338 # self.logger.debug(self.vapi.ppcli("show flow table"))
339 self.logger.debug(ppp("IPFIX: Got packet:", p))
344 @tag_fixme_vpp_workers
345 @tag_fixme_ubuntu2204
347 class Flowprobe(MethodHolder):
348 """Template verification, timer tests"""
352 super(Flowprobe, cls).setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the next class in the MRO."""
    parent = super(Flowprobe, cls)
    parent.tearDownClass()
359 """timer less than template timeout"""
360 self.logger.info("FFP_TEST_START_0001")
361 self.pg_enable_capture(self.pg_interfaces)
364 ipfix = VppCFLOW(test=self, active=2)
365 ipfix.add_vpp_config()
367 ipfix_decoder = IPFIXDecoder()
368 # template packet should arrive immediately
369 templates = ipfix.verify_templates(ipfix_decoder)
371 self.create_stream(packets=1)
373 capture = self.pg2.get_capture(1)
375 # make sure the one packet we expect actually showed up
376 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
377 self.verify_cflow_data(ipfix_decoder, capture, cflow)
379 ipfix.remove_vpp_config()
380 self.logger.info("FFP_TEST_FINISH_0001")
383 """timer greater than template timeout"""
384 self.logger.info("FFP_TEST_START_0002")
385 self.pg_enable_capture(self.pg_interfaces)
388 ipfix = VppCFLOW(test=self, timeout=3, active=4)
389 ipfix.add_vpp_config()
391 ipfix_decoder = IPFIXDecoder()
392 # template packet should arrive immediately
393 ipfix.verify_templates()
395 self.create_stream(packets=2)
397 capture = self.pg2.get_capture(2)
399 # the templates are re-sent once the template interval (timeout=3)
400 # expires, so wait up to 5 s for each template packet
401 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
403 # make sure the one packet we expect actually showed up
404 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
405 self.verify_cflow_data(ipfix_decoder, capture, cflow)
407 ipfix.remove_vpp_config()
408 self.logger.info("FFP_TEST_FINISH_0002")
410 def test_cflow_packet(self):
411 """verify cflow packet fields"""
412 self.logger.info("FFP_TEST_START_0000")
413 self.pg_enable_capture(self.pg_interfaces)
417 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
419 ipfix.add_vpp_config()
421 route_9001 = VppIpRoute(
425 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
427 route_9001.add_vpp_config()
429 ipfix_decoder = IPFIXDecoder()
430 templates = ipfix.verify_templates(ipfix_decoder, count=1)
434 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
435 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
436 / TCP(sport=1234, dport=4321, flags=80)
441 nowUTC = int(time.time())
442 nowUNIX = nowUTC + 2208988800
443 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
445 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
446 self.collector.get_capture(2)
448 if cflow[0].haslayer(IPFIX):
449 self.assertEqual(cflow[IPFIX].version, 10)
450 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
451 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
452 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
453 if cflow.haslayer(Data):
454 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
456 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
458 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
460 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
462 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
464 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
466 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
467 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
468 # flow start timestamp
469 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
470 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
472 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
474 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
476 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
478 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
480 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
482 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
484 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
486 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
488 ipfix.remove_vpp_config()
489 self.logger.info("FFP_TEST_FINISH_0000")
491 def test_interface_dump(self):
492 """Dump interfaces with IPFIX flow record generation enabled"""
493 self.logger.info("FFP_TEST_START_0003")
495 # Enable feature for 3 interfaces
496 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
497 ipfix1.add_vpp_config()
499 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
500 ipfix2.enable_flowprobe_feature()
502 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
503 ipfix3.enable_flowprobe_feature()
505 # When request "all", dump should contain all enabled interfaces
506 dump = self.vapi.flowprobe_interface_dump()
507 self.assertEqual(len(dump), 3)
509 # Verify 1st interface
510 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
512 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
516 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
519 # Verify 2nd interface
520 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
522 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
526 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
529 # Verify 3rd interface
530 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
532 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
536 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
539 # When request 2nd interface, dump should contain only the specified interface
540 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
541 self.assertEqual(len(dump), 1)
543 # Verify 2nd interface
544 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
546 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
550 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
553 # When request 99th interface, dump should be empty
554 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
555 self.assertEqual(len(dump), 0)
557 ipfix1.remove_vpp_config()
558 ipfix2.remove_vpp_config()
559 ipfix3.remove_vpp_config()
560 self.logger.info("FFP_TEST_FINISH_0003")
def test_get_params(self):
    """Get IPFIX flow record generation parameters"""
    self.logger.info("FFP_TEST_START_0004")

    # Configure flowprobe with explicit timers and all three record layers
    exporter = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
    exporter.add_vpp_config()

    # Read the parameters back and compare them with what was configured
    reply = self.vapi.flowprobe_get_params()
    self.assertEqual(reply.active_timer, 20)
    self.assertEqual(reply.passive_timer, 40)
    flag_enum = VppEnum.vl_api_flowprobe_record_flags_t
    expected_flags = (
        flag_enum.FLOWPROBE_RECORD_FLAG_L2
        | flag_enum.FLOWPROBE_RECORD_FLAG_L3
        | flag_enum.FLOWPROBE_RECORD_FLAG_L4
    )
    self.assertEqual(reply.record_flags, expected_flags)

    exporter.remove_vpp_config()
    self.logger.info("FFP_TEST_FINISH_0004")
583 class DatapathTestsHolder(object):
584 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
588 super(DatapathTestsHolder, cls).setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the next class in the MRO."""
    parent = super(DatapathTestsHolder, cls)
    parent.tearDownClass()
594 def test_templatesL2(self):
595 """verify template on L2 datapath"""
596 self.logger.info("FFP_TEST_START_0000")
597 self.pg_enable_capture(self.pg_interfaces)
600 test=self, intf=self.intf1, layer="l2", direction=self.direction
602 ipfix.add_vpp_config()
604 # template packet should arrive immediately
605 self.vapi.ipfix_flush()
606 ipfix.verify_templates(timeout=3, count=1)
607 self.collector.get_capture(1)
609 ipfix.remove_vpp_config()
610 self.logger.info("FFP_TEST_FINISH_0000")
612 def test_L2onL2(self):
613 """L2 data on L2 datapath"""
614 self.logger.info("FFP_TEST_START_0001")
615 self.pg_enable_capture(self.pg_interfaces)
619 test=self, intf=self.intf1, layer="l2", direction=self.direction
621 ipfix.add_vpp_config()
623 ipfix_decoder = IPFIXDecoder()
624 # template packet should arrive immediately
625 templates = ipfix.verify_templates(ipfix_decoder, count=1)
627 self.create_stream(packets=1)
628 capture = self.send_packets()
630 # make sure the one packet we expect actually showed up
631 self.vapi.ipfix_flush()
632 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
633 self.verify_cflow_data_detail(
637 {2: "packets", 256: 8, 61: (self.direction == "tx")},
639 self.collector.get_capture(2)
641 ipfix.remove_vpp_config()
642 self.logger.info("FFP_TEST_FINISH_0001")
644 def test_L3onL2(self):
645 """L3 data on L2 datapath"""
646 self.logger.info("FFP_TEST_START_0002")
647 self.pg_enable_capture(self.pg_interfaces)
651 test=self, intf=self.intf1, layer="l3", direction=self.direction
653 ipfix.add_vpp_config()
655 ipfix_decoder = IPFIXDecoder()
656 # template packet should arrive immediately
657 templates = ipfix.verify_templates(ipfix_decoder, count=2)
659 self.create_stream(packets=1)
660 capture = self.send_packets()
662 # make sure the one packet we expect actually showed up
663 self.vapi.ipfix_flush()
664 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
665 self.verify_cflow_data_detail(
674 61: (self.direction == "tx"),
678 self.collector.get_capture(3)
680 ipfix.remove_vpp_config()
681 self.logger.info("FFP_TEST_FINISH_0002")
683 def test_L4onL2(self):
684 """L4 data on L2 datapath"""
685 self.logger.info("FFP_TEST_START_0003")
686 self.pg_enable_capture(self.pg_interfaces)
690 test=self, intf=self.intf1, layer="l4", direction=self.direction
692 ipfix.add_vpp_config()
694 ipfix_decoder = IPFIXDecoder()
695 # template packet should arrive immediately
696 templates = ipfix.verify_templates(ipfix_decoder, count=2)
698 self.create_stream(packets=1)
699 capture = self.send_packets()
701 # make sure the one packet we expect actually showed up
702 self.vapi.ipfix_flush()
703 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
704 self.verify_cflow_data_detail(
708 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
711 self.collector.get_capture(3)
713 ipfix.remove_vpp_config()
714 self.logger.info("FFP_TEST_FINISH_0003")
716 def test_templatesIp4(self):
717 """verify templates on IP4 datapath"""
718 self.logger.info("FFP_TEST_START_0000")
720 self.pg_enable_capture(self.pg_interfaces)
723 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
725 ipfix.add_vpp_config()
727 # template packet should arrive immediately
728 self.vapi.ipfix_flush()
729 ipfix.verify_templates(timeout=3, count=1)
730 self.collector.get_capture(1)
732 ipfix.remove_vpp_config()
734 self.logger.info("FFP_TEST_FINISH_0000")
736 def test_L2onIP4(self):
737 """L2 data on IP4 datapath"""
738 self.logger.info("FFP_TEST_START_0001")
739 self.pg_enable_capture(self.pg_interfaces)
747 direction=self.direction,
749 ipfix.add_vpp_config()
751 ipfix_decoder = IPFIXDecoder()
752 # template packet should arrive immediately
753 templates = ipfix.verify_templates(ipfix_decoder, count=1)
755 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
756 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
758 # make sure the one packet we expect actually showed up
759 self.vapi.ipfix_flush()
760 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
761 self.verify_cflow_data_detail(
765 {2: "packets", 256: 8, 61: (self.direction == "tx")},
768 # expected one template and one cflow packet
769 self.collector.get_capture(2)
771 ipfix.remove_vpp_config()
772 self.logger.info("FFP_TEST_FINISH_0001")
774 def test_L3onIP4(self):
775 """L3 data on IP4 datapath"""
776 self.logger.info("FFP_TEST_START_0002")
777 self.pg_enable_capture(self.pg_interfaces)
785 direction=self.direction,
787 ipfix.add_vpp_config()
789 ipfix_decoder = IPFIXDecoder()
790 # template packet should arrive immediately
791 templates = ipfix.verify_templates(ipfix_decoder, count=1)
793 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
794 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
796 # make sure the one packet we expect actually showed up
797 self.vapi.ipfix_flush()
798 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
799 self.verify_cflow_data_detail(
808 61: (self.direction == "tx"),
812 # expected one template and one cflow packet
813 self.collector.get_capture(2)
815 ipfix.remove_vpp_config()
816 self.logger.info("FFP_TEST_FINISH_0002")
818 def test_L4onIP4(self):
819 """L4 data on IP4 datapath"""
820 self.logger.info("FFP_TEST_START_0003")
821 self.pg_enable_capture(self.pg_interfaces)
829 direction=self.direction,
831 ipfix.add_vpp_config()
833 ipfix_decoder = IPFIXDecoder()
834 # template packet should arrive immediately
835 templates = ipfix.verify_templates(ipfix_decoder, count=1)
837 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
838 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
840 # make sure the one packet we expect actually showed up
841 self.vapi.ipfix_flush()
842 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
843 self.verify_cflow_data_detail(
847 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
850 # expected one template and one cflow packet
851 self.collector.get_capture(2)
853 ipfix.remove_vpp_config()
854 self.logger.info("FFP_TEST_FINISH_0003")
856 def test_templatesIP6(self):
857 """verify templates on IP6 datapath"""
858 self.logger.info("FFP_TEST_START_0000")
859 self.pg_enable_capture(self.pg_interfaces)
862 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
864 ipfix.add_vpp_config()
866 # template packet should arrive immediately
867 ipfix.verify_templates(count=1)
868 self.collector.get_capture(1)
870 ipfix.remove_vpp_config()
872 self.logger.info("FFP_TEST_FINISH_0000")
874 def test_L2onIP6(self):
875 """L2 data on IP6 datapath"""
876 self.logger.info("FFP_TEST_START_0001")
877 self.pg_enable_capture(self.pg_interfaces)
885 direction=self.direction,
887 ipfix.add_vpp_config()
889 ipfix_decoder = IPFIXDecoder()
890 # template packet should arrive immediately
891 templates = ipfix.verify_templates(ipfix_decoder, count=1)
893 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
894 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
896 # make sure the one packet we expect actually showed up
897 self.vapi.ipfix_flush()
898 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
899 self.verify_cflow_data_detail(
903 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
907 # expected one template and one cflow packet
908 self.collector.get_capture(2)
910 ipfix.remove_vpp_config()
911 self.logger.info("FFP_TEST_FINISH_0001")
913 def test_L3onIP6(self):
914 """L3 data on IP6 datapath"""
915 self.logger.info("FFP_TEST_START_0002")
916 self.pg_enable_capture(self.pg_interfaces)
924 direction=self.direction,
926 ipfix.add_vpp_config()
928 ipfix_decoder = IPFIXDecoder()
929 # template packet should arrive immediately
930 templates = ipfix.verify_templates(ipfix_decoder, count=1)
932 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
933 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
935 # make sure the one packet we expect actually showed up
936 self.vapi.ipfix_flush()
937 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
938 self.verify_cflow_data_detail(
942 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
946 # expected one template and one cflow packet
947 self.collector.get_capture(2)
949 ipfix.remove_vpp_config()
950 self.logger.info("FFP_TEST_FINISH_0002")
952 def test_L4onIP6(self):
953 """L4 data on IP6 datapath"""
954 self.logger.info("FFP_TEST_START_0003")
955 self.pg_enable_capture(self.pg_interfaces)
963 direction=self.direction,
965 ipfix.add_vpp_config()
967 ipfix_decoder = IPFIXDecoder()
968 # template packet should arrive immediately
969 templates = ipfix.verify_templates(ipfix_decoder, count=1)
971 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
972 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
974 # make sure the one packet we expect actually showed up
975 self.vapi.ipfix_flush()
976 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
977 self.verify_cflow_data_detail(
981 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
985 # expected one template and one cflow packet
986 self.collector.get_capture(2)
988 ipfix.remove_vpp_config()
989 self.logger.info("FFP_TEST_FINISH_0003")
992 """no timers, one CFLOW packet, 9 Flows inside"""
993 self.logger.info("FFP_TEST_START_0001")
994 self.pg_enable_capture(self.pg_interfaces)
997 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
998 ipfix.add_vpp_config()
1000 ipfix_decoder = IPFIXDecoder()
1001 # template packet should arrive immediately
1002 templates = ipfix.verify_templates(ipfix_decoder)
1004 self.create_stream(packets=9)
1005 capture = self.send_packets()
1007 # make sure the one packet we expect actually showed up
1008 self.vapi.ipfix_flush()
1009 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1010 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1011 self.collector.get_capture(4)
1013 ipfix.remove_vpp_config()
1014 self.logger.info("FFP_TEST_FINISH_0001")
1016 def test_0002(self):
1017 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1018 self.logger.info("FFP_TEST_START_0002")
1019 self.pg_enable_capture(self.pg_interfaces)
1022 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1023 ipfix.add_vpp_config()
1025 ipfix_decoder = IPFIXDecoder()
1026 # template packet should arrive immediately
1027 self.vapi.ipfix_flush()
1028 templates = ipfix.verify_templates(ipfix_decoder)
1030 self.create_stream(packets=6)
1031 capture = self.send_packets()
1033 # make sure the one packet we expect actually showed up
1035 self.vapi.ipfix_flush()
1036 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1037 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1038 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1039 self.collector.get_capture(5)
1041 ipfix.remove_vpp_config()
1042 self.logger.info("FFP_TEST_FINISH_0002")
1045 @tag_fixme_vpp_workers
1046 class DatapathTx(MethodHolder, DatapathTestsHolder):
1047 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1055 @tag_fixme_vpp_workers
1056 class DatapathRx(MethodHolder, DatapathTestsHolder):
1057 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1065 @unittest.skipUnless(config.extended, "part of extended tests")
1066 class DisableIPFIX(MethodHolder):
def setUpClass(cls):
    """Delegate class-level setup to the next class in the MRO."""
    parent = super(DisableIPFIX, cls)
    parent.setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the next class in the MRO."""
    parent = super(DisableIPFIX, cls)
    parent.tearDownClass()
1077 def test_0001(self):
1078 """disable IPFIX after first packets"""
1079 self.logger.info("FFP_TEST_START_0001")
1080 self.pg_enable_capture(self.pg_interfaces)
1083 ipfix = VppCFLOW(test=self)
1084 ipfix.add_vpp_config()
1086 ipfix_decoder = IPFIXDecoder()
1087 # template packet should arrive immediately
1088 templates = ipfix.verify_templates(ipfix_decoder)
1090 self.create_stream()
1093 # make sure the one packet we expect actually showed up
1094 self.vapi.ipfix_flush()
1095 self.wait_for_cflow_packet(self.collector, templates[1])
1096 self.collector.get_capture(4)
1099 ipfix.disable_exporter()
1100 self.pg_enable_capture([self.collector])
1104 # make sure no packet arrives while we wait below
1105 self.vapi.ipfix_flush()
1106 self.sleep(1, "wait before verifying no packets sent")
1107 self.collector.assert_nothing_captured()
1109 ipfix.remove_vpp_config()
1110 self.logger.info("FFP_TEST_FINISH_0001")
1113 @unittest.skipUnless(config.extended, "part of extended tests")
1114 class ReenableIPFIX(MethodHolder):
1115 """Re-enable IPFIX"""
def setUpClass(cls):
    """Delegate class-level setup to the next class in the MRO."""
    parent = super(ReenableIPFIX, cls)
    parent.setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the next class in the MRO."""
    parent = super(ReenableIPFIX, cls)
    parent.tearDownClass()
1125 def test_0011(self):
1126 """disable IPFIX after first packets and re-enable after few packets"""
1127 self.logger.info("FFP_TEST_START_0001")
1128 self.pg_enable_capture(self.pg_interfaces)
1131 ipfix = VppCFLOW(test=self)
1132 ipfix.add_vpp_config()
1134 ipfix_decoder = IPFIXDecoder()
1135 # template packet should arrive immediately
1136 templates = ipfix.verify_templates(ipfix_decoder)
1138 self.create_stream(packets=5)
1141 # make sure the one packet we expect actually showed up
1142 self.vapi.ipfix_flush()
1143 self.wait_for_cflow_packet(self.collector, templates[1])
1144 self.collector.get_capture(4)
1147 ipfix.disable_exporter()
1148 self.vapi.ipfix_flush()
1149 self.pg_enable_capture([self.collector])
1153 # make sure no one packet arrived in active timer span
1154 self.vapi.ipfix_flush()
1155 self.sleep(1, "wait before verifying no packets sent")
1156 self.collector.assert_nothing_captured()
1157 self.pg2.get_capture(5)
1160 ipfix.enable_exporter()
1162 capture = self.collector.get_capture(4)
1166 self.assertTrue(p.haslayer(IPFIX))
1167 if p.haslayer(Template):
1169 self.assertTrue(nr_templates, 3)
1171 self.assertTrue(p.haslayer(IPFIX))
1172 if p.haslayer(Data):
1174 self.assertTrue(nr_templates, 1)
1176 ipfix.remove_vpp_config()
1177 self.logger.info("FFP_TEST_FINISH_0001")
1180 @unittest.skipUnless(config.extended, "part of extended tests")
1181 class DisableFP(MethodHolder):
1182 """Disable Flowprobe feature"""
def setUpClass(cls):
    """Delegate class-level setup to the next class in the MRO."""
    parent = super(DisableFP, cls)
    parent.setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the next class in the MRO."""
    parent = super(DisableFP, cls)
    parent.tearDownClass()
1192 def test_0001(self):
1193 """disable flowprobe feature after first packets"""
1194 self.logger.info("FFP_TEST_START_0001")
1195 self.pg_enable_capture(self.pg_interfaces)
1197 ipfix = VppCFLOW(test=self)
1198 ipfix.add_vpp_config()
1200 ipfix_decoder = IPFIXDecoder()
1201 # template packet should arrive immediately
1202 templates = ipfix.verify_templates(ipfix_decoder)
1204 self.create_stream()
1207 # make sure the one packet we expect actually showed up
1208 self.vapi.ipfix_flush()
1209 self.wait_for_cflow_packet(self.collector, templates[1])
1210 self.collector.get_capture(4)
1213 ipfix.disable_flowprobe_feature()
1214 self.pg_enable_capture([self.collector])
1218 # make sure no one packet arrived in active timer span
1219 self.vapi.ipfix_flush()
1220 self.sleep(1, "wait before verifying no packets sent")
1221 self.collector.assert_nothing_captured()
1223 ipfix.remove_vpp_config()
1224 self.logger.info("FFP_TEST_FINISH_0001")
1227 @unittest.skipUnless(config.extended, "part of extended tests")
1228 class ReenableFP(MethodHolder):
1229 """Re-enable Flowprobe feature"""
def setUpClass(cls):
    """Delegate class-level setup to the next class in the MRO."""
    parent = super(ReenableFP, cls)
    parent.setUpClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the next class in the MRO."""
    parent = super(ReenableFP, cls)
    parent.tearDownClass()
1239 def test_0001(self):
1240 """disable flowprobe feature after first packets and re-enable
1241 after few packets"""
1242 self.logger.info("FFP_TEST_START_0001")
1243 self.pg_enable_capture(self.pg_interfaces)
1246 ipfix = VppCFLOW(test=self)
1247 ipfix.add_vpp_config()
1249 ipfix_decoder = IPFIXDecoder()
1250 # template packet should arrive immediately
1251 self.vapi.ipfix_flush()
1252 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1254 self.create_stream()
1257 # make sure the one packet we expect actually showed up
1258 self.vapi.ipfix_flush()
1259 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1260 self.collector.get_capture(4)
1262 # disable FPP feature
1263 ipfix.disable_flowprobe_feature()
1264 self.pg_enable_capture([self.collector])
1268 # make sure no one packet arrived in active timer span
1269 self.vapi.ipfix_flush()
1270 self.sleep(5, "wait before verifying no packets sent")
1271 self.collector.assert_nothing_captured()
1273 # enable FPP feature
1274 ipfix.enable_flowprobe_feature()
1275 self.vapi.ipfix_flush()
1276 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1280 # make sure the next packets (templates and data) we expect actually
1282 self.vapi.ipfix_flush()
1283 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1284 self.collector.get_capture(4)
1286 ipfix.remove_vpp_config()
1287 self.logger.info("FFP_TEST_FINISH_0001")
# When executed as a script, run this module's tests through unittest using
# VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)