2 from __future__ import print_function
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.lacp import SlowProtocol, LACP
16 from config import config
17 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
18 from framework import is_distro_ubuntu2204, is_distro_debian11
19 from framework import VppTestCase, VppTestRunner
20 from framework import tag_run_solo
21 from vpp_object import VppObject
22 from vpp_pg_interface import CaptureTimeoutError
24 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
25 from vpp_ip_route import VppIpRoute, VppRoutePath
26 from vpp_papi.macaddress import mac_ntop
27 from socket import inet_ntop
28 from vpp_papi import VppEnum
31 class VppCFLOW(VppObject):
32 """CFLOW object for IPFIX exporter and Flowprobe feature"""
48 self._intf_obj = getattr(self._test, intf)
50 if passive == 0 or passive < active:
51 self._passive = active + 1
53 self._passive = passive
54 self._datapath = datapath # l2 ip4 ip6
55 self._collect = layer # l2 l3 l4
56 self._direction = direction # rx tx both
57 self._timeout = timeout
59 self._configured = False
61 def add_vpp_config(self):
62 self.enable_exporter()
66 if "l2" in self._collect.lower():
67 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
68 if "l3" in self._collect.lower():
69 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
70 if "l4" in self._collect.lower():
71 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
72 self._test.vapi.flowprobe_set_params(
73 record_flags=(l2_flag | l3_flag | l4_flag),
74 active_timer=self._active,
75 passive_timer=self._passive,
77 self.enable_flowprobe_feature()
78 self._test.vapi.cli("ipfix flush")
79 self._configured = True
81 def remove_vpp_config(self):
82 self.disable_exporter()
83 self.disable_flowprobe_feature()
84 self._test.vapi.cli("ipfix flush")
85 self._configured = False
87 def enable_exporter(self):
88 self._test.vapi.set_ipfix_exporter(
89 collector_address=self._test.pg0.remote_ip4,
90 src_address=self._test.pg0.local_ip4,
92 template_interval=self._timeout,
95 def _enable_disable_flowprobe_feature(self, is_add):
97 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
98 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
99 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
102 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
103 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
104 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
106 self._test.vapi.flowprobe_interface_add_del(
108 which=which_map[self._datapath],
109 direction=direction_map[self._direction],
110 sw_if_index=self._intf_obj.sw_if_index,
113 def enable_flowprobe_feature(self):
114 self._enable_disable_flowprobe_feature(is_add=True)
116 def disable_exporter(self):
117 self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
119 def disable_flowprobe_feature(self):
120 self._enable_disable_flowprobe_feature(is_add=False)
123 return "ipfix-collector-%s-%s" % (self._src, self.dst)
125 def query_vpp_config(self):
126 return self._configured
128 def verify_templates(self, decoder=None, timeout=1, count=3):
130 self._test.assertIn(count, (1, 2, 3))
131 for _ in range(count):
132 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
133 self._test.assertTrue(p.haslayer(IPFIX))
134 if decoder is not None and p.haslayer(Template):
135 templates.append(p[Template].templateID)
136 decoder.add_template(p.getlayer(Template))
140 class MethodHolder(VppTestCase):
141 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
145 max_number_of_packets = 10
151 Perform standard class setup (defined by class method setUpClass in
152 class VppTestCase) before running the test case, set test case related
153 variables and configure VPP.
155 super(MethodHolder, cls).setUpClass()
156 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
161 # Create pg interfaces
162 cls.create_pg_interfaces(range(9))
165 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
167 # Create BD with MAC learning and unknown unicast flooding disabled
168 # and put interfaces to this BD
169 cls.vapi.bridge_domain_add_del_v2(
170 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
172 cls.vapi.sw_interface_set_l2_bridge(
173 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
175 cls.vapi.sw_interface_set_l2_bridge(
176 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
179 # Set up all interfaces
180 for i in cls.pg_interfaces:
184 cls.pg0.configure_ipv4_neighbors()
185 cls.collector = cls.pg0
188 cls.pg1.resolve_arp()
190 cls.pg2.resolve_arp()
192 cls.pg3.resolve_arp()
194 cls.pg4.resolve_arp()
197 cls.pg8.configure_ipv4_neighbors()
200 cls.pg5.resolve_ndp()
201 cls.pg5.disable_ipv6_ra()
203 cls.pg6.resolve_ndp()
204 cls.pg6.disable_ipv6_ra()
206 super(MethodHolder, cls).tearDownClass()
210 def tearDownClass(cls):
211 super(MethodHolder, cls).tearDownClass()
214 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
216 """Create a packet stream to tickle the plugin
218 :param VppInterface src_if: Source interface for packet stream
219 :param VppInterface src_if: Dst interface for packet stream
227 packets = random.randint(1, self.max_number_of_packets)
229 for p in range(0, packets):
231 pkt_size = random.choice(self.pg_if_packet_sizes)
232 info = self.create_packet_info(src_if, dst_if)
233 payload = self.info_to_payload(info)
234 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
236 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
238 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
239 p /= UDP(sport=1234, dport=4321)
242 self.extend_packet(p, pkt_size)
245 def verify_cflow_data(self, decoder, capture, cflow):
251 if cflow.haslayer(Data):
252 data = decoder.decode_data_set(cflow.getlayer(Set))
254 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
255 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
257 def send_packets(self, src_if=None, dst_if=None):
262 self.pg_enable_capture([dst_if])
263 src_if.add_stream(self.pkts)
265 return dst_if.get_capture(len(self.pkts))
267 def verify_cflow_data_detail(
268 self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
271 print(capture[0].show())
272 if cflow.haslayer(Data):
273 data = decoder.decode_data_set(cflow.getlayer(Set))
277 ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
279 ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
280 if data_set is not None:
282 # skip flow if ingress/egress interface is 0
283 if int(binascii.hexlify(record[10]), 16) == 0:
285 if int(binascii.hexlify(record[14]), 16) == 0:
288 for field in data_set:
289 value = data_set[field]
290 if value == "octets":
293 value += 40 # ??? is this correct
294 elif value == "packets":
296 elif value == "src_ip":
298 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
300 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
301 value = int(binascii.hexlify(ip), 16)
302 elif value == "dst_ip":
304 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
306 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
307 value = int(binascii.hexlify(ip), 16)
308 elif value == "sport":
309 value = int(capture[0][UDP].sport)
310 elif value == "dport":
311 value = int(capture[0][UDP].dport)
313 int(binascii.hexlify(record[field]), 16), value
316 def verify_cflow_data_notimer(self, decoder, capture, cflows):
319 if cflow.haslayer(Data):
320 data = decoder.decode_data_set(cflow.getlayer(Set))
322 raise Exception("No CFLOW data")
327 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
328 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
329 self.assertEqual(len(capture), idx)
331 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
332 """wait for CFLOW packet and verify its correctness
334 :param timeout: how long to wait
337 self.logger.info("IPFIX: Waiting for CFLOW packet")
338 # self.logger.debug(self.vapi.ppcli("show flow table"))
339 p = collector_intf.wait_for_packet(timeout=timeout)
340 self.assertEqual(p[Set].setID, set_id)
341 # self.logger.debug(self.vapi.ppcli("show flow table"))
342 self.logger.debug(ppp("IPFIX: Got packet:", p))
347 @tag_fixme_vpp_workers
348 @tag_fixme_ubuntu2204
350 class Flowprobe(MethodHolder):
351 """Template verification, timer tests"""
355 super(Flowprobe, cls).setUpClass()
358 def tearDownClass(cls):
359 super(Flowprobe, cls).tearDownClass()
362 """timer less than template timeout"""
363 self.logger.info("FFP_TEST_START_0001")
364 self.pg_enable_capture(self.pg_interfaces)
367 ipfix = VppCFLOW(test=self, active=2)
368 ipfix.add_vpp_config()
370 ipfix_decoder = IPFIXDecoder()
371 # template packet should arrive immediately
372 templates = ipfix.verify_templates(ipfix_decoder)
374 self.create_stream(packets=1)
376 capture = self.pg2.get_capture(1)
378 # make sure the one packet we expect actually showed up
379 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
380 self.verify_cflow_data(ipfix_decoder, capture, cflow)
382 ipfix.remove_vpp_config()
383 self.logger.info("FFP_TEST_FINISH_0001")
386 """timer greater than template timeout"""
387 self.logger.info("FFP_TEST_START_0002")
388 self.pg_enable_capture(self.pg_interfaces)
391 ipfix = VppCFLOW(test=self, timeout=3, active=4)
392 ipfix.add_vpp_config()
394 ipfix_decoder = IPFIXDecoder()
395 # template packet should arrive immediately
396 ipfix.verify_templates()
398 self.create_stream(packets=2)
400 capture = self.pg2.get_capture(2)
402 # next set of template packet should arrive after 20 seconds
403 # template packet should arrive within 20 s
404 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
406 # make sure the one packet we expect actually showed up
407 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
408 self.verify_cflow_data(ipfix_decoder, capture, cflow)
410 ipfix.remove_vpp_config()
411 self.logger.info("FFP_TEST_FINISH_0002")
413 def test_cflow_packet(self):
414 """verify cflow packet fields"""
415 self.logger.info("FFP_TEST_START_0000")
416 self.pg_enable_capture(self.pg_interfaces)
420 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
422 ipfix.add_vpp_config()
424 route_9001 = VppIpRoute(
428 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
430 route_9001.add_vpp_config()
432 ipfix_decoder = IPFIXDecoder()
433 templates = ipfix.verify_templates(ipfix_decoder, count=1)
437 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
438 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
439 / TCP(sport=1234, dport=4321, flags=80)
444 nowUTC = int(time.time())
445 nowUNIX = nowUTC + 2208988800
446 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
448 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
449 self.collector.get_capture(2)
451 if cflow[0].haslayer(IPFIX):
452 self.assertEqual(cflow[IPFIX].version, 10)
453 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
454 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
455 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
456 if cflow.haslayer(Data):
457 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
459 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
461 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
463 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
465 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
467 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
469 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
470 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
471 # flow start timestamp
472 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
473 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
475 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
477 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
479 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
481 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
483 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
485 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
487 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
489 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
491 ipfix.remove_vpp_config()
492 self.logger.info("FFP_TEST_FINISH_0000")
494 def test_interface_dump(self):
495 """Dump interfaces with IPFIX flow record generation enabled"""
496 self.logger.info("FFP_TEST_START_0003")
498 # Enable feature for 3 interfaces
499 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
500 ipfix1.add_vpp_config()
502 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
503 ipfix2.enable_flowprobe_feature()
505 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
506 ipfix3.enable_flowprobe_feature()
508 # When request "all", dump should contain all enabled interfaces
509 dump = self.vapi.flowprobe_interface_dump()
510 self.assertEqual(len(dump), 3)
512 # Verify 1st interface
513 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
515 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
519 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
522 # Verify 2nd interface
523 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
525 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
529 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
532 # Verify 3rd interface
533 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
535 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
539 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
542 # When request 2nd interface, dump should contain only the specified interface
543 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
544 self.assertEqual(len(dump), 1)
546 # Verify 2nd interface
547 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
549 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
553 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
556 # When request 99th interface, dump should be empty
557 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
558 self.assertEqual(len(dump), 0)
560 ipfix1.remove_vpp_config()
561 ipfix2.remove_vpp_config()
562 ipfix3.remove_vpp_config()
563 self.logger.info("FFP_TEST_FINISH_0003")
565 def test_get_params(self):
566 """Get IPFIX flow record generation parameters"""
567 self.logger.info("FFP_TEST_START_0004")
569 # Enable feature for an interface with custom parameters
570 ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
571 ipfix.add_vpp_config()
573 # Get and verify parameters
574 params = self.vapi.flowprobe_get_params()
575 self.assertEqual(params.active_timer, 20)
576 self.assertEqual(params.passive_timer, 40)
577 record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
578 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
579 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
580 self.assertEqual(params.record_flags, record_flags)
582 ipfix.remove_vpp_config()
583 self.logger.info("FFP_TEST_FINISH_0004")
586 class DatapathTestsHolder(object):
587 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
591 super(DatapathTestsHolder, cls).setUpClass()
594 def tearDownClass(cls):
595 super(DatapathTestsHolder, cls).tearDownClass()
597 def test_templatesL2(self):
598 """verify template on L2 datapath"""
599 self.logger.info("FFP_TEST_START_0000")
600 self.pg_enable_capture(self.pg_interfaces)
603 test=self, intf=self.intf1, layer="l2", direction=self.direction
605 ipfix.add_vpp_config()
607 # template packet should arrive immediately
608 self.vapi.ipfix_flush()
609 ipfix.verify_templates(timeout=3, count=1)
610 self.collector.get_capture(1)
612 ipfix.remove_vpp_config()
613 self.logger.info("FFP_TEST_FINISH_0000")
615 def test_L2onL2(self):
616 """L2 data on L2 datapath"""
617 self.logger.info("FFP_TEST_START_0001")
618 self.pg_enable_capture(self.pg_interfaces)
622 test=self, intf=self.intf1, layer="l2", direction=self.direction
624 ipfix.add_vpp_config()
626 ipfix_decoder = IPFIXDecoder()
627 # template packet should arrive immediately
628 templates = ipfix.verify_templates(ipfix_decoder, count=1)
630 self.create_stream(packets=1)
631 capture = self.send_packets()
633 # make sure the one packet we expect actually showed up
634 self.vapi.ipfix_flush()
635 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
636 self.verify_cflow_data_detail(
640 {2: "packets", 256: 8, 61: (self.direction == "tx")},
642 self.collector.get_capture(2)
644 ipfix.remove_vpp_config()
645 self.logger.info("FFP_TEST_FINISH_0001")
647 def test_L3onL2(self):
648 """L3 data on L2 datapath"""
649 self.logger.info("FFP_TEST_START_0002")
650 self.pg_enable_capture(self.pg_interfaces)
654 test=self, intf=self.intf1, layer="l3", direction=self.direction
656 ipfix.add_vpp_config()
658 ipfix_decoder = IPFIXDecoder()
659 # template packet should arrive immediately
660 templates = ipfix.verify_templates(ipfix_decoder, count=2)
662 self.create_stream(packets=1)
663 capture = self.send_packets()
665 # make sure the one packet we expect actually showed up
666 self.vapi.ipfix_flush()
667 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
668 self.verify_cflow_data_detail(
677 61: (self.direction == "tx"),
681 self.collector.get_capture(3)
683 ipfix.remove_vpp_config()
684 self.logger.info("FFP_TEST_FINISH_0002")
686 def test_L23onL2(self):
687 """L2/3 data on L2 datapath"""
688 self.pg_enable_capture(self.pg_interfaces)
692 test=self, intf=self.intf1, layer="l2 l3", direction=self.direction
694 ipfix.add_vpp_config()
696 ipfix_decoder = IPFIXDecoder()
697 # template packet should arrive immediately
698 templates = ipfix.verify_templates(ipfix_decoder, count=3)
700 # verify IPv4 and IPv6 flows
701 for ip_ver in ("v4", "v6"):
702 self.create_stream(packets=1, ip_ver=ip_ver)
703 capture = self.send_packets()
705 # make sure the one packet we expect actually showed up
706 self.vapi.ipfix_flush()
707 cflow = self.wait_for_cflow_packet(
708 self.collector, templates[1 if ip_ver == "v4" else 2]
710 src_ip_id = 8 if ip_ver == "v4" else 27
711 dst_ip_id = 12 if ip_ver == "v4" else 28
712 self.verify_cflow_data_detail(
718 256: 8 if ip_ver == "v4" else 56710,
722 61: (self.direction == "tx"),
730 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
735 capture = self.send_packets()
737 # make sure the one packet we expect actually showed up
738 self.vapi.ipfix_flush()
739 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
740 self.verify_cflow_data_detail(
744 {2: "packets", 256: 2440, 61: (self.direction == "tx")},
747 self.collector.get_capture(6)
749 ipfix.remove_vpp_config()
751 def test_L4onL2(self):
752 """L4 data on L2 datapath"""
753 self.logger.info("FFP_TEST_START_0003")
754 self.pg_enable_capture(self.pg_interfaces)
758 test=self, intf=self.intf1, layer="l4", direction=self.direction
760 ipfix.add_vpp_config()
762 ipfix_decoder = IPFIXDecoder()
763 # template packet should arrive immediately
764 templates = ipfix.verify_templates(ipfix_decoder, count=2)
766 self.create_stream(packets=1)
767 capture = self.send_packets()
769 # make sure the one packet we expect actually showed up
770 self.vapi.ipfix_flush()
771 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
772 self.verify_cflow_data_detail(
776 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
779 self.collector.get_capture(3)
781 ipfix.remove_vpp_config()
782 self.logger.info("FFP_TEST_FINISH_0003")
784 def test_templatesIp4(self):
785 """verify templates on IP4 datapath"""
786 self.logger.info("FFP_TEST_START_0000")
788 self.pg_enable_capture(self.pg_interfaces)
791 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
793 ipfix.add_vpp_config()
795 # template packet should arrive immediately
796 self.vapi.ipfix_flush()
797 ipfix.verify_templates(timeout=3, count=1)
798 self.collector.get_capture(1)
800 ipfix.remove_vpp_config()
802 self.logger.info("FFP_TEST_FINISH_0000")
804 def test_L2onIP4(self):
805 """L2 data on IP4 datapath"""
806 self.logger.info("FFP_TEST_START_0001")
807 self.pg_enable_capture(self.pg_interfaces)
815 direction=self.direction,
817 ipfix.add_vpp_config()
819 ipfix_decoder = IPFIXDecoder()
820 # template packet should arrive immediately
821 templates = ipfix.verify_templates(ipfix_decoder, count=1)
823 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
824 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
826 # make sure the one packet we expect actually showed up
827 self.vapi.ipfix_flush()
828 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
829 self.verify_cflow_data_detail(
833 {2: "packets", 256: 8, 61: (self.direction == "tx")},
836 # expected two templates and one cflow packet
837 self.collector.get_capture(2)
839 ipfix.remove_vpp_config()
840 self.logger.info("FFP_TEST_FINISH_0001")
842 def test_L3onIP4(self):
843 """L3 data on IP4 datapath"""
844 self.logger.info("FFP_TEST_START_0002")
845 self.pg_enable_capture(self.pg_interfaces)
853 direction=self.direction,
855 ipfix.add_vpp_config()
857 ipfix_decoder = IPFIXDecoder()
858 # template packet should arrive immediately
859 templates = ipfix.verify_templates(ipfix_decoder, count=1)
861 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
862 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
864 # make sure the one packet we expect actually showed up
865 self.vapi.ipfix_flush()
866 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
867 self.verify_cflow_data_detail(
876 61: (self.direction == "tx"),
880 # expected two templates and one cflow packet
881 self.collector.get_capture(2)
883 ipfix.remove_vpp_config()
884 self.logger.info("FFP_TEST_FINISH_0002")
886 def test_L4onIP4(self):
887 """L4 data on IP4 datapath"""
888 self.logger.info("FFP_TEST_START_0003")
889 self.pg_enable_capture(self.pg_interfaces)
897 direction=self.direction,
899 ipfix.add_vpp_config()
901 ipfix_decoder = IPFIXDecoder()
902 # template packet should arrive immediately
903 templates = ipfix.verify_templates(ipfix_decoder, count=1)
905 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
906 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
908 # make sure the one packet we expect actually showed up
909 self.vapi.ipfix_flush()
910 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
911 self.verify_cflow_data_detail(
915 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
918 # expected two templates and one cflow packet
919 self.collector.get_capture(2)
921 ipfix.remove_vpp_config()
922 self.logger.info("FFP_TEST_FINISH_0003")
924 def test_templatesIP6(self):
925 """verify templates on IP6 datapath"""
926 self.logger.info("FFP_TEST_START_0000")
927 self.pg_enable_capture(self.pg_interfaces)
930 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
932 ipfix.add_vpp_config()
934 # template packet should arrive immediately
935 ipfix.verify_templates(count=1)
936 self.collector.get_capture(1)
938 ipfix.remove_vpp_config()
940 self.logger.info("FFP_TEST_FINISH_0000")
942 def test_L2onIP6(self):
943 """L2 data on IP6 datapath"""
944 self.logger.info("FFP_TEST_START_0001")
945 self.pg_enable_capture(self.pg_interfaces)
953 direction=self.direction,
955 ipfix.add_vpp_config()
957 ipfix_decoder = IPFIXDecoder()
958 # template packet should arrive immediately
959 templates = ipfix.verify_templates(ipfix_decoder, count=1)
961 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
962 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
964 # make sure the one packet we expect actually showed up
965 self.vapi.ipfix_flush()
966 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
967 self.verify_cflow_data_detail(
971 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
975 # expected two templates and one cflow packet
976 self.collector.get_capture(2)
978 ipfix.remove_vpp_config()
979 self.logger.info("FFP_TEST_FINISH_0001")
981 def test_L3onIP6(self):
982 """L3 data on IP6 datapath"""
983 self.logger.info("FFP_TEST_START_0002")
984 self.pg_enable_capture(self.pg_interfaces)
992 direction=self.direction,
994 ipfix.add_vpp_config()
996 ipfix_decoder = IPFIXDecoder()
997 # template packet should arrive immediately
998 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1000 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1001 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1003 # make sure the one packet we expect actually showed up
1004 self.vapi.ipfix_flush()
1005 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1006 self.verify_cflow_data_detail(
1010 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1014 # expected two templates and one cflow packet
1015 self.collector.get_capture(2)
1017 ipfix.remove_vpp_config()
1018 self.logger.info("FFP_TEST_FINISH_0002")
1020 def test_L4onIP6(self):
1021 """L4 data on IP6 datapath"""
1022 self.logger.info("FFP_TEST_START_0003")
1023 self.pg_enable_capture(self.pg_interfaces)
1031 direction=self.direction,
1033 ipfix.add_vpp_config()
1035 ipfix_decoder = IPFIXDecoder()
1036 # template packet should arrive immediately
1037 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1039 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1040 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1042 # make sure the one packet we expect actually showed up
1043 self.vapi.ipfix_flush()
1044 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1045 self.verify_cflow_data_detail(
1049 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1053 # expected two templates and one cflow packet
1054 self.collector.get_capture(2)
1056 ipfix.remove_vpp_config()
1057 self.logger.info("FFP_TEST_FINISH_0003")
1059 def test_0001(self):
1060 """no timers, one CFLOW packet, 9 Flows inside"""
1061 self.logger.info("FFP_TEST_START_0001")
1062 self.pg_enable_capture(self.pg_interfaces)
1065 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1066 ipfix.add_vpp_config()
1068 ipfix_decoder = IPFIXDecoder()
1069 # template packet should arrive immediately
1070 templates = ipfix.verify_templates(ipfix_decoder)
1072 self.create_stream(packets=9)
1073 capture = self.send_packets()
1075 # make sure the one packet we expect actually showed up
1076 self.vapi.ipfix_flush()
1077 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1078 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1079 self.collector.get_capture(4)
1081 ipfix.remove_vpp_config()
1082 self.logger.info("FFP_TEST_FINISH_0001")
1084 def test_0002(self):
1085 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1086 self.logger.info("FFP_TEST_START_0002")
1087 self.pg_enable_capture(self.pg_interfaces)
1090 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1091 ipfix.add_vpp_config()
1093 ipfix_decoder = IPFIXDecoder()
1094 # template packet should arrive immediately
1095 self.vapi.ipfix_flush()
1096 templates = ipfix.verify_templates(ipfix_decoder)
1098 self.create_stream(packets=6)
1099 capture = self.send_packets()
1101 # make sure the one packet we expect actually showed up
1103 self.vapi.ipfix_flush()
1104 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1105 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1106 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1107 self.collector.get_capture(5)
1109 ipfix.remove_vpp_config()
1110 self.logger.info("FFP_TEST_FINISH_0002")
1113 @tag_fixme_vpp_workers
1114 class DatapathTx(MethodHolder, DatapathTestsHolder):
1115 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1123 @tag_fixme_vpp_workers
1124 class DatapathRx(MethodHolder, DatapathTestsHolder):
1125 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1133 @unittest.skipUnless(config.extended, "part of extended tests")
1134 class DisableIPFIX(MethodHolder):
1138 def setUpClass(cls):
1139 super(DisableIPFIX, cls).setUpClass()
1142 def tearDownClass(cls):
1143 super(DisableIPFIX, cls).tearDownClass()
1145 def test_0001(self):
1146 """disable IPFIX after first packets"""
1147 self.logger.info("FFP_TEST_START_0001")
1148 self.pg_enable_capture(self.pg_interfaces)
1151 ipfix = VppCFLOW(test=self)
1152 ipfix.add_vpp_config()
1154 ipfix_decoder = IPFIXDecoder()
1155 # template packet should arrive immediately
1156 templates = ipfix.verify_templates(ipfix_decoder)
1158 self.create_stream()
1161 # make sure the one packet we expect actually showed up
1162 self.vapi.ipfix_flush()
1163 self.wait_for_cflow_packet(self.collector, templates[1])
1164 self.collector.get_capture(4)
1167 ipfix.disable_exporter()
1168 self.pg_enable_capture([self.collector])
1172 # make sure no one packet arrived in 1 minute
1173 self.vapi.ipfix_flush()
1174 self.sleep(1, "wait before verifying no packets sent")
1175 self.collector.assert_nothing_captured()
1177 ipfix.remove_vpp_config()
1178 self.logger.info("FFP_TEST_FINISH_0001")
1181 @unittest.skipUnless(config.extended, "part of extended tests")
1182 class ReenableIPFIX(MethodHolder):
1183 """Re-enable IPFIX"""
1186 def setUpClass(cls):
1187 super(ReenableIPFIX, cls).setUpClass()
1190 def tearDownClass(cls):
1191 super(ReenableIPFIX, cls).tearDownClass()
1193 def test_0011(self):
1194 """disable IPFIX after first packets and re-enable after few packets"""
1195 self.logger.info("FFP_TEST_START_0001")
1196 self.pg_enable_capture(self.pg_interfaces)
1199 ipfix = VppCFLOW(test=self)
1200 ipfix.add_vpp_config()
1202 ipfix_decoder = IPFIXDecoder()
1203 # template packet should arrive immediately
1204 templates = ipfix.verify_templates(ipfix_decoder)
1206 self.create_stream(packets=5)
1209 # make sure the one packet we expect actually showed up
1210 self.vapi.ipfix_flush()
1211 self.wait_for_cflow_packet(self.collector, templates[1])
1212 self.collector.get_capture(4)
1215 ipfix.disable_exporter()
1216 self.vapi.ipfix_flush()
1217 self.pg_enable_capture([self.collector])
1221 # make sure no one packet arrived in active timer span
1222 self.vapi.ipfix_flush()
1223 self.sleep(1, "wait before verifying no packets sent")
1224 self.collector.assert_nothing_captured()
1225 self.pg2.get_capture(5)
1228 ipfix.enable_exporter()
1230 capture = self.collector.get_capture(4)
1234 self.assertTrue(p.haslayer(IPFIX))
1235 if p.haslayer(Template):
1237 self.assertTrue(nr_templates, 3)
1239 self.assertTrue(p.haslayer(IPFIX))
1240 if p.haslayer(Data):
1242 self.assertTrue(nr_templates, 1)
1244 ipfix.remove_vpp_config()
1245 self.logger.info("FFP_TEST_FINISH_0001")
1248 @unittest.skipUnless(config.extended, "part of extended tests")
1249 class DisableFP(MethodHolder):
1250 """Disable Flowprobe feature"""
1253 def setUpClass(cls):
1254 super(DisableFP, cls).setUpClass()
1257 def tearDownClass(cls):
1258 super(DisableFP, cls).tearDownClass()
1260 def test_0001(self):
1261 """disable flowprobe feature after first packets"""
1262 self.logger.info("FFP_TEST_START_0001")
1263 self.pg_enable_capture(self.pg_interfaces)
1265 ipfix = VppCFLOW(test=self)
1266 ipfix.add_vpp_config()
1268 ipfix_decoder = IPFIXDecoder()
1269 # template packet should arrive immediately
1270 templates = ipfix.verify_templates(ipfix_decoder)
1272 self.create_stream()
1275 # make sure the one packet we expect actually showed up
1276 self.vapi.ipfix_flush()
1277 self.wait_for_cflow_packet(self.collector, templates[1])
1278 self.collector.get_capture(4)
1281 ipfix.disable_flowprobe_feature()
1282 self.pg_enable_capture([self.collector])
1286 # make sure no one packet arrived in active timer span
1287 self.vapi.ipfix_flush()
1288 self.sleep(1, "wait before verifying no packets sent")
1289 self.collector.assert_nothing_captured()
1291 ipfix.remove_vpp_config()
1292 self.logger.info("FFP_TEST_FINISH_0001")
1295 @unittest.skipUnless(config.extended, "part of extended tests")
1296 class ReenableFP(MethodHolder):
1297 """Re-enable Flowprobe feature"""
1300 def setUpClass(cls):
1301 super(ReenableFP, cls).setUpClass()
1304 def tearDownClass(cls):
1305 super(ReenableFP, cls).tearDownClass()
1307 def test_0001(self):
1308 """disable flowprobe feature after first packets and re-enable
1309 after few packets"""
1310 self.logger.info("FFP_TEST_START_0001")
1311 self.pg_enable_capture(self.pg_interfaces)
1314 ipfix = VppCFLOW(test=self)
1315 ipfix.add_vpp_config()
1317 ipfix_decoder = IPFIXDecoder()
1318 # template packet should arrive immediately
1319 self.vapi.ipfix_flush()
1320 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1322 self.create_stream()
1325 # make sure the one packet we expect actually showed up
1326 self.vapi.ipfix_flush()
1327 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1328 self.collector.get_capture(4)
1330 # disable FPP feature
1331 ipfix.disable_flowprobe_feature()
1332 self.pg_enable_capture([self.collector])
1336 # make sure no one packet arrived in active timer span
1337 self.vapi.ipfix_flush()
1338 self.sleep(5, "wait before verifying no packets sent")
1339 self.collector.assert_nothing_captured()
1341 # enable FPP feature
1342 ipfix.enable_flowprobe_feature()
1343 self.vapi.ipfix_flush()
1344 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1348 # make sure the next packets (templates and data) we expect actually
1350 self.vapi.ipfix_flush()
1351 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1352 self.collector.get_capture(4)
1354 ipfix.remove_vpp_config()
1355 self.logger.info("FFP_TEST_FINISH_0001")
1358 if __name__ == "__main__":
1359 unittest.main(testRunner=VppTestRunner)