2 from __future__ import print_function
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP
12 from scapy.layers.inet6 import IPv6
13 from scapy.contrib.lacp import SlowProtocol, LACP
15 from config import config
16 from framework import VppTestCase
17 from asfframework import (
18 tag_fixme_vpp_workers,
26 from vpp_object import VppObject
28 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_papi.macaddress import mac_ntop
31 from socket import inet_ntop
32 from vpp_papi import VppEnum
33 from vpp_sub_interface import VppDot1ADSubint
36 TMPL_COMMON_FIELD_COUNT = 6
37 TMPL_L2_FIELD_COUNT = 3
38 TMPL_L3_FIELD_COUNT = 4
39 TMPL_L4_FIELD_COUNT = 3
41 IPFIX_TCP_FLAGS_ID = 6
42 IPFIX_SRC_TRANS_PORT_ID = 7
43 IPFIX_DST_TRANS_PORT_ID = 11
44 IPFIX_SRC_IP4_ADDR_ID = 8
45 IPFIX_DST_IP4_ADDR_ID = 12
46 IPFIX_FLOW_DIRECTION_ID = 61
58 class VppCFLOW(VppObject):
59 """CFLOW object for IPFIX exporter and Flowprobe feature"""
75 self._intf_obj = getattr(self._test, intf)
77 if passive == 0 or passive < active:
78 self._passive = active + 1
80 self._passive = passive
81 self._datapath = datapath # l2 ip4 ip6
82 self._collect = layer # l2 l3 l4
83 self._direction = direction # rx tx both
84 self._timeout = timeout
86 self._configured = False
88 def add_vpp_config(self):
89 self.enable_exporter()
93 if "l2" in self._collect.lower():
94 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
95 if "l3" in self._collect.lower():
96 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
97 if "l4" in self._collect.lower():
98 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
99 self._test.vapi.flowprobe_set_params(
100 record_flags=(l2_flag | l3_flag | l4_flag),
101 active_timer=self._active,
102 passive_timer=self._passive,
104 self.enable_flowprobe_feature()
105 self._test.vapi.cli("ipfix flush")
106 self._configured = True
108 def remove_vpp_config(self):
109 self.disable_exporter()
110 self.disable_flowprobe_feature()
111 self._test.vapi.cli("ipfix flush")
112 self._configured = False
114 def enable_exporter(self):
115 self._test.vapi.set_ipfix_exporter(
116 collector_address=self._test.pg0.remote_ip4,
117 src_address=self._test.pg0.local_ip4,
119 template_interval=self._timeout,
122 def _enable_disable_flowprobe_feature(self, is_add):
124 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
125 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
126 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
129 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
130 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
131 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
133 self._test.vapi.flowprobe_interface_add_del(
135 which=which_map[self._datapath],
136 direction=direction_map[self._direction],
137 sw_if_index=self._intf_obj.sw_if_index,
140 def enable_flowprobe_feature(self):
141 self._enable_disable_flowprobe_feature(is_add=True)
143 def disable_exporter(self):
144 self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
146 def disable_flowprobe_feature(self):
147 self._enable_disable_flowprobe_feature(is_add=False)
150 return "ipfix-collector-%s-%s" % (self._src, self.dst)
152 def query_vpp_config(self):
153 return self._configured
155 def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
157 self._test.assertIn(count, (1, 2, 3))
158 for _ in range(count):
159 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
160 self._test.assertTrue(p.haslayer(IPFIX))
161 self._test.assertTrue(p.haslayer(Template))
162 if decoder is not None:
163 templates.append(p[Template].templateID)
164 decoder.add_template(p.getlayer(Template))
165 if field_count_in is not None:
166 self._test.assertIn(p[Template].fieldCount, field_count_in)
170 class MethodHolder(VppTestCase):
171 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
175 max_number_of_packets = 10
181 Perform standard class setup (defined by class method setUpClass in
182 class VppTestCase) before running the test case, set test case related
183 variables and configure VPP.
185 super(MethodHolder, cls).setUpClass()
186 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
191 # Create pg interfaces
192 cls.create_pg_interfaces(range(9))
195 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
197 # Create BD with MAC learning and unknown unicast flooding disabled
198 # and put interfaces to this BD
199 cls.vapi.bridge_domain_add_del_v2(
200 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
202 cls.vapi.sw_interface_set_l2_bridge(
203 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
205 cls.vapi.sw_interface_set_l2_bridge(
206 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
209 # Set up all interfaces
210 for i in cls.pg_interfaces:
214 cls.pg0.configure_ipv4_neighbors()
215 cls.collector = cls.pg0
218 cls.pg1.resolve_arp()
220 cls.pg2.resolve_arp()
222 cls.pg3.resolve_arp()
224 cls.pg4.resolve_arp()
227 cls.pg8.configure_ipv4_neighbors()
230 cls.pg5.resolve_ndp()
231 cls.pg5.disable_ipv6_ra()
233 cls.pg6.resolve_ndp()
234 cls.pg6.disable_ipv6_ra()
236 super(MethodHolder, cls).tearDownClass()
240 def tearDownClass(cls):
241 super(MethodHolder, cls).tearDownClass()
244 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
246 """Create a packet stream to tickle the plugin
248 :param VppInterface src_if: Source interface for packet stream
249 :param VppInterface src_if: Dst interface for packet stream
257 packets = random.randint(1, self.max_number_of_packets)
259 for p in range(0, packets):
261 pkt_size = random.choice(self.pg_if_packet_sizes)
262 info = self.create_packet_info(src_if, dst_if)
263 payload = self.info_to_payload(info)
264 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
266 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
268 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
269 p /= UDP(sport=1234, dport=4321)
272 self.extend_packet(p, pkt_size)
275 def verify_cflow_data(self, decoder, capture, cflow):
281 if cflow.haslayer(Data):
282 data = decoder.decode_data_set(cflow.getlayer(Set))
284 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
285 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
287 def send_packets(self, src_if=None, dst_if=None):
292 self.pg_enable_capture([dst_if])
293 src_if.add_stream(self.pkts)
295 return dst_if.get_capture(len(self.pkts))
297 def verify_cflow_data_detail(
302 data_set={1: "octets", 2: "packets"},
307 print(capture[0].show())
308 if cflow.haslayer(Data):
309 data = decoder.decode_data_set(cflow.getlayer(Set))
313 ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
315 ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
316 if data_set is not None:
318 # skip flow if ingress/egress interface is 0
319 if int(binascii.hexlify(record[10]), 16) == 0:
321 if int(binascii.hexlify(record[14]), 16) == 0:
324 for field in data_set:
325 value = data_set[field]
326 if value == "octets":
329 value += 40 # ??? is this correct
330 elif value == "packets":
332 elif value == "src_ip":
334 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
336 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
337 value = int(binascii.hexlify(ip), 16)
338 elif value == "dst_ip":
340 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
342 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
343 value = int(binascii.hexlify(ip), 16)
344 elif value == "sport":
345 value = int(capture[0][UDP].sport)
346 elif value == "dport":
347 value = int(capture[0][UDP].dport)
349 int(binascii.hexlify(record[field]), 16), value
351 if field_count is not None:
353 self.assertEqual(len(record), field_count)
355 def verify_cflow_data_notimer(self, decoder, capture, cflows):
358 if cflow.haslayer(Data):
359 data = decoder.decode_data_set(cflow.getlayer(Set))
361 raise Exception("No CFLOW data")
366 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
367 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
368 self.assertEqual(len(capture), idx)
370 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
371 """wait for CFLOW packet and verify its correctness
373 :param timeout: how long to wait
376 self.logger.info("IPFIX: Waiting for CFLOW packet")
377 # self.logger.debug(self.vapi.ppcli("show flow table"))
378 p = collector_intf.wait_for_packet(timeout=timeout)
379 self.assertEqual(p[Set].setID, set_id)
380 # self.logger.debug(self.vapi.ppcli("show flow table"))
381 self.logger.debug(ppp("IPFIX: Got packet:", p))
386 @tag_fixme_vpp_workers
387 @tag_fixme_ubuntu2204
389 class Flowprobe(MethodHolder):
390 """Template verification, timer tests"""
394 super(Flowprobe, cls).setUpClass()
397 def tearDownClass(cls):
398 super(Flowprobe, cls).tearDownClass()
401 """timer less than template timeout"""
402 self.logger.info("FFP_TEST_START_0001")
403 self.pg_enable_capture(self.pg_interfaces)
406 ipfix = VppCFLOW(test=self, active=2)
407 ipfix.add_vpp_config()
409 ipfix_decoder = IPFIXDecoder()
410 # template packet should arrive immediately
411 templates = ipfix.verify_templates(ipfix_decoder)
413 self.create_stream(packets=1)
415 capture = self.pg2.get_capture(1)
417 # make sure the one packet we expect actually showed up
418 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
419 self.verify_cflow_data(ipfix_decoder, capture, cflow)
421 ipfix.remove_vpp_config()
422 self.logger.info("FFP_TEST_FINISH_0001")
424 @unittest.skipUnless(
425 config.extended, "Test is unstable (assertion error, needs to be fixed"
428 """timer greater than template timeout [UNSTABLE, FIX ME]"""
429 self.logger.info("FFP_TEST_START_0002")
430 self.pg_enable_capture(self.pg_interfaces)
433 ipfix = VppCFLOW(test=self, timeout=3, active=4)
434 ipfix.add_vpp_config()
436 ipfix_decoder = IPFIXDecoder()
437 # template packet should arrive immediately
438 ipfix.verify_templates()
440 self.create_stream(packets=2)
442 capture = self.pg2.get_capture(2)
444 # next set of template packet should arrive after 20 seconds
445 # template packet should arrive within 20 s
446 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
448 # make sure the one packet we expect actually showed up
449 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
450 self.verify_cflow_data(ipfix_decoder, capture, cflow)
452 ipfix.remove_vpp_config()
453 self.logger.info("FFP_TEST_FINISH_0002")
455 def test_cflow_packet(self):
456 """verify cflow packet fields"""
457 self.logger.info("FFP_TEST_START_0000")
458 self.pg_enable_capture(self.pg_interfaces)
462 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
464 ipfix.add_vpp_config()
466 route_9001 = VppIpRoute(
470 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
472 route_9001.add_vpp_config()
474 ipfix_decoder = IPFIXDecoder()
475 templates = ipfix.verify_templates(ipfix_decoder, count=1)
479 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
480 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
481 / TCP(sport=1234, dport=4321, flags=80)
486 nowUTC = int(time.time())
487 nowUNIX = nowUTC + 2208988800
488 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
490 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
491 self.collector.get_capture(2)
493 if cflow[0].haslayer(IPFIX):
494 self.assertEqual(cflow[IPFIX].version, 10)
495 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
496 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
497 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
498 if cflow.haslayer(Data):
499 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
501 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
503 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
505 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
507 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
509 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
511 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
512 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
513 # flow start timestamp
514 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
515 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
517 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
519 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
521 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
523 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
525 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
527 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
529 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
531 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
533 ipfix.remove_vpp_config()
534 self.logger.info("FFP_TEST_FINISH_0000")
536 def test_flow_entry_reuse(self):
537 """Verify flow entry reuse doesn't accumulate meta info"""
538 self.pg_enable_capture(self.pg_interfaces)
541 # enable ip4 datapath for an interface
542 # set active and passive timers
553 ipfix.add_vpp_config()
555 # template packet should arrive immediately
556 ipfix_decoder = IPFIXDecoder()
557 templates = ipfix.verify_templates(ipfix_decoder, count=1)
562 Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
563 / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
564 / TCP(sport=1234, dport=4321)
569 # send the tcp packet two times, each time with new set of flags
571 TCP_F_SYN | TCP_F_ACK,
572 TCP_F_RST | TCP_F_PSH,
575 self.pkts[0][TCP].flags = f
576 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
578 # verify meta info - packet/octet delta and tcp flags
579 cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
580 self.verify_cflow_data(ipfix_decoder, capture, cflow)
581 self.verify_cflow_data_detail(
586 IPFIX_TCP_FLAGS_ID: f,
587 IPFIX_SRC_TRANS_PORT_ID: 1234,
588 IPFIX_DST_TRANS_PORT_ID: 4321,
592 self.collector.get_capture(3)
595 ipfix.remove_vpp_config()
597 def test_interface_dump(self):
598 """Dump interfaces with IPFIX flow record generation enabled"""
599 self.logger.info("FFP_TEST_START_0003")
601 # Enable feature for 3 interfaces
602 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
603 ipfix1.add_vpp_config()
605 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
606 ipfix2.enable_flowprobe_feature()
608 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
609 ipfix3.enable_flowprobe_feature()
611 # When request "all", dump should contain all enabled interfaces
612 dump = self.vapi.flowprobe_interface_dump()
613 self.assertEqual(len(dump), 3)
615 # Verify 1st interface
616 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
618 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
622 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
625 # Verify 2nd interface
626 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
628 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
632 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
635 # Verify 3rd interface
636 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
638 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
642 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
645 # When request 2nd interface, dump should contain only the specified interface
646 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
647 self.assertEqual(len(dump), 1)
649 # Verify 2nd interface
650 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
652 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
656 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
659 # When request 99th interface, dump should be empty
660 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
661 self.assertEqual(len(dump), 0)
663 ipfix1.remove_vpp_config()
664 ipfix2.remove_vpp_config()
665 ipfix3.remove_vpp_config()
666 self.logger.info("FFP_TEST_FINISH_0003")
668 def test_get_params(self):
669 """Get IPFIX flow record generation parameters"""
670 self.logger.info("FFP_TEST_START_0004")
672 # Enable feature for an interface with custom parameters
673 ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
674 ipfix.add_vpp_config()
676 # Get and verify parameters
677 params = self.vapi.flowprobe_get_params()
678 self.assertEqual(params.active_timer, 20)
679 self.assertEqual(params.passive_timer, 40)
680 record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
681 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
682 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
683 self.assertEqual(params.record_flags, record_flags)
685 ipfix.remove_vpp_config()
686 self.logger.info("FFP_TEST_FINISH_0004")
689 class DatapathTestsHolder(object):
690 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
694 super(DatapathTestsHolder, cls).setUpClass()
697 def tearDownClass(cls):
698 super(DatapathTestsHolder, cls).tearDownClass()
700 def test_templatesL2(self):
701 """verify template on L2 datapath"""
702 self.logger.info("FFP_TEST_START_0000")
703 self.pg_enable_capture(self.pg_interfaces)
706 test=self, intf=self.intf1, layer="l2", direction=self.direction
708 ipfix.add_vpp_config()
710 # template packet should arrive immediately
711 self.vapi.ipfix_flush()
712 ipfix.verify_templates(timeout=3, count=1)
713 self.collector.get_capture(1)
715 ipfix.remove_vpp_config()
716 self.logger.info("FFP_TEST_FINISH_0000")
718 def test_L2onL2(self):
719 """L2 data on L2 datapath"""
720 self.logger.info("FFP_TEST_START_0001")
721 self.pg_enable_capture(self.pg_interfaces)
725 test=self, intf=self.intf1, layer="l2", direction=self.direction
727 ipfix.add_vpp_config()
729 ipfix_decoder = IPFIXDecoder()
730 # template packet should arrive immediately
731 templates = ipfix.verify_templates(ipfix_decoder, count=1)
733 self.create_stream(packets=1)
734 capture = self.send_packets()
736 # make sure the one packet we expect actually showed up
737 self.vapi.ipfix_flush()
738 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
739 self.verify_cflow_data_detail(
743 {2: "packets", 256: 8, 61: (self.direction == "tx")},
745 self.collector.get_capture(2)
747 ipfix.remove_vpp_config()
748 self.logger.info("FFP_TEST_FINISH_0001")
750 def test_L3onL2(self):
751 """L3 data on L2 datapath"""
752 self.logger.info("FFP_TEST_START_0002")
753 self.pg_enable_capture(self.pg_interfaces)
757 test=self, intf=self.intf1, layer="l3", direction=self.direction
759 ipfix.add_vpp_config()
761 ipfix_decoder = IPFIXDecoder()
762 # template packet should arrive immediately
763 templates = ipfix.verify_templates(ipfix_decoder, count=2)
765 self.create_stream(packets=1)
766 capture = self.send_packets()
768 # make sure the one packet we expect actually showed up
769 self.vapi.ipfix_flush()
770 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
771 self.verify_cflow_data_detail(
780 61: (self.direction == "tx"),
784 self.collector.get_capture(3)
786 ipfix.remove_vpp_config()
787 self.logger.info("FFP_TEST_FINISH_0002")
789 def test_L234onL2(self):
790 """L2/3/4 data on L2 datapath"""
791 self.pg_enable_capture(self.pg_interfaces)
795 test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
797 ipfix.add_vpp_config()
799 ipfix_decoder = IPFIXDecoder()
800 # template packet should arrive immediately
801 tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
802 tmpl_ip_field_count = (
803 TMPL_COMMON_FIELD_COUNT
804 + TMPL_L2_FIELD_COUNT
805 + TMPL_L3_FIELD_COUNT
806 + TMPL_L4_FIELD_COUNT
808 templates = ipfix.verify_templates(
811 field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
814 # verify IPv4 and IPv6 flows
815 for ip_ver in ("v4", "v6"):
816 self.create_stream(packets=1, ip_ver=ip_ver)
817 capture = self.send_packets()
819 # make sure the one packet we expect actually showed up
820 self.vapi.ipfix_flush()
821 cflow = self.wait_for_cflow_packet(
822 self.collector, templates[1 if ip_ver == "v4" else 2]
824 src_ip_id = 8 if ip_ver == "v4" else 27
825 dst_ip_id = 12 if ip_ver == "v4" else 28
826 self.verify_cflow_data_detail(
832 256: 8 if ip_ver == "v4" else 56710,
838 61: (self.direction == "tx"),
841 field_count=tmpl_ip_field_count,
847 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
852 capture = self.send_packets()
854 # make sure the one packet we expect actually showed up
855 self.vapi.ipfix_flush()
856 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
857 self.verify_cflow_data_detail(
861 {2: "packets", 256: 2440, 61: (self.direction == "tx")},
862 field_count=tmpl_l2_field_count,
865 self.collector.get_capture(6)
867 ipfix.remove_vpp_config()
869 def test_L4onL2(self):
870 """L4 data on L2 datapath"""
871 self.logger.info("FFP_TEST_START_0003")
872 self.pg_enable_capture(self.pg_interfaces)
876 test=self, intf=self.intf1, layer="l4", direction=self.direction
878 ipfix.add_vpp_config()
880 ipfix_decoder = IPFIXDecoder()
881 # template packet should arrive immediately
882 templates = ipfix.verify_templates(ipfix_decoder, count=2)
884 self.create_stream(packets=1)
885 capture = self.send_packets()
887 # make sure the one packet we expect actually showed up
888 self.vapi.ipfix_flush()
889 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
890 self.verify_cflow_data_detail(
894 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
897 self.collector.get_capture(3)
899 ipfix.remove_vpp_config()
900 self.logger.info("FFP_TEST_FINISH_0003")
902 def test_templatesIp4(self):
903 """verify templates on IP4 datapath"""
904 self.logger.info("FFP_TEST_START_0000")
906 self.pg_enable_capture(self.pg_interfaces)
909 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
911 ipfix.add_vpp_config()
913 # template packet should arrive immediately
914 self.vapi.ipfix_flush()
915 ipfix.verify_templates(timeout=3, count=1)
916 self.collector.get_capture(1)
918 ipfix.remove_vpp_config()
920 self.logger.info("FFP_TEST_FINISH_0000")
922 def test_L2onIP4(self):
923 """L2 data on IP4 datapath"""
924 self.logger.info("FFP_TEST_START_0001")
925 self.pg_enable_capture(self.pg_interfaces)
933 direction=self.direction,
935 ipfix.add_vpp_config()
937 ipfix_decoder = IPFIXDecoder()
938 # template packet should arrive immediately
939 templates = ipfix.verify_templates(ipfix_decoder, count=1)
941 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
942 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
944 # make sure the one packet we expect actually showed up
945 self.vapi.ipfix_flush()
946 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
947 self.verify_cflow_data_detail(
951 {2: "packets", 256: 8, 61: (self.direction == "tx")},
954 # expected two templates and one cflow packet
955 self.collector.get_capture(2)
957 ipfix.remove_vpp_config()
958 self.logger.info("FFP_TEST_FINISH_0001")
960 def test_L3onIP4(self):
961 """L3 data on IP4 datapath"""
962 self.logger.info("FFP_TEST_START_0002")
963 self.pg_enable_capture(self.pg_interfaces)
971 direction=self.direction,
973 ipfix.add_vpp_config()
975 ipfix_decoder = IPFIXDecoder()
976 # template packet should arrive immediately
977 templates = ipfix.verify_templates(ipfix_decoder, count=1)
979 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
980 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
982 # make sure the one packet we expect actually showed up
983 self.vapi.ipfix_flush()
984 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
985 self.verify_cflow_data_detail(
994 61: (self.direction == "tx"),
998 # expected two templates and one cflow packet
999 self.collector.get_capture(2)
1001 ipfix.remove_vpp_config()
1002 self.logger.info("FFP_TEST_FINISH_0002")
1004 def test_L4onIP4(self):
1005 """L4 data on IP4 datapath"""
1006 self.logger.info("FFP_TEST_START_0003")
1007 self.pg_enable_capture(self.pg_interfaces)
1015 direction=self.direction,
1017 ipfix.add_vpp_config()
1019 ipfix_decoder = IPFIXDecoder()
1020 # template packet should arrive immediately
1021 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1023 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
1024 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1026 # make sure the one packet we expect actually showed up
1027 self.vapi.ipfix_flush()
1028 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1029 self.verify_cflow_data_detail(
1033 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1036 # expected two templates and one cflow packet
1037 self.collector.get_capture(2)
1039 ipfix.remove_vpp_config()
1040 self.logger.info("FFP_TEST_FINISH_0003")
1042 def test_templatesIP6(self):
1043 """verify templates on IP6 datapath"""
1044 self.logger.info("FFP_TEST_START_0000")
1045 self.pg_enable_capture(self.pg_interfaces)
1048 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
1050 ipfix.add_vpp_config()
1052 # template packet should arrive immediately
1053 ipfix.verify_templates(count=1)
1054 self.collector.get_capture(1)
1056 ipfix.remove_vpp_config()
1058 self.logger.info("FFP_TEST_FINISH_0000")
1060 def test_L2onIP6(self):
1061 """L2 data on IP6 datapath"""
1062 self.logger.info("FFP_TEST_START_0001")
1063 self.pg_enable_capture(self.pg_interfaces)
1071 direction=self.direction,
1073 ipfix.add_vpp_config()
1075 ipfix_decoder = IPFIXDecoder()
1076 # template packet should arrive immediately
1077 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1079 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1080 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1082 # make sure the one packet we expect actually showed up
1083 self.vapi.ipfix_flush()
1084 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1085 self.verify_cflow_data_detail(
1089 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1093 # expected two templates and one cflow packet
1094 self.collector.get_capture(2)
1096 ipfix.remove_vpp_config()
1097 self.logger.info("FFP_TEST_FINISH_0001")
1099 def test_L3onIP6(self):
1100 """L3 data on IP6 datapath"""
1101 self.logger.info("FFP_TEST_START_0002")
1102 self.pg_enable_capture(self.pg_interfaces)
1110 direction=self.direction,
1112 ipfix.add_vpp_config()
1114 ipfix_decoder = IPFIXDecoder()
1115 # template packet should arrive immediately
1116 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1118 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1119 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1121 # make sure the one packet we expect actually showed up
1122 self.vapi.ipfix_flush()
1123 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1124 self.verify_cflow_data_detail(
1128 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1132 # expected two templates and one cflow packet
1133 self.collector.get_capture(2)
1135 ipfix.remove_vpp_config()
1136 self.logger.info("FFP_TEST_FINISH_0002")
1138 def test_L4onIP6(self):
1139 """L4 data on IP6 datapath"""
1140 self.logger.info("FFP_TEST_START_0003")
1141 self.pg_enable_capture(self.pg_interfaces)
1149 direction=self.direction,
1151 ipfix.add_vpp_config()
1153 ipfix_decoder = IPFIXDecoder()
1154 # template packet should arrive immediately
1155 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1157 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1158 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1160 # make sure the one packet we expect actually showed up
1161 self.vapi.ipfix_flush()
1162 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1163 self.verify_cflow_data_detail(
1167 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1171 # expected two templates and one cflow packet
1172 self.collector.get_capture(2)
1174 ipfix.remove_vpp_config()
1175 self.logger.info("FFP_TEST_FINISH_0003")
1177 def test_0001(self):
1178 """no timers, one CFLOW packet, 9 Flows inside"""
1179 self.logger.info("FFP_TEST_START_0001")
1180 self.pg_enable_capture(self.pg_interfaces)
1183 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1184 ipfix.add_vpp_config()
1186 ipfix_decoder = IPFIXDecoder()
1187 # template packet should arrive immediately
1188 templates = ipfix.verify_templates(ipfix_decoder)
1190 self.create_stream(packets=9)
1191 capture = self.send_packets()
1193 # make sure the one packet we expect actually showed up
1194 self.vapi.ipfix_flush()
1195 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1196 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1197 self.collector.get_capture(4)
1199 ipfix.remove_vpp_config()
1200 self.logger.info("FFP_TEST_FINISH_0001")
1202 def test_0002(self):
1203 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1204 self.logger.info("FFP_TEST_START_0002")
1205 self.pg_enable_capture(self.pg_interfaces)
1208 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1209 ipfix.add_vpp_config()
1211 ipfix_decoder = IPFIXDecoder()
1212 # template packet should arrive immediately
1213 self.vapi.ipfix_flush()
1214 templates = ipfix.verify_templates(ipfix_decoder)
1216 self.create_stream(packets=6)
1217 capture = self.send_packets()
1219 # make sure the one packet we expect actually showed up
1221 self.vapi.ipfix_flush()
1222 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1223 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1224 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1225 self.collector.get_capture(5)
1227 ipfix.remove_vpp_config()
1228 self.logger.info("FFP_TEST_FINISH_0002")
1231 class DatapathTx(MethodHolder, DatapathTestsHolder):
1232 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1239 def test_rewritten_traffic(self):
1240 """Rewritten traffic (from subif to ipfix if)"""
1241 self.pg_enable_capture(self.pg_interfaces)
1244 # prepare a sub-interface
1245 subif = VppDot1ADSubint(self, self.pg7, 0, 300, 400)
1249 # enable ip4 datapath for an interface
1255 direction=self.direction,
1257 ipfix.add_vpp_config()
1259 # template packet should arrive immediately
1260 ipfix_decoder = IPFIXDecoder()
1261 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1263 # forward some traffic through the ipfix interface
1268 [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
1270 route.add_vpp_config()
1272 # prepare an IPv4 packet (subif => ipfix interface)
1274 Ether(src=subif.remote_mac, dst=self.pg7.local_mac)
1275 / IP(src=subif.remote_ip4, dst="9.0.0.1")
1276 / UDP(sport=1234, dport=4321)
1277 / Raw(b"\xa5" * 123)
1280 subif.add_dot1ad_layer(pkt, 300, 400),
1284 capture = self.send_packets(self.pg7, self.pg8)
1286 # wait for a flow and verify it
1287 self.vapi.ipfix_flush()
1288 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1289 self.verify_cflow_data(ipfix_decoder, capture, cflow)
1290 self.verify_cflow_data_detail(
1295 IPFIX_SRC_IP4_ADDR_ID: "src_ip",
1296 IPFIX_DST_IP4_ADDR_ID: "dst_ip",
1297 IPFIX_SRC_TRANS_PORT_ID: "sport",
1298 IPFIX_DST_TRANS_PORT_ID: "dport",
1299 IPFIX_FLOW_DIRECTION_ID: (self.direction == "tx"),
1303 self.collector.get_capture(2)
1306 route.remove_vpp_config()
1307 subif.remove_vpp_config()
1308 ipfix.remove_vpp_config()
1311 class DatapathRx(MethodHolder, DatapathTestsHolder):
1312 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1320 @unittest.skipUnless(config.extended, "part of extended tests")
1321 class DisableIPFIX(MethodHolder):
1325 def setUpClass(cls):
1326 super(DisableIPFIX, cls).setUpClass()
1329 def tearDownClass(cls):
1330 super(DisableIPFIX, cls).tearDownClass()
1332 def test_0001(self):
1333 """disable IPFIX after first packets"""
1334 self.logger.info("FFP_TEST_START_0001")
1335 self.pg_enable_capture(self.pg_interfaces)
1338 ipfix = VppCFLOW(test=self)
1339 ipfix.add_vpp_config()
1341 ipfix_decoder = IPFIXDecoder()
1342 # template packet should arrive immediately
1343 templates = ipfix.verify_templates(ipfix_decoder)
1345 self.create_stream()
1348 # make sure the one packet we expect actually showed up
1349 self.vapi.ipfix_flush()
1350 self.wait_for_cflow_packet(self.collector, templates[1])
1351 self.collector.get_capture(4)
1354 ipfix.disable_exporter()
1355 self.pg_enable_capture([self.collector])
1359 # make sure no one packet arrived in 1 minute
1360 self.vapi.ipfix_flush()
1361 self.sleep(1, "wait before verifying no packets sent")
1362 self.collector.assert_nothing_captured()
1364 ipfix.remove_vpp_config()
1365 self.logger.info("FFP_TEST_FINISH_0001")
1368 @unittest.skipUnless(config.extended, "part of extended tests")
1369 class ReenableIPFIX(MethodHolder):
1370 """Re-enable IPFIX"""
1373 def setUpClass(cls):
1374 super(ReenableIPFIX, cls).setUpClass()
1377 def tearDownClass(cls):
1378 super(ReenableIPFIX, cls).tearDownClass()
1380 def test_0011(self):
1381 """disable IPFIX after first packets and re-enable after few packets"""
1382 self.logger.info("FFP_TEST_START_0001")
1383 self.pg_enable_capture(self.pg_interfaces)
1386 ipfix = VppCFLOW(test=self)
1387 ipfix.add_vpp_config()
1389 ipfix_decoder = IPFIXDecoder()
1390 # template packet should arrive immediately
1391 templates = ipfix.verify_templates(ipfix_decoder)
1393 self.create_stream(packets=5)
1396 # make sure the one packet we expect actually showed up
1397 self.vapi.ipfix_flush()
1398 self.wait_for_cflow_packet(self.collector, templates[1])
1399 self.collector.get_capture(4)
1402 ipfix.disable_exporter()
1403 self.vapi.ipfix_flush()
1404 self.pg_enable_capture([self.collector])
1408 # make sure no one packet arrived in active timer span
1409 self.vapi.ipfix_flush()
1410 self.sleep(1, "wait before verifying no packets sent")
1411 self.collector.assert_nothing_captured()
1412 self.pg2.get_capture(5)
1415 ipfix.enable_exporter()
1417 capture = self.collector.get_capture(4)
1421 self.assertTrue(p.haslayer(IPFIX))
1422 if p.haslayer(Template):
1424 self.assertTrue(nr_templates, 3)
1426 self.assertTrue(p.haslayer(IPFIX))
1427 if p.haslayer(Data):
1429 self.assertTrue(nr_templates, 1)
1431 ipfix.remove_vpp_config()
1432 self.logger.info("FFP_TEST_FINISH_0001")
1435 @unittest.skipUnless(config.extended, "part of extended tests")
1436 class DisableFP(MethodHolder):
1437 """Disable Flowprobe feature"""
1440 def setUpClass(cls):
1441 super(DisableFP, cls).setUpClass()
1444 def tearDownClass(cls):
1445 super(DisableFP, cls).tearDownClass()
1447 def test_0001(self):
1448 """disable flowprobe feature after first packets"""
1449 self.logger.info("FFP_TEST_START_0001")
1450 self.pg_enable_capture(self.pg_interfaces)
1452 ipfix = VppCFLOW(test=self)
1453 ipfix.add_vpp_config()
1455 ipfix_decoder = IPFIXDecoder()
1456 # template packet should arrive immediately
1457 templates = ipfix.verify_templates(ipfix_decoder)
1459 self.create_stream()
1462 # make sure the one packet we expect actually showed up
1463 self.vapi.ipfix_flush()
1464 self.wait_for_cflow_packet(self.collector, templates[1])
1465 self.collector.get_capture(4)
1468 ipfix.disable_flowprobe_feature()
1469 self.pg_enable_capture([self.collector])
1473 # make sure no one packet arrived in active timer span
1474 self.vapi.ipfix_flush()
1475 self.sleep(1, "wait before verifying no packets sent")
1476 self.collector.assert_nothing_captured()
1478 # enable FPP feature so the remove_vpp_config() doesn't fail
1479 # due to missing feature on interface.
1480 ipfix.enable_flowprobe_feature()
1482 ipfix.remove_vpp_config()
1483 self.logger.info("FFP_TEST_FINISH_0001")
1485 def test_no_leftover_flows_after_disabling(self):
1486 """disable flowprobe feature and expect no leftover flows"""
1487 self.pg_enable_capture(self.pg_interfaces)
1490 # enable ip4 datapath for an interface
1491 # set active and passive timers
1502 ipfix.add_vpp_config()
1504 # template packet should arrive immediately
1505 ipfix.verify_templates(count=1)
1507 # send some ip4 packets
1508 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
1509 self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1511 # disable feature for the interface
1512 # currently stored ip4 flows should be removed
1513 ipfix.disable_flowprobe_feature()
1515 # no leftover ip4 flows are expected
1516 self.pg_enable_capture([self.collector])
1517 self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
1518 self.collector.assert_nothing_captured()
1520 # re-enable feature for the interface
1521 ipfix.enable_flowprobe_feature()
1523 # template packet should arrive immediately
1524 ipfix_decoder = IPFIXDecoder()
1525 self.vapi.ipfix_flush()
1526 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1528 # send some ip4 packets
1529 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
1530 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1532 # verify meta info - packet/octet delta
1533 self.vapi.ipfix_flush()
1534 cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=8)
1535 self.verify_cflow_data(ipfix_decoder, capture, cflow)
1537 self.collector.get_capture(2)
1540 ipfix.remove_vpp_config()
1543 @unittest.skipUnless(config.extended, "part of extended tests")
1544 class ReenableFP(MethodHolder):
1545 """Re-enable Flowprobe feature"""
1548 def setUpClass(cls):
1549 super(ReenableFP, cls).setUpClass()
1552 def tearDownClass(cls):
1553 super(ReenableFP, cls).tearDownClass()
1555 def test_0001(self):
1556 """disable flowprobe feature after first packets and re-enable
1557 after few packets"""
1558 self.logger.info("FFP_TEST_START_0001")
1559 self.pg_enable_capture(self.pg_interfaces)
1562 ipfix = VppCFLOW(test=self)
1563 ipfix.add_vpp_config()
1565 ipfix_decoder = IPFIXDecoder()
1566 # template packet should arrive immediately
1567 self.vapi.ipfix_flush()
1568 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1570 self.create_stream()
1573 # make sure the one packet we expect actually showed up
1574 self.vapi.ipfix_flush()
1575 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1576 self.collector.get_capture(4)
1578 # disable FPP feature
1579 ipfix.disable_flowprobe_feature()
1580 self.pg_enable_capture([self.collector])
1584 # make sure no one packet arrived in active timer span
1585 self.vapi.ipfix_flush()
1586 self.sleep(5, "wait before verifying no packets sent")
1587 self.collector.assert_nothing_captured()
1589 # enable FPP feature
1590 ipfix.enable_flowprobe_feature()
1591 self.vapi.ipfix_flush()
1592 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1596 # make sure the next packets (templates and data) we expect actually
1598 self.vapi.ipfix_flush()
1599 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1600 self.collector.get_capture(4)
1602 ipfix.remove_vpp_config()
1603 self.logger.info("FFP_TEST_FINISH_0001")
1606 if __name__ == "__main__":
1607 unittest.main(testRunner=VppTestRunner)