2 from __future__ import print_function
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP
12 from scapy.layers.inet6 import IPv6
13 from scapy.contrib.lacp import SlowProtocol, LACP
15 from config import config
16 from framework import VppTestCase
17 from asfframework import (
18 tag_fixme_vpp_workers,
26 from vpp_object import VppObject
28 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_papi.macaddress import mac_ntop
31 from socket import inet_ntop
32 from vpp_papi import VppEnum
33 from vpp_sub_interface import VppDot1ADSubint
36 TMPL_COMMON_FIELD_COUNT = 6
37 TMPL_L2_FIELD_COUNT = 3
38 TMPL_L3_FIELD_COUNT = 4
39 TMPL_L4_FIELD_COUNT = 3
41 IPFIX_TCP_FLAGS_ID = 6
42 IPFIX_SRC_TRANS_PORT_ID = 7
43 IPFIX_DST_TRANS_PORT_ID = 11
44 IPFIX_SRC_IP4_ADDR_ID = 8
45 IPFIX_DST_IP4_ADDR_ID = 12
46 IPFIX_FLOW_DIRECTION_ID = 61
58 class VppCFLOW(VppObject):
59 """CFLOW object for IPFIX exporter and Flowprobe feature"""
75 self._intf_obj = getattr(self._test, intf)
77 if passive == 0 or passive < active:
78 self._passive = active + 1
80 self._passive = passive
81 self._datapath = datapath # l2 ip4 ip6
82 self._collect = layer # l2 l3 l4
83 self._direction = direction # rx tx both
84 self._timeout = timeout
86 self._configured = False
88 def add_vpp_config(self):
89 self.enable_exporter()
93 if "l2" in self._collect.lower():
94 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
95 if "l3" in self._collect.lower():
96 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
97 if "l4" in self._collect.lower():
98 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
99 self._test.vapi.flowprobe_set_params(
100 record_flags=(l2_flag | l3_flag | l4_flag),
101 active_timer=self._active,
102 passive_timer=self._passive,
104 self.enable_flowprobe_feature()
105 self._test.vapi.cli("ipfix flush")
106 self._configured = True
108 def remove_vpp_config(self):
109 self.disable_exporter()
110 self.disable_flowprobe_feature()
111 self._test.vapi.cli("ipfix flush")
112 self._configured = False
114 def enable_exporter(self):
115 self._test.vapi.set_ipfix_exporter(
116 collector_address=self._test.pg0.remote_ip4,
117 src_address=self._test.pg0.local_ip4,
119 template_interval=self._timeout,
122 def _enable_disable_flowprobe_feature(self, is_add):
124 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
125 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
126 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
129 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
130 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
131 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
133 self._test.vapi.flowprobe_interface_add_del(
135 which=which_map[self._datapath],
136 direction=direction_map[self._direction],
137 sw_if_index=self._intf_obj.sw_if_index,
140 def enable_flowprobe_feature(self):
141 self._enable_disable_flowprobe_feature(is_add=True)
143 def disable_exporter(self):
144 self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
146 def disable_flowprobe_feature(self):
147 self._enable_disable_flowprobe_feature(is_add=False)
150 return "ipfix-collector-%s-%s" % (self._src, self.dst)
152 def query_vpp_config(self):
153 return self._configured
155 def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
157 self._test.assertIn(count, (1, 2, 3))
158 for _ in range(count):
159 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
160 self._test.assertTrue(p.haslayer(IPFIX))
161 self._test.assertTrue(p.haslayer(Template))
162 if decoder is not None:
163 templates.append(p[Template].templateID)
164 decoder.add_template(p.getlayer(Template))
165 if field_count_in is not None:
166 self._test.assertIn(p[Template].fieldCount, field_count_in)
170 class MethodHolder(VppTestCase):
171 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
175 max_number_of_packets = 10
181 Perform standard class setup (defined by class method setUpClass in
182 class VppTestCase) before running the test case, set test case related
183 variables and configure VPP.
185 super(MethodHolder, cls).setUpClass()
186 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
191 # Create pg interfaces
192 cls.create_pg_interfaces(range(9))
195 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
197 # Create BD with MAC learning and unknown unicast flooding disabled
198 # and put interfaces to this BD
199 cls.vapi.bridge_domain_add_del_v2(
200 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
202 cls.vapi.sw_interface_set_l2_bridge(
203 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
205 cls.vapi.sw_interface_set_l2_bridge(
206 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
209 # Set up all interfaces
210 for i in cls.pg_interfaces:
214 cls.pg0.configure_ipv4_neighbors()
215 cls.collector = cls.pg0
218 cls.pg1.resolve_arp()
220 cls.pg2.resolve_arp()
222 cls.pg3.resolve_arp()
224 cls.pg4.resolve_arp()
227 cls.pg8.configure_ipv4_neighbors()
230 cls.pg5.resolve_ndp()
231 cls.pg5.disable_ipv6_ra()
233 cls.pg6.resolve_ndp()
234 cls.pg6.disable_ipv6_ra()
236 super(MethodHolder, cls).tearDownClass()
240 def tearDownClass(cls):
241 super(MethodHolder, cls).tearDownClass()
244 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
246 """Create a packet stream to tickle the plugin
248 :param VppInterface src_if: Source interface for packet stream
249 :param VppInterface src_if: Dst interface for packet stream
257 packets = random.randint(1, self.max_number_of_packets)
259 for p in range(0, packets):
261 pkt_size = random.choice(self.pg_if_packet_sizes)
262 info = self.create_packet_info(src_if, dst_if)
263 payload = self.info_to_payload(info)
264 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
266 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
268 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
269 p /= UDP(sport=1234, dport=4321)
272 self.extend_packet(p, pkt_size)
275 def verify_cflow_data(self, decoder, capture, cflow):
281 if cflow.haslayer(Data):
282 data = decoder.decode_data_set(cflow.getlayer(Set))
284 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
285 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
287 def send_packets(self, src_if=None, dst_if=None):
292 self.pg_enable_capture([dst_if])
293 src_if.add_stream(self.pkts)
295 return dst_if.get_capture(len(self.pkts))
297 def verify_cflow_data_detail(
302 data_set={1: "octets", 2: "packets"},
307 print(capture[0].show())
308 if cflow.haslayer(Data):
309 data = decoder.decode_data_set(cflow.getlayer(Set))
313 ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
315 ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
316 if data_set is not None:
318 # skip flow if ingress/egress interface is 0
319 if int(binascii.hexlify(record[10]), 16) == 0:
321 if int(binascii.hexlify(record[14]), 16) == 0:
324 for field in data_set:
325 value = data_set[field]
326 if value == "octets":
329 value += 40 # ??? is this correct
330 elif value == "packets":
332 elif value == "src_ip":
334 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
336 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
337 value = int(binascii.hexlify(ip), 16)
338 elif value == "dst_ip":
340 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
342 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
343 value = int(binascii.hexlify(ip), 16)
344 elif value == "sport":
345 value = int(capture[0][UDP].sport)
346 elif value == "dport":
347 value = int(capture[0][UDP].dport)
349 int(binascii.hexlify(record[field]), 16), value
351 if field_count is not None:
353 self.assertEqual(len(record), field_count)
355 def verify_cflow_data_notimer(self, decoder, capture, cflows):
358 if cflow.haslayer(Data):
359 data = decoder.decode_data_set(cflow.getlayer(Set))
361 raise Exception("No CFLOW data")
366 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
367 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
368 self.assertEqual(len(capture), idx)
370 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
371 """wait for CFLOW packet and verify its correctness
373 :param timeout: how long to wait
376 self.logger.info("IPFIX: Waiting for CFLOW packet")
377 # self.logger.debug(self.vapi.ppcli("show flow table"))
378 p = collector_intf.wait_for_packet(timeout=timeout)
379 self.assertEqual(p[Set].setID, set_id)
380 # self.logger.debug(self.vapi.ppcli("show flow table"))
381 self.logger.debug(ppp("IPFIX: Got packet:", p))
386 @tag_fixme_vpp_workers
387 @tag_fixme_ubuntu2204
389 class Flowprobe(MethodHolder):
390 """Template verification, timer tests"""
394 super(Flowprobe, cls).setUpClass()
397 def tearDownClass(cls):
398 super(Flowprobe, cls).tearDownClass()
401 """timer less than template timeout"""
402 self.logger.info("FFP_TEST_START_0001")
403 self.pg_enable_capture(self.pg_interfaces)
406 ipfix = VppCFLOW(test=self, active=2)
407 ipfix.add_vpp_config()
409 ipfix_decoder = IPFIXDecoder()
410 # template packet should arrive immediately
411 templates = ipfix.verify_templates(ipfix_decoder)
413 self.create_stream(packets=1)
415 capture = self.pg2.get_capture(1)
417 # make sure the one packet we expect actually showed up
418 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
419 self.verify_cflow_data(ipfix_decoder, capture, cflow)
421 ipfix.remove_vpp_config()
422 self.logger.info("FFP_TEST_FINISH_0001")
425 """timer greater than template timeout"""
426 self.logger.info("FFP_TEST_START_0002")
427 self.pg_enable_capture(self.pg_interfaces)
430 ipfix = VppCFLOW(test=self, timeout=3, active=4)
431 ipfix.add_vpp_config()
433 ipfix_decoder = IPFIXDecoder()
434 # template packet should arrive immediately
435 ipfix.verify_templates()
437 self.create_stream(packets=2)
439 capture = self.pg2.get_capture(2)
441 # next set of template packet should arrive after 20 seconds
442 # template packet should arrive within 20 s
443 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
445 # make sure the one packet we expect actually showed up
446 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
447 self.verify_cflow_data(ipfix_decoder, capture, cflow)
449 ipfix.remove_vpp_config()
450 self.logger.info("FFP_TEST_FINISH_0002")
452 def test_cflow_packet(self):
453 """verify cflow packet fields"""
454 self.logger.info("FFP_TEST_START_0000")
455 self.pg_enable_capture(self.pg_interfaces)
459 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
461 ipfix.add_vpp_config()
463 route_9001 = VppIpRoute(
467 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
469 route_9001.add_vpp_config()
471 ipfix_decoder = IPFIXDecoder()
472 templates = ipfix.verify_templates(ipfix_decoder, count=1)
476 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
477 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
478 / TCP(sport=1234, dport=4321, flags=80)
483 nowUTC = int(time.time())
484 nowUNIX = nowUTC + 2208988800
485 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
487 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
488 self.collector.get_capture(2)
490 if cflow[0].haslayer(IPFIX):
491 self.assertEqual(cflow[IPFIX].version, 10)
492 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
493 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
494 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
495 if cflow.haslayer(Data):
496 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
498 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
500 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
502 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
504 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
506 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
508 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
509 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
510 # flow start timestamp
511 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
512 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
514 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
516 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
518 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
520 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
522 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
524 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
526 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
528 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
530 ipfix.remove_vpp_config()
531 self.logger.info("FFP_TEST_FINISH_0000")
533 def test_flow_entry_reuse(self):
534 """Verify flow entry reuse doesn't accumulate meta info"""
535 self.pg_enable_capture(self.pg_interfaces)
538 # enable ip4 datapath for an interface
539 # set active and passive timers
550 ipfix.add_vpp_config()
552 # template packet should arrive immediately
553 ipfix_decoder = IPFIXDecoder()
554 templates = ipfix.verify_templates(ipfix_decoder, count=1)
559 Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
560 / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
561 / TCP(sport=1234, dport=4321)
566 # send the tcp packet two times, each time with new set of flags
568 TCP_F_SYN | TCP_F_ACK,
569 TCP_F_RST | TCP_F_PSH,
572 self.pkts[0][TCP].flags = f
573 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
575 # verify meta info - packet/octet delta and tcp flags
576 cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
577 self.verify_cflow_data(ipfix_decoder, capture, cflow)
578 self.verify_cflow_data_detail(
583 IPFIX_TCP_FLAGS_ID: f,
584 IPFIX_SRC_TRANS_PORT_ID: 1234,
585 IPFIX_DST_TRANS_PORT_ID: 4321,
589 self.collector.get_capture(3)
592 ipfix.remove_vpp_config()
594 def test_interface_dump(self):
595 """Dump interfaces with IPFIX flow record generation enabled"""
596 self.logger.info("FFP_TEST_START_0003")
598 # Enable feature for 3 interfaces
599 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
600 ipfix1.add_vpp_config()
602 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
603 ipfix2.enable_flowprobe_feature()
605 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
606 ipfix3.enable_flowprobe_feature()
608 # When request "all", dump should contain all enabled interfaces
609 dump = self.vapi.flowprobe_interface_dump()
610 self.assertEqual(len(dump), 3)
612 # Verify 1st interface
613 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
615 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
619 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
622 # Verify 2nd interface
623 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
625 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
629 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
632 # Verify 3rd interface
633 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
635 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
639 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
642 # When request 2nd interface, dump should contain only the specified interface
643 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
644 self.assertEqual(len(dump), 1)
646 # Verify 2nd interface
647 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
649 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
653 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
656 # When request 99th interface, dump should be empty
657 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
658 self.assertEqual(len(dump), 0)
660 ipfix1.remove_vpp_config()
661 ipfix2.remove_vpp_config()
662 ipfix3.remove_vpp_config()
663 self.logger.info("FFP_TEST_FINISH_0003")
665 def test_get_params(self):
666 """Get IPFIX flow record generation parameters"""
667 self.logger.info("FFP_TEST_START_0004")
669 # Enable feature for an interface with custom parameters
670 ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
671 ipfix.add_vpp_config()
673 # Get and verify parameters
674 params = self.vapi.flowprobe_get_params()
675 self.assertEqual(params.active_timer, 20)
676 self.assertEqual(params.passive_timer, 40)
677 record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
678 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
679 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
680 self.assertEqual(params.record_flags, record_flags)
682 ipfix.remove_vpp_config()
683 self.logger.info("FFP_TEST_FINISH_0004")
686 class DatapathTestsHolder(object):
687 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
691 super(DatapathTestsHolder, cls).setUpClass()
694 def tearDownClass(cls):
695 super(DatapathTestsHolder, cls).tearDownClass()
697 def test_templatesL2(self):
698 """verify template on L2 datapath"""
699 self.logger.info("FFP_TEST_START_0000")
700 self.pg_enable_capture(self.pg_interfaces)
703 test=self, intf=self.intf1, layer="l2", direction=self.direction
705 ipfix.add_vpp_config()
707 # template packet should arrive immediately
708 self.vapi.ipfix_flush()
709 ipfix.verify_templates(timeout=3, count=1)
710 self.collector.get_capture(1)
712 ipfix.remove_vpp_config()
713 self.logger.info("FFP_TEST_FINISH_0000")
715 def test_L2onL2(self):
716 """L2 data on L2 datapath"""
717 self.logger.info("FFP_TEST_START_0001")
718 self.pg_enable_capture(self.pg_interfaces)
722 test=self, intf=self.intf1, layer="l2", direction=self.direction
724 ipfix.add_vpp_config()
726 ipfix_decoder = IPFIXDecoder()
727 # template packet should arrive immediately
728 templates = ipfix.verify_templates(ipfix_decoder, count=1)
730 self.create_stream(packets=1)
731 capture = self.send_packets()
733 # make sure the one packet we expect actually showed up
734 self.vapi.ipfix_flush()
735 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
736 self.verify_cflow_data_detail(
740 {2: "packets", 256: 8, 61: (self.direction == "tx")},
742 self.collector.get_capture(2)
744 ipfix.remove_vpp_config()
745 self.logger.info("FFP_TEST_FINISH_0001")
747 def test_L3onL2(self):
748 """L3 data on L2 datapath"""
749 self.logger.info("FFP_TEST_START_0002")
750 self.pg_enable_capture(self.pg_interfaces)
754 test=self, intf=self.intf1, layer="l3", direction=self.direction
756 ipfix.add_vpp_config()
758 ipfix_decoder = IPFIXDecoder()
759 # template packet should arrive immediately
760 templates = ipfix.verify_templates(ipfix_decoder, count=2)
762 self.create_stream(packets=1)
763 capture = self.send_packets()
765 # make sure the one packet we expect actually showed up
766 self.vapi.ipfix_flush()
767 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
768 self.verify_cflow_data_detail(
777 61: (self.direction == "tx"),
781 self.collector.get_capture(3)
783 ipfix.remove_vpp_config()
784 self.logger.info("FFP_TEST_FINISH_0002")
786 def test_L234onL2(self):
787 """L2/3/4 data on L2 datapath"""
788 self.pg_enable_capture(self.pg_interfaces)
792 test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
794 ipfix.add_vpp_config()
796 ipfix_decoder = IPFIXDecoder()
797 # template packet should arrive immediately
798 tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
799 tmpl_ip_field_count = (
800 TMPL_COMMON_FIELD_COUNT
801 + TMPL_L2_FIELD_COUNT
802 + TMPL_L3_FIELD_COUNT
803 + TMPL_L4_FIELD_COUNT
805 templates = ipfix.verify_templates(
808 field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
811 # verify IPv4 and IPv6 flows
812 for ip_ver in ("v4", "v6"):
813 self.create_stream(packets=1, ip_ver=ip_ver)
814 capture = self.send_packets()
816 # make sure the one packet we expect actually showed up
817 self.vapi.ipfix_flush()
818 cflow = self.wait_for_cflow_packet(
819 self.collector, templates[1 if ip_ver == "v4" else 2]
821 src_ip_id = 8 if ip_ver == "v4" else 27
822 dst_ip_id = 12 if ip_ver == "v4" else 28
823 self.verify_cflow_data_detail(
829 256: 8 if ip_ver == "v4" else 56710,
835 61: (self.direction == "tx"),
838 field_count=tmpl_ip_field_count,
844 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
849 capture = self.send_packets()
851 # make sure the one packet we expect actually showed up
852 self.vapi.ipfix_flush()
853 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
854 self.verify_cflow_data_detail(
858 {2: "packets", 256: 2440, 61: (self.direction == "tx")},
859 field_count=tmpl_l2_field_count,
862 self.collector.get_capture(6)
864 ipfix.remove_vpp_config()
866 def test_L4onL2(self):
867 """L4 data on L2 datapath"""
868 self.logger.info("FFP_TEST_START_0003")
869 self.pg_enable_capture(self.pg_interfaces)
873 test=self, intf=self.intf1, layer="l4", direction=self.direction
875 ipfix.add_vpp_config()
877 ipfix_decoder = IPFIXDecoder()
878 # template packet should arrive immediately
879 templates = ipfix.verify_templates(ipfix_decoder, count=2)
881 self.create_stream(packets=1)
882 capture = self.send_packets()
884 # make sure the one packet we expect actually showed up
885 self.vapi.ipfix_flush()
886 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
887 self.verify_cflow_data_detail(
891 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
894 self.collector.get_capture(3)
896 ipfix.remove_vpp_config()
897 self.logger.info("FFP_TEST_FINISH_0003")
899 def test_templatesIp4(self):
900 """verify templates on IP4 datapath"""
901 self.logger.info("FFP_TEST_START_0000")
903 self.pg_enable_capture(self.pg_interfaces)
906 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
908 ipfix.add_vpp_config()
910 # template packet should arrive immediately
911 self.vapi.ipfix_flush()
912 ipfix.verify_templates(timeout=3, count=1)
913 self.collector.get_capture(1)
915 ipfix.remove_vpp_config()
917 self.logger.info("FFP_TEST_FINISH_0000")
919 def test_L2onIP4(self):
920 """L2 data on IP4 datapath"""
921 self.logger.info("FFP_TEST_START_0001")
922 self.pg_enable_capture(self.pg_interfaces)
930 direction=self.direction,
932 ipfix.add_vpp_config()
934 ipfix_decoder = IPFIXDecoder()
935 # template packet should arrive immediately
936 templates = ipfix.verify_templates(ipfix_decoder, count=1)
938 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
939 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
941 # make sure the one packet we expect actually showed up
942 self.vapi.ipfix_flush()
943 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
944 self.verify_cflow_data_detail(
948 {2: "packets", 256: 8, 61: (self.direction == "tx")},
951 # expected two templates and one cflow packet
952 self.collector.get_capture(2)
954 ipfix.remove_vpp_config()
955 self.logger.info("FFP_TEST_FINISH_0001")
957 def test_L3onIP4(self):
958 """L3 data on IP4 datapath"""
959 self.logger.info("FFP_TEST_START_0002")
960 self.pg_enable_capture(self.pg_interfaces)
968 direction=self.direction,
970 ipfix.add_vpp_config()
972 ipfix_decoder = IPFIXDecoder()
973 # template packet should arrive immediately
974 templates = ipfix.verify_templates(ipfix_decoder, count=1)
976 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
977 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
979 # make sure the one packet we expect actually showed up
980 self.vapi.ipfix_flush()
981 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
982 self.verify_cflow_data_detail(
991 61: (self.direction == "tx"),
995 # expected two templates and one cflow packet
996 self.collector.get_capture(2)
998 ipfix.remove_vpp_config()
999 self.logger.info("FFP_TEST_FINISH_0002")
1001 def test_L4onIP4(self):
1002 """L4 data on IP4 datapath"""
1003 self.logger.info("FFP_TEST_START_0003")
1004 self.pg_enable_capture(self.pg_interfaces)
1012 direction=self.direction,
1014 ipfix.add_vpp_config()
1016 ipfix_decoder = IPFIXDecoder()
1017 # template packet should arrive immediately
1018 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1020 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
1021 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1023 # make sure the one packet we expect actually showed up
1024 self.vapi.ipfix_flush()
1025 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1026 self.verify_cflow_data_detail(
1030 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1033 # expected two templates and one cflow packet
1034 self.collector.get_capture(2)
1036 ipfix.remove_vpp_config()
1037 self.logger.info("FFP_TEST_FINISH_0003")
1039 def test_templatesIP6(self):
1040 """verify templates on IP6 datapath"""
1041 self.logger.info("FFP_TEST_START_0000")
1042 self.pg_enable_capture(self.pg_interfaces)
1045 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
1047 ipfix.add_vpp_config()
1049 # template packet should arrive immediately
1050 ipfix.verify_templates(count=1)
1051 self.collector.get_capture(1)
1053 ipfix.remove_vpp_config()
1055 self.logger.info("FFP_TEST_FINISH_0000")
1057 def test_L2onIP6(self):
1058 """L2 data on IP6 datapath"""
1059 self.logger.info("FFP_TEST_START_0001")
1060 self.pg_enable_capture(self.pg_interfaces)
1068 direction=self.direction,
1070 ipfix.add_vpp_config()
1072 ipfix_decoder = IPFIXDecoder()
1073 # template packet should arrive immediately
1074 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1076 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1077 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1079 # make sure the one packet we expect actually showed up
1080 self.vapi.ipfix_flush()
1081 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1082 self.verify_cflow_data_detail(
1086 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1090 # expected two templates and one cflow packet
1091 self.collector.get_capture(2)
1093 ipfix.remove_vpp_config()
1094 self.logger.info("FFP_TEST_FINISH_0001")
1096 def test_L3onIP6(self):
1097 """L3 data on IP6 datapath"""
1098 self.logger.info("FFP_TEST_START_0002")
1099 self.pg_enable_capture(self.pg_interfaces)
1107 direction=self.direction,
1109 ipfix.add_vpp_config()
1111 ipfix_decoder = IPFIXDecoder()
1112 # template packet should arrive immediately
1113 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1115 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1116 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1118 # make sure the one packet we expect actually showed up
1119 self.vapi.ipfix_flush()
1120 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1121 self.verify_cflow_data_detail(
1125 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1129 # expected two templates and one cflow packet
1130 self.collector.get_capture(2)
1132 ipfix.remove_vpp_config()
1133 self.logger.info("FFP_TEST_FINISH_0002")
1135 def test_L4onIP6(self):
1136 """L4 data on IP6 datapath"""
1137 self.logger.info("FFP_TEST_START_0003")
1138 self.pg_enable_capture(self.pg_interfaces)
1146 direction=self.direction,
1148 ipfix.add_vpp_config()
1150 ipfix_decoder = IPFIXDecoder()
1151 # template packet should arrive immediately
1152 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1154 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1155 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1157 # make sure the one packet we expect actually showed up
1158 self.vapi.ipfix_flush()
1159 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1160 self.verify_cflow_data_detail(
1164 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1168 # expected two templates and one cflow packet
1169 self.collector.get_capture(2)
1171 ipfix.remove_vpp_config()
1172 self.logger.info("FFP_TEST_FINISH_0003")
1174 def test_0001(self):
1175 """no timers, one CFLOW packet, 9 Flows inside"""
1176 self.logger.info("FFP_TEST_START_0001")
1177 self.pg_enable_capture(self.pg_interfaces)
1180 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1181 ipfix.add_vpp_config()
1183 ipfix_decoder = IPFIXDecoder()
1184 # template packet should arrive immediately
1185 templates = ipfix.verify_templates(ipfix_decoder)
1187 self.create_stream(packets=9)
1188 capture = self.send_packets()
1190 # make sure the one packet we expect actually showed up
1191 self.vapi.ipfix_flush()
1192 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1193 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1194 self.collector.get_capture(4)
1196 ipfix.remove_vpp_config()
1197 self.logger.info("FFP_TEST_FINISH_0001")
1199 def test_0002(self):
1200 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1201 self.logger.info("FFP_TEST_START_0002")
1202 self.pg_enable_capture(self.pg_interfaces)
1205 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1206 ipfix.add_vpp_config()
1208 ipfix_decoder = IPFIXDecoder()
1209 # template packet should arrive immediately
1210 self.vapi.ipfix_flush()
1211 templates = ipfix.verify_templates(ipfix_decoder)
1213 self.create_stream(packets=6)
1214 capture = self.send_packets()
1216 # make sure the one packet we expect actually showed up
1218 self.vapi.ipfix_flush()
1219 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1220 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1221 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1222 self.collector.get_capture(5)
1224 ipfix.remove_vpp_config()
1225 self.logger.info("FFP_TEST_FINISH_0002")
1228 @tag_fixme_vpp_workers
1229 class DatapathTx(MethodHolder, DatapathTestsHolder):
1230 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1237 def test_rewritten_traffic(self):
1238 """Rewritten traffic (from subif to ipfix if)"""
1239 self.pg_enable_capture(self.pg_interfaces)
1242 # prepare a sub-interface
1243 subif = VppDot1ADSubint(self, self.pg7, 0, 300, 400)
1247 # enable ip4 datapath for an interface
1253 direction=self.direction,
1255 ipfix.add_vpp_config()
1257 # template packet should arrive immediately
1258 ipfix_decoder = IPFIXDecoder()
1259 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1261 # forward some traffic through the ipfix interface
1266 [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
1268 route.add_vpp_config()
1270 # prepare an IPv4 packet (subif => ipfix interface)
1272 Ether(src=subif.remote_mac, dst=self.pg7.local_mac)
1273 / IP(src=subif.remote_ip4, dst="9.0.0.1")
1274 / UDP(sport=1234, dport=4321)
1275 / Raw(b"\xa5" * 123)
1278 subif.add_dot1ad_layer(pkt, 300, 400),
1282 capture = self.send_packets(self.pg7, self.pg8)
1284 # wait for a flow and verify it
1285 self.vapi.ipfix_flush()
1286 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1287 self.verify_cflow_data(ipfix_decoder, capture, cflow)
1288 self.verify_cflow_data_detail(
1293 IPFIX_SRC_IP4_ADDR_ID: "src_ip",
1294 IPFIX_DST_IP4_ADDR_ID: "dst_ip",
1295 IPFIX_SRC_TRANS_PORT_ID: "sport",
1296 IPFIX_DST_TRANS_PORT_ID: "dport",
1297 IPFIX_FLOW_DIRECTION_ID: (self.direction == "tx"),
1301 self.collector.get_capture(2)
1304 route.remove_vpp_config()
1305 subif.remove_vpp_config()
1306 ipfix.remove_vpp_config()
1309 @tag_fixme_vpp_workers
1310 class DatapathRx(MethodHolder, DatapathTestsHolder):
1311 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1319 @unittest.skipUnless(config.extended, "part of extended tests")
1320 class DisableIPFIX(MethodHolder):
1324 def setUpClass(cls):
1325 super(DisableIPFIX, cls).setUpClass()
1328 def tearDownClass(cls):
1329 super(DisableIPFIX, cls).tearDownClass()
1331 def test_0001(self):
1332 """disable IPFIX after first packets"""
1333 self.logger.info("FFP_TEST_START_0001")
1334 self.pg_enable_capture(self.pg_interfaces)
1337 ipfix = VppCFLOW(test=self)
1338 ipfix.add_vpp_config()
1340 ipfix_decoder = IPFIXDecoder()
1341 # template packet should arrive immediately
1342 templates = ipfix.verify_templates(ipfix_decoder)
1344 self.create_stream()
1347 # make sure the one packet we expect actually showed up
1348 self.vapi.ipfix_flush()
1349 self.wait_for_cflow_packet(self.collector, templates[1])
1350 self.collector.get_capture(4)
1353 ipfix.disable_exporter()
1354 self.pg_enable_capture([self.collector])
1358 # make sure no one packet arrived in 1 minute
1359 self.vapi.ipfix_flush()
1360 self.sleep(1, "wait before verifying no packets sent")
1361 self.collector.assert_nothing_captured()
1363 ipfix.remove_vpp_config()
1364 self.logger.info("FFP_TEST_FINISH_0001")
1367 @unittest.skipUnless(config.extended, "part of extended tests")
1368 class ReenableIPFIX(MethodHolder):
1369 """Re-enable IPFIX"""
1372 def setUpClass(cls):
1373 super(ReenableIPFIX, cls).setUpClass()
1376 def tearDownClass(cls):
1377 super(ReenableIPFIX, cls).tearDownClass()
1379 def test_0011(self):
1380 """disable IPFIX after first packets and re-enable after few packets"""
1381 self.logger.info("FFP_TEST_START_0001")
1382 self.pg_enable_capture(self.pg_interfaces)
1385 ipfix = VppCFLOW(test=self)
1386 ipfix.add_vpp_config()
1388 ipfix_decoder = IPFIXDecoder()
1389 # template packet should arrive immediately
1390 templates = ipfix.verify_templates(ipfix_decoder)
1392 self.create_stream(packets=5)
1395 # make sure the one packet we expect actually showed up
1396 self.vapi.ipfix_flush()
1397 self.wait_for_cflow_packet(self.collector, templates[1])
1398 self.collector.get_capture(4)
1401 ipfix.disable_exporter()
1402 self.vapi.ipfix_flush()
1403 self.pg_enable_capture([self.collector])
1407 # make sure no one packet arrived in active timer span
1408 self.vapi.ipfix_flush()
1409 self.sleep(1, "wait before verifying no packets sent")
1410 self.collector.assert_nothing_captured()
1411 self.pg2.get_capture(5)
1414 ipfix.enable_exporter()
1416 capture = self.collector.get_capture(4)
1420 self.assertTrue(p.haslayer(IPFIX))
1421 if p.haslayer(Template):
1423 self.assertTrue(nr_templates, 3)
1425 self.assertTrue(p.haslayer(IPFIX))
1426 if p.haslayer(Data):
1428 self.assertTrue(nr_templates, 1)
1430 ipfix.remove_vpp_config()
1431 self.logger.info("FFP_TEST_FINISH_0001")
1434 @unittest.skipUnless(config.extended, "part of extended tests")
1435 class DisableFP(MethodHolder):
1436 """Disable Flowprobe feature"""
1439 def setUpClass(cls):
1440 super(DisableFP, cls).setUpClass()
1443 def tearDownClass(cls):
1444 super(DisableFP, cls).tearDownClass()
1446 def test_0001(self):
1447 """disable flowprobe feature after first packets"""
1448 self.logger.info("FFP_TEST_START_0001")
1449 self.pg_enable_capture(self.pg_interfaces)
1451 ipfix = VppCFLOW(test=self)
1452 ipfix.add_vpp_config()
1454 ipfix_decoder = IPFIXDecoder()
1455 # template packet should arrive immediately
1456 templates = ipfix.verify_templates(ipfix_decoder)
1458 self.create_stream()
1461 # make sure the one packet we expect actually showed up
1462 self.vapi.ipfix_flush()
1463 self.wait_for_cflow_packet(self.collector, templates[1])
1464 self.collector.get_capture(4)
1467 ipfix.disable_flowprobe_feature()
1468 self.pg_enable_capture([self.collector])
1472 # make sure no one packet arrived in active timer span
1473 self.vapi.ipfix_flush()
1474 self.sleep(1, "wait before verifying no packets sent")
1475 self.collector.assert_nothing_captured()
1477 # enable FPP feature so the remove_vpp_config() doesn't fail
1478 # due to missing feature on interface.
1479 ipfix.enable_flowprobe_feature()
1481 ipfix.remove_vpp_config()
1482 self.logger.info("FFP_TEST_FINISH_0001")
1484 def test_no_leftover_flows_after_disabling(self):
1485 """disable flowprobe feature and expect no leftover flows"""
1486 self.pg_enable_capture(self.pg_interfaces)
1489 # enable ip4 datapath for an interface
1490 # set active and passive timers
1501 ipfix.add_vpp_config()
1503 # template packet should arrive immediately
1504 ipfix.verify_templates(count=1)
1506 # send some ip4 packets
1507 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
1508 self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1510 # disable feature for the interface
1511 # currently stored ip4 flows should be removed
1512 ipfix.disable_flowprobe_feature()
1514 # no leftover ip4 flows are expected
1515 self.pg_enable_capture([self.collector])
1516 self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
1517 self.collector.assert_nothing_captured()
1519 # re-enable feature for the interface
1520 ipfix.enable_flowprobe_feature()
1522 # template packet should arrive immediately
1523 ipfix_decoder = IPFIXDecoder()
1524 self.vapi.ipfix_flush()
1525 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1527 # send some ip4 packets
1528 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
1529 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1531 # verify meta info - packet/octet delta
1532 self.vapi.ipfix_flush()
1533 cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=8)
1534 self.verify_cflow_data(ipfix_decoder, capture, cflow)
1536 self.collector.get_capture(2)
1539 ipfix.remove_vpp_config()
1542 @unittest.skipUnless(config.extended, "part of extended tests")
1543 class ReenableFP(MethodHolder):
1544 """Re-enable Flowprobe feature"""
1547 def setUpClass(cls):
1548 super(ReenableFP, cls).setUpClass()
1551 def tearDownClass(cls):
1552 super(ReenableFP, cls).tearDownClass()
1554 def test_0001(self):
1555 """disable flowprobe feature after first packets and re-enable
1556 after few packets"""
1557 self.logger.info("FFP_TEST_START_0001")
1558 self.pg_enable_capture(self.pg_interfaces)
1561 ipfix = VppCFLOW(test=self)
1562 ipfix.add_vpp_config()
1564 ipfix_decoder = IPFIXDecoder()
1565 # template packet should arrive immediately
1566 self.vapi.ipfix_flush()
1567 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1569 self.create_stream()
1572 # make sure the one packet we expect actually showed up
1573 self.vapi.ipfix_flush()
1574 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1575 self.collector.get_capture(4)
1577 # disable FPP feature
1578 ipfix.disable_flowprobe_feature()
1579 self.pg_enable_capture([self.collector])
1583 # make sure no one packet arrived in active timer span
1584 self.vapi.ipfix_flush()
1585 self.sleep(5, "wait before verifying no packets sent")
1586 self.collector.assert_nothing_captured()
1588 # enable FPP feature
1589 ipfix.enable_flowprobe_feature()
1590 self.vapi.ipfix_flush()
1591 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1595 # make sure the next packets (templates and data) we expect actually
1597 self.vapi.ipfix_flush()
1598 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1599 self.collector.get_capture(4)
1601 ipfix.remove_vpp_config()
1602 self.logger.info("FFP_TEST_FINISH_0001")
1605 if __name__ == "__main__":
1606 unittest.main(testRunner=VppTestRunner)