2 from __future__ import print_function
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.lacp import SlowProtocol, LACP
16 from config import config
17 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
18 from framework import is_distro_ubuntu2204, is_distro_debian11
19 from framework import VppTestCase, VppTestRunner
20 from framework import tag_run_solo
21 from vpp_object import VppObject
22 from vpp_pg_interface import CaptureTimeoutError
24 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
25 from vpp_ip_route import VppIpRoute, VppRoutePath
26 from vpp_papi.macaddress import mac_ntop
27 from socket import inet_ntop
28 from vpp_papi import VppEnum
31 TMPL_COMMON_FIELD_COUNT = 6
32 TMPL_L2_FIELD_COUNT = 3
33 TMPL_L3_FIELD_COUNT = 4
34 TMPL_L4_FIELD_COUNT = 3
37 class VppCFLOW(VppObject):
38 """CFLOW object for IPFIX exporter and Flowprobe feature"""
54 self._intf_obj = getattr(self._test, intf)
56 if passive == 0 or passive < active:
57 self._passive = active + 1
59 self._passive = passive
60 self._datapath = datapath # l2 ip4 ip6
61 self._collect = layer # l2 l3 l4
62 self._direction = direction # rx tx both
63 self._timeout = timeout
65 self._configured = False
67 def add_vpp_config(self):
68 self.enable_exporter()
72 if "l2" in self._collect.lower():
73 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
74 if "l3" in self._collect.lower():
75 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
76 if "l4" in self._collect.lower():
77 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
78 self._test.vapi.flowprobe_set_params(
79 record_flags=(l2_flag | l3_flag | l4_flag),
80 active_timer=self._active,
81 passive_timer=self._passive,
83 self.enable_flowprobe_feature()
84 self._test.vapi.cli("ipfix flush")
85 self._configured = True
87 def remove_vpp_config(self):
88 self.disable_exporter()
89 self.disable_flowprobe_feature()
90 self._test.vapi.cli("ipfix flush")
91 self._configured = False
93 def enable_exporter(self):
94 self._test.vapi.set_ipfix_exporter(
95 collector_address=self._test.pg0.remote_ip4,
96 src_address=self._test.pg0.local_ip4,
98 template_interval=self._timeout,
101 def _enable_disable_flowprobe_feature(self, is_add):
103 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
104 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
105 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
108 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
109 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
110 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
112 self._test.vapi.flowprobe_interface_add_del(
114 which=which_map[self._datapath],
115 direction=direction_map[self._direction],
116 sw_if_index=self._intf_obj.sw_if_index,
119 def enable_flowprobe_feature(self):
120 self._enable_disable_flowprobe_feature(is_add=True)
122 def disable_exporter(self):
123 self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
125 def disable_flowprobe_feature(self):
126 self._enable_disable_flowprobe_feature(is_add=False)
129 return "ipfix-collector-%s-%s" % (self._src, self.dst)
131 def query_vpp_config(self):
132 return self._configured
134 def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
136 self._test.assertIn(count, (1, 2, 3))
137 for _ in range(count):
138 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
139 self._test.assertTrue(p.haslayer(IPFIX))
140 self._test.assertTrue(p.haslayer(Template))
141 if decoder is not None:
142 templates.append(p[Template].templateID)
143 decoder.add_template(p.getlayer(Template))
144 if field_count_in is not None:
145 self._test.assertIn(p[Template].fieldCount, field_count_in)
149 class MethodHolder(VppTestCase):
150 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
154 max_number_of_packets = 10
160 Perform standard class setup (defined by class method setUpClass in
161 class VppTestCase) before running the test case, set test case related
162 variables and configure VPP.
164 super(MethodHolder, cls).setUpClass()
165 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
170 # Create pg interfaces
171 cls.create_pg_interfaces(range(9))
174 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
176 # Create BD with MAC learning and unknown unicast flooding disabled
177 # and put interfaces to this BD
178 cls.vapi.bridge_domain_add_del_v2(
179 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
181 cls.vapi.sw_interface_set_l2_bridge(
182 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
184 cls.vapi.sw_interface_set_l2_bridge(
185 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
188 # Set up all interfaces
189 for i in cls.pg_interfaces:
193 cls.pg0.configure_ipv4_neighbors()
194 cls.collector = cls.pg0
197 cls.pg1.resolve_arp()
199 cls.pg2.resolve_arp()
201 cls.pg3.resolve_arp()
203 cls.pg4.resolve_arp()
206 cls.pg8.configure_ipv4_neighbors()
209 cls.pg5.resolve_ndp()
210 cls.pg5.disable_ipv6_ra()
212 cls.pg6.resolve_ndp()
213 cls.pg6.disable_ipv6_ra()
215 super(MethodHolder, cls).tearDownClass()
219 def tearDownClass(cls):
220 super(MethodHolder, cls).tearDownClass()
223 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
225 """Create a packet stream to tickle the plugin
227 :param VppInterface src_if: Source interface for packet stream
228 :param VppInterface src_if: Dst interface for packet stream
236 packets = random.randint(1, self.max_number_of_packets)
238 for p in range(0, packets):
240 pkt_size = random.choice(self.pg_if_packet_sizes)
241 info = self.create_packet_info(src_if, dst_if)
242 payload = self.info_to_payload(info)
243 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
245 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
247 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
248 p /= UDP(sport=1234, dport=4321)
251 self.extend_packet(p, pkt_size)
254 def verify_cflow_data(self, decoder, capture, cflow):
260 if cflow.haslayer(Data):
261 data = decoder.decode_data_set(cflow.getlayer(Set))
263 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
264 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
266 def send_packets(self, src_if=None, dst_if=None):
271 self.pg_enable_capture([dst_if])
272 src_if.add_stream(self.pkts)
274 return dst_if.get_capture(len(self.pkts))
276 def verify_cflow_data_detail(
281 data_set={1: "octets", 2: "packets"},
286 print(capture[0].show())
287 if cflow.haslayer(Data):
288 data = decoder.decode_data_set(cflow.getlayer(Set))
292 ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
294 ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
295 if data_set is not None:
297 # skip flow if ingress/egress interface is 0
298 if int(binascii.hexlify(record[10]), 16) == 0:
300 if int(binascii.hexlify(record[14]), 16) == 0:
303 for field in data_set:
304 value = data_set[field]
305 if value == "octets":
308 value += 40 # ??? is this correct
309 elif value == "packets":
311 elif value == "src_ip":
313 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
315 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
316 value = int(binascii.hexlify(ip), 16)
317 elif value == "dst_ip":
319 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
321 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
322 value = int(binascii.hexlify(ip), 16)
323 elif value == "sport":
324 value = int(capture[0][UDP].sport)
325 elif value == "dport":
326 value = int(capture[0][UDP].dport)
328 int(binascii.hexlify(record[field]), 16), value
330 if field_count is not None:
332 self.assertEqual(len(record), field_count)
334 def verify_cflow_data_notimer(self, decoder, capture, cflows):
337 if cflow.haslayer(Data):
338 data = decoder.decode_data_set(cflow.getlayer(Set))
340 raise Exception("No CFLOW data")
345 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
346 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
347 self.assertEqual(len(capture), idx)
349 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
350 """wait for CFLOW packet and verify its correctness
352 :param timeout: how long to wait
355 self.logger.info("IPFIX: Waiting for CFLOW packet")
356 # self.logger.debug(self.vapi.ppcli("show flow table"))
357 p = collector_intf.wait_for_packet(timeout=timeout)
358 self.assertEqual(p[Set].setID, set_id)
359 # self.logger.debug(self.vapi.ppcli("show flow table"))
360 self.logger.debug(ppp("IPFIX: Got packet:", p))
365 @tag_fixme_vpp_workers
366 @tag_fixme_ubuntu2204
368 class Flowprobe(MethodHolder):
369 """Template verification, timer tests"""
373 super(Flowprobe, cls).setUpClass()
376 def tearDownClass(cls):
377 super(Flowprobe, cls).tearDownClass()
380 """timer less than template timeout"""
381 self.logger.info("FFP_TEST_START_0001")
382 self.pg_enable_capture(self.pg_interfaces)
385 ipfix = VppCFLOW(test=self, active=2)
386 ipfix.add_vpp_config()
388 ipfix_decoder = IPFIXDecoder()
389 # template packet should arrive immediately
390 templates = ipfix.verify_templates(ipfix_decoder)
392 self.create_stream(packets=1)
394 capture = self.pg2.get_capture(1)
396 # make sure the one packet we expect actually showed up
397 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
398 self.verify_cflow_data(ipfix_decoder, capture, cflow)
400 ipfix.remove_vpp_config()
401 self.logger.info("FFP_TEST_FINISH_0001")
404 """timer greater than template timeout"""
405 self.logger.info("FFP_TEST_START_0002")
406 self.pg_enable_capture(self.pg_interfaces)
409 ipfix = VppCFLOW(test=self, timeout=3, active=4)
410 ipfix.add_vpp_config()
412 ipfix_decoder = IPFIXDecoder()
413 # template packet should arrive immediately
414 ipfix.verify_templates()
416 self.create_stream(packets=2)
418 capture = self.pg2.get_capture(2)
420 # next set of template packet should arrive after 20 seconds
421 # template packet should arrive within 20 s
422 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
424 # make sure the one packet we expect actually showed up
425 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
426 self.verify_cflow_data(ipfix_decoder, capture, cflow)
428 ipfix.remove_vpp_config()
429 self.logger.info("FFP_TEST_FINISH_0002")
431 def test_cflow_packet(self):
432 """verify cflow packet fields"""
433 self.logger.info("FFP_TEST_START_0000")
434 self.pg_enable_capture(self.pg_interfaces)
438 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
440 ipfix.add_vpp_config()
442 route_9001 = VppIpRoute(
446 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
448 route_9001.add_vpp_config()
450 ipfix_decoder = IPFIXDecoder()
451 templates = ipfix.verify_templates(ipfix_decoder, count=1)
455 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
456 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
457 / TCP(sport=1234, dport=4321, flags=80)
462 nowUTC = int(time.time())
463 nowUNIX = nowUTC + 2208988800
464 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
466 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
467 self.collector.get_capture(2)
469 if cflow[0].haslayer(IPFIX):
470 self.assertEqual(cflow[IPFIX].version, 10)
471 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
472 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
473 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
474 if cflow.haslayer(Data):
475 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
477 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
479 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
481 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
483 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
485 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
487 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
488 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
489 # flow start timestamp
490 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
491 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
493 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
495 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
497 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
499 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
501 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
503 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
505 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
507 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
509 ipfix.remove_vpp_config()
510 self.logger.info("FFP_TEST_FINISH_0000")
512 def test_interface_dump(self):
513 """Dump interfaces with IPFIX flow record generation enabled"""
514 self.logger.info("FFP_TEST_START_0003")
516 # Enable feature for 3 interfaces
517 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
518 ipfix1.add_vpp_config()
520 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
521 ipfix2.enable_flowprobe_feature()
523 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
524 ipfix3.enable_flowprobe_feature()
526 # When request "all", dump should contain all enabled interfaces
527 dump = self.vapi.flowprobe_interface_dump()
528 self.assertEqual(len(dump), 3)
530 # Verify 1st interface
531 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
533 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
537 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
540 # Verify 2nd interface
541 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
543 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
547 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
550 # Verify 3rd interface
551 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
553 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
557 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
560 # When request 2nd interface, dump should contain only the specified interface
561 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
562 self.assertEqual(len(dump), 1)
564 # Verify 2nd interface
565 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
567 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
571 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
574 # When request 99th interface, dump should be empty
575 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
576 self.assertEqual(len(dump), 0)
578 ipfix1.remove_vpp_config()
579 ipfix2.remove_vpp_config()
580 ipfix3.remove_vpp_config()
581 self.logger.info("FFP_TEST_FINISH_0003")
583 def test_get_params(self):
584 """Get IPFIX flow record generation parameters"""
585 self.logger.info("FFP_TEST_START_0004")
587 # Enable feature for an interface with custom parameters
588 ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
589 ipfix.add_vpp_config()
591 # Get and verify parameters
592 params = self.vapi.flowprobe_get_params()
593 self.assertEqual(params.active_timer, 20)
594 self.assertEqual(params.passive_timer, 40)
595 record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
596 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
597 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
598 self.assertEqual(params.record_flags, record_flags)
600 ipfix.remove_vpp_config()
601 self.logger.info("FFP_TEST_FINISH_0004")
604 class DatapathTestsHolder(object):
605 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
609 super(DatapathTestsHolder, cls).setUpClass()
612 def tearDownClass(cls):
613 super(DatapathTestsHolder, cls).tearDownClass()
615 def test_templatesL2(self):
616 """verify template on L2 datapath"""
617 self.logger.info("FFP_TEST_START_0000")
618 self.pg_enable_capture(self.pg_interfaces)
621 test=self, intf=self.intf1, layer="l2", direction=self.direction
623 ipfix.add_vpp_config()
625 # template packet should arrive immediately
626 self.vapi.ipfix_flush()
627 ipfix.verify_templates(timeout=3, count=1)
628 self.collector.get_capture(1)
630 ipfix.remove_vpp_config()
631 self.logger.info("FFP_TEST_FINISH_0000")
633 def test_L2onL2(self):
634 """L2 data on L2 datapath"""
635 self.logger.info("FFP_TEST_START_0001")
636 self.pg_enable_capture(self.pg_interfaces)
640 test=self, intf=self.intf1, layer="l2", direction=self.direction
642 ipfix.add_vpp_config()
644 ipfix_decoder = IPFIXDecoder()
645 # template packet should arrive immediately
646 templates = ipfix.verify_templates(ipfix_decoder, count=1)
648 self.create_stream(packets=1)
649 capture = self.send_packets()
651 # make sure the one packet we expect actually showed up
652 self.vapi.ipfix_flush()
653 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
654 self.verify_cflow_data_detail(
658 {2: "packets", 256: 8, 61: (self.direction == "tx")},
660 self.collector.get_capture(2)
662 ipfix.remove_vpp_config()
663 self.logger.info("FFP_TEST_FINISH_0001")
665 def test_L3onL2(self):
666 """L3 data on L2 datapath"""
667 self.logger.info("FFP_TEST_START_0002")
668 self.pg_enable_capture(self.pg_interfaces)
672 test=self, intf=self.intf1, layer="l3", direction=self.direction
674 ipfix.add_vpp_config()
676 ipfix_decoder = IPFIXDecoder()
677 # template packet should arrive immediately
678 templates = ipfix.verify_templates(ipfix_decoder, count=2)
680 self.create_stream(packets=1)
681 capture = self.send_packets()
683 # make sure the one packet we expect actually showed up
684 self.vapi.ipfix_flush()
685 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
686 self.verify_cflow_data_detail(
695 61: (self.direction == "tx"),
699 self.collector.get_capture(3)
701 ipfix.remove_vpp_config()
702 self.logger.info("FFP_TEST_FINISH_0002")
704 def test_L234onL2(self):
705 """L2/3/4 data on L2 datapath"""
706 self.pg_enable_capture(self.pg_interfaces)
710 test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
712 ipfix.add_vpp_config()
714 ipfix_decoder = IPFIXDecoder()
715 # template packet should arrive immediately
716 tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
717 tmpl_ip_field_count = (
718 TMPL_COMMON_FIELD_COUNT
719 + TMPL_L2_FIELD_COUNT
720 + TMPL_L3_FIELD_COUNT
721 + TMPL_L4_FIELD_COUNT
723 templates = ipfix.verify_templates(
726 field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
729 # verify IPv4 and IPv6 flows
730 for ip_ver in ("v4", "v6"):
731 self.create_stream(packets=1, ip_ver=ip_ver)
732 capture = self.send_packets()
734 # make sure the one packet we expect actually showed up
735 self.vapi.ipfix_flush()
736 cflow = self.wait_for_cflow_packet(
737 self.collector, templates[1 if ip_ver == "v4" else 2]
739 src_ip_id = 8 if ip_ver == "v4" else 27
740 dst_ip_id = 12 if ip_ver == "v4" else 28
741 self.verify_cflow_data_detail(
747 256: 8 if ip_ver == "v4" else 56710,
753 61: (self.direction == "tx"),
756 field_count=tmpl_ip_field_count,
762 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
767 capture = self.send_packets()
769 # make sure the one packet we expect actually showed up
770 self.vapi.ipfix_flush()
771 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
772 self.verify_cflow_data_detail(
776 {2: "packets", 256: 2440, 61: (self.direction == "tx")},
777 field_count=tmpl_l2_field_count,
780 self.collector.get_capture(6)
782 ipfix.remove_vpp_config()
784 def test_L4onL2(self):
785 """L4 data on L2 datapath"""
786 self.logger.info("FFP_TEST_START_0003")
787 self.pg_enable_capture(self.pg_interfaces)
791 test=self, intf=self.intf1, layer="l4", direction=self.direction
793 ipfix.add_vpp_config()
795 ipfix_decoder = IPFIXDecoder()
796 # template packet should arrive immediately
797 templates = ipfix.verify_templates(ipfix_decoder, count=2)
799 self.create_stream(packets=1)
800 capture = self.send_packets()
802 # make sure the one packet we expect actually showed up
803 self.vapi.ipfix_flush()
804 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
805 self.verify_cflow_data_detail(
809 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
812 self.collector.get_capture(3)
814 ipfix.remove_vpp_config()
815 self.logger.info("FFP_TEST_FINISH_0003")
817 def test_templatesIp4(self):
818 """verify templates on IP4 datapath"""
819 self.logger.info("FFP_TEST_START_0000")
821 self.pg_enable_capture(self.pg_interfaces)
824 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
826 ipfix.add_vpp_config()
828 # template packet should arrive immediately
829 self.vapi.ipfix_flush()
830 ipfix.verify_templates(timeout=3, count=1)
831 self.collector.get_capture(1)
833 ipfix.remove_vpp_config()
835 self.logger.info("FFP_TEST_FINISH_0000")
837 def test_L2onIP4(self):
838 """L2 data on IP4 datapath"""
839 self.logger.info("FFP_TEST_START_0001")
840 self.pg_enable_capture(self.pg_interfaces)
848 direction=self.direction,
850 ipfix.add_vpp_config()
852 ipfix_decoder = IPFIXDecoder()
853 # template packet should arrive immediately
854 templates = ipfix.verify_templates(ipfix_decoder, count=1)
856 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
857 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
859 # make sure the one packet we expect actually showed up
860 self.vapi.ipfix_flush()
861 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
862 self.verify_cflow_data_detail(
866 {2: "packets", 256: 8, 61: (self.direction == "tx")},
869 # expected two templates and one cflow packet
870 self.collector.get_capture(2)
872 ipfix.remove_vpp_config()
873 self.logger.info("FFP_TEST_FINISH_0001")
875 def test_L3onIP4(self):
876 """L3 data on IP4 datapath"""
877 self.logger.info("FFP_TEST_START_0002")
878 self.pg_enable_capture(self.pg_interfaces)
886 direction=self.direction,
888 ipfix.add_vpp_config()
890 ipfix_decoder = IPFIXDecoder()
891 # template packet should arrive immediately
892 templates = ipfix.verify_templates(ipfix_decoder, count=1)
894 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
895 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
897 # make sure the one packet we expect actually showed up
898 self.vapi.ipfix_flush()
899 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
900 self.verify_cflow_data_detail(
909 61: (self.direction == "tx"),
913 # expected two templates and one cflow packet
914 self.collector.get_capture(2)
916 ipfix.remove_vpp_config()
917 self.logger.info("FFP_TEST_FINISH_0002")
919 def test_L4onIP4(self):
920 """L4 data on IP4 datapath"""
921 self.logger.info("FFP_TEST_START_0003")
922 self.pg_enable_capture(self.pg_interfaces)
930 direction=self.direction,
932 ipfix.add_vpp_config()
934 ipfix_decoder = IPFIXDecoder()
935 # template packet should arrive immediately
936 templates = ipfix.verify_templates(ipfix_decoder, count=1)
938 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
939 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
941 # make sure the one packet we expect actually showed up
942 self.vapi.ipfix_flush()
943 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
944 self.verify_cflow_data_detail(
948 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
951 # expected two templates and one cflow packet
952 self.collector.get_capture(2)
954 ipfix.remove_vpp_config()
955 self.logger.info("FFP_TEST_FINISH_0003")
957 def test_templatesIP6(self):
958 """verify templates on IP6 datapath"""
959 self.logger.info("FFP_TEST_START_0000")
960 self.pg_enable_capture(self.pg_interfaces)
963 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
965 ipfix.add_vpp_config()
967 # template packet should arrive immediately
968 ipfix.verify_templates(count=1)
969 self.collector.get_capture(1)
971 ipfix.remove_vpp_config()
973 self.logger.info("FFP_TEST_FINISH_0000")
975 def test_L2onIP6(self):
976 """L2 data on IP6 datapath"""
977 self.logger.info("FFP_TEST_START_0001")
978 self.pg_enable_capture(self.pg_interfaces)
986 direction=self.direction,
988 ipfix.add_vpp_config()
990 ipfix_decoder = IPFIXDecoder()
991 # template packet should arrive immediately
992 templates = ipfix.verify_templates(ipfix_decoder, count=1)
994 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
995 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
997 # make sure the one packet we expect actually showed up
998 self.vapi.ipfix_flush()
999 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1000 self.verify_cflow_data_detail(
1004 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1008 # expected two templates and one cflow packet
1009 self.collector.get_capture(2)
1011 ipfix.remove_vpp_config()
1012 self.logger.info("FFP_TEST_FINISH_0001")
1014 def test_L3onIP6(self):
1015 """L3 data on IP6 datapath"""
1016 self.logger.info("FFP_TEST_START_0002")
1017 self.pg_enable_capture(self.pg_interfaces)
1025 direction=self.direction,
1027 ipfix.add_vpp_config()
1029 ipfix_decoder = IPFIXDecoder()
1030 # template packet should arrive immediately
1031 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1033 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1034 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1036 # make sure the one packet we expect actually showed up
1037 self.vapi.ipfix_flush()
1038 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1039 self.verify_cflow_data_detail(
1043 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1047 # expected two templates and one cflow packet
1048 self.collector.get_capture(2)
1050 ipfix.remove_vpp_config()
1051 self.logger.info("FFP_TEST_FINISH_0002")
1053 def test_L4onIP6(self):
1054 """L4 data on IP6 datapath"""
1055 self.logger.info("FFP_TEST_START_0003")
1056 self.pg_enable_capture(self.pg_interfaces)
1064 direction=self.direction,
1066 ipfix.add_vpp_config()
1068 ipfix_decoder = IPFIXDecoder()
1069 # template packet should arrive immediately
1070 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1072 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1073 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1075 # make sure the one packet we expect actually showed up
1076 self.vapi.ipfix_flush()
1077 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1078 self.verify_cflow_data_detail(
1082 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1086 # expected two templates and one cflow packet
1087 self.collector.get_capture(2)
1089 ipfix.remove_vpp_config()
1090 self.logger.info("FFP_TEST_FINISH_0003")
1092 def test_0001(self):
1093 """no timers, one CFLOW packet, 9 Flows inside"""
1094 self.logger.info("FFP_TEST_START_0001")
1095 self.pg_enable_capture(self.pg_interfaces)
1098 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1099 ipfix.add_vpp_config()
1101 ipfix_decoder = IPFIXDecoder()
1102 # template packet should arrive immediately
1103 templates = ipfix.verify_templates(ipfix_decoder)
1105 self.create_stream(packets=9)
1106 capture = self.send_packets()
1108 # make sure the one packet we expect actually showed up
1109 self.vapi.ipfix_flush()
1110 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1111 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1112 self.collector.get_capture(4)
1114 ipfix.remove_vpp_config()
1115 self.logger.info("FFP_TEST_FINISH_0001")
1117 def test_0002(self):
1118 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1119 self.logger.info("FFP_TEST_START_0002")
1120 self.pg_enable_capture(self.pg_interfaces)
1123 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1124 ipfix.add_vpp_config()
1126 ipfix_decoder = IPFIXDecoder()
1127 # template packet should arrive immediately
1128 self.vapi.ipfix_flush()
1129 templates = ipfix.verify_templates(ipfix_decoder)
1131 self.create_stream(packets=6)
1132 capture = self.send_packets()
1134 # make sure the one packet we expect actually showed up
1136 self.vapi.ipfix_flush()
1137 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1138 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1139 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1140 self.collector.get_capture(5)
1142 ipfix.remove_vpp_config()
1143 self.logger.info("FFP_TEST_FINISH_0002")
1146 @tag_fixme_vpp_workers
1147 class DatapathTx(MethodHolder, DatapathTestsHolder):
1148 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1156 @tag_fixme_vpp_workers
1157 class DatapathRx(MethodHolder, DatapathTestsHolder):
1158 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1166 @unittest.skipUnless(config.extended, "part of extended tests")
1167 class DisableIPFIX(MethodHolder):
1171 def setUpClass(cls):
1172 super(DisableIPFIX, cls).setUpClass()
1175 def tearDownClass(cls):
1176 super(DisableIPFIX, cls).tearDownClass()
1178 def test_0001(self):
1179 """disable IPFIX after first packets"""
1180 self.logger.info("FFP_TEST_START_0001")
1181 self.pg_enable_capture(self.pg_interfaces)
1184 ipfix = VppCFLOW(test=self)
1185 ipfix.add_vpp_config()
1187 ipfix_decoder = IPFIXDecoder()
1188 # template packet should arrive immediately
1189 templates = ipfix.verify_templates(ipfix_decoder)
1191 self.create_stream()
1194 # make sure the one packet we expect actually showed up
1195 self.vapi.ipfix_flush()
1196 self.wait_for_cflow_packet(self.collector, templates[1])
1197 self.collector.get_capture(4)
1200 ipfix.disable_exporter()
1201 self.pg_enable_capture([self.collector])
1205 # make sure no one packet arrived in 1 minute
1206 self.vapi.ipfix_flush()
1207 self.sleep(1, "wait before verifying no packets sent")
1208 self.collector.assert_nothing_captured()
1210 ipfix.remove_vpp_config()
1211 self.logger.info("FFP_TEST_FINISH_0001")
1214 @unittest.skipUnless(config.extended, "part of extended tests")
1215 class ReenableIPFIX(MethodHolder):
1216 """Re-enable IPFIX"""
1219 def setUpClass(cls):
1220 super(ReenableIPFIX, cls).setUpClass()
1223 def tearDownClass(cls):
1224 super(ReenableIPFIX, cls).tearDownClass()
1226 def test_0011(self):
1227 """disable IPFIX after first packets and re-enable after few packets"""
1228 self.logger.info("FFP_TEST_START_0001")
1229 self.pg_enable_capture(self.pg_interfaces)
1232 ipfix = VppCFLOW(test=self)
1233 ipfix.add_vpp_config()
1235 ipfix_decoder = IPFIXDecoder()
1236 # template packet should arrive immediately
1237 templates = ipfix.verify_templates(ipfix_decoder)
1239 self.create_stream(packets=5)
1242 # make sure the one packet we expect actually showed up
1243 self.vapi.ipfix_flush()
1244 self.wait_for_cflow_packet(self.collector, templates[1])
1245 self.collector.get_capture(4)
1248 ipfix.disable_exporter()
1249 self.vapi.ipfix_flush()
1250 self.pg_enable_capture([self.collector])
1254 # make sure no one packet arrived in active timer span
1255 self.vapi.ipfix_flush()
1256 self.sleep(1, "wait before verifying no packets sent")
1257 self.collector.assert_nothing_captured()
1258 self.pg2.get_capture(5)
1261 ipfix.enable_exporter()
1263 capture = self.collector.get_capture(4)
1267 self.assertTrue(p.haslayer(IPFIX))
1268 if p.haslayer(Template):
1270 self.assertTrue(nr_templates, 3)
1272 self.assertTrue(p.haslayer(IPFIX))
1273 if p.haslayer(Data):
1275 self.assertTrue(nr_templates, 1)
1277 ipfix.remove_vpp_config()
1278 self.logger.info("FFP_TEST_FINISH_0001")
1281 @unittest.skipUnless(config.extended, "part of extended tests")
1282 class DisableFP(MethodHolder):
1283 """Disable Flowprobe feature"""
1286 def setUpClass(cls):
1287 super(DisableFP, cls).setUpClass()
1290 def tearDownClass(cls):
1291 super(DisableFP, cls).tearDownClass()
1293 def test_0001(self):
1294 """disable flowprobe feature after first packets"""
1295 self.logger.info("FFP_TEST_START_0001")
1296 self.pg_enable_capture(self.pg_interfaces)
1298 ipfix = VppCFLOW(test=self)
1299 ipfix.add_vpp_config()
1301 ipfix_decoder = IPFIXDecoder()
1302 # template packet should arrive immediately
1303 templates = ipfix.verify_templates(ipfix_decoder)
1305 self.create_stream()
1308 # make sure the one packet we expect actually showed up
1309 self.vapi.ipfix_flush()
1310 self.wait_for_cflow_packet(self.collector, templates[1])
1311 self.collector.get_capture(4)
1314 ipfix.disable_flowprobe_feature()
1315 self.pg_enable_capture([self.collector])
1319 # make sure no one packet arrived in active timer span
1320 self.vapi.ipfix_flush()
1321 self.sleep(1, "wait before verifying no packets sent")
1322 self.collector.assert_nothing_captured()
1324 ipfix.remove_vpp_config()
1325 self.logger.info("FFP_TEST_FINISH_0001")
1327 def test_no_leftover_flows_after_disabling(self):
1328 """disable flowprobe feature and expect no leftover flows"""
1329 self.pg_enable_capture(self.pg_interfaces)
1332 # enable ip4 datapath for an interface
1333 # set active and passive timers
1344 ipfix.add_vpp_config()
1346 # template packet should arrive immediately
1347 ipfix.verify_templates(count=1)
1349 # send some ip4 packets
1350 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
1351 self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1353 # disable feature for the interface
1354 # currently stored ip4 flows should be removed
1355 ipfix.disable_flowprobe_feature()
1357 # no leftover ip4 flows are expected
1358 self.pg_enable_capture([self.collector])
1359 self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
1360 self.collector.assert_nothing_captured()
1363 ipfix.disable_exporter()
1366 @unittest.skipUnless(config.extended, "part of extended tests")
1367 class ReenableFP(MethodHolder):
1368 """Re-enable Flowprobe feature"""
1371 def setUpClass(cls):
1372 super(ReenableFP, cls).setUpClass()
1375 def tearDownClass(cls):
1376 super(ReenableFP, cls).tearDownClass()
1378 def test_0001(self):
1379 """disable flowprobe feature after first packets and re-enable
1380 after few packets"""
1381 self.logger.info("FFP_TEST_START_0001")
1382 self.pg_enable_capture(self.pg_interfaces)
1385 ipfix = VppCFLOW(test=self)
1386 ipfix.add_vpp_config()
1388 ipfix_decoder = IPFIXDecoder()
1389 # template packet should arrive immediately
1390 self.vapi.ipfix_flush()
1391 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1393 self.create_stream()
1396 # make sure the one packet we expect actually showed up
1397 self.vapi.ipfix_flush()
1398 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1399 self.collector.get_capture(4)
1401 # disable FPP feature
1402 ipfix.disable_flowprobe_feature()
1403 self.pg_enable_capture([self.collector])
1407 # make sure no one packet arrived in active timer span
1408 self.vapi.ipfix_flush()
1409 self.sleep(5, "wait before verifying no packets sent")
1410 self.collector.assert_nothing_captured()
1412 # enable FPP feature
1413 ipfix.enable_flowprobe_feature()
1414 self.vapi.ipfix_flush()
1415 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1419 # make sure the next packets (templates and data) we expect actually
1421 self.vapi.ipfix_flush()
1422 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1423 self.collector.get_capture(4)
1425 ipfix.remove_vpp_config()
1426 self.logger.info("FFP_TEST_FINISH_0001")
1429 if __name__ == "__main__":
1430 unittest.main(testRunner=VppTestRunner)