2 from __future__ import print_function
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP
12 from scapy.layers.inet6 import IPv6
13 from scapy.contrib.lacp import SlowProtocol, LACP
15 from config import config
16 from framework import VppTestCase
17 from asfframework import (
18 tag_fixme_vpp_workers,
26 from vpp_object import VppObject
28 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_papi.macaddress import mac_ntop
31 from socket import inet_ntop
32 from vpp_papi import VppEnum
33 from vpp_sub_interface import VppDot1ADSubint
36 TMPL_COMMON_FIELD_COUNT = 6
37 TMPL_L2_FIELD_COUNT = 3
38 TMPL_L3_FIELD_COUNT = 4
39 TMPL_L4_FIELD_COUNT = 3
41 IPFIX_TCP_FLAGS_ID = 6
42 IPFIX_SRC_TRANS_PORT_ID = 7
43 IPFIX_DST_TRANS_PORT_ID = 11
44 IPFIX_SRC_IP4_ADDR_ID = 8
45 IPFIX_DST_IP4_ADDR_ID = 12
46 IPFIX_FLOW_DIRECTION_ID = 61
58 class VppCFLOW(VppObject):
59 """CFLOW object for IPFIX exporter and Flowprobe feature"""
75 self._intf_obj = getattr(self._test, intf)
77 if passive == 0 or passive < active:
78 self._passive = active + 1
80 self._passive = passive
81 self._datapath = datapath # l2 ip4 ip6
82 self._collect = layer # l2 l3 l4
83 self._direction = direction # rx tx both
84 self._timeout = timeout
86 self._configured = False
88 def add_vpp_config(self):
89 self.enable_exporter()
93 if "l2" in self._collect.lower():
94 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
95 if "l3" in self._collect.lower():
96 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
97 if "l4" in self._collect.lower():
98 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
99 self._test.vapi.flowprobe_set_params(
100 record_flags=(l2_flag | l3_flag | l4_flag),
101 active_timer=self._active,
102 passive_timer=self._passive,
104 self.enable_flowprobe_feature()
105 self._test.vapi.cli("ipfix flush")
106 self._configured = True
def remove_vpp_config(self):
    """Tear down what add_vpp_config() set up.

    Disables the exporter, then the flowprobe feature, issues an
    "ipfix flush" CLI command and finally marks this object as not
    configured.
    """
    self.disable_exporter()
    self.disable_flowprobe_feature()
    # Flush only after both the exporter and the feature have been disabled.
    self._test.vapi.cli("ipfix flush")
    self._configured = False
114 def enable_exporter(self):
115 self._test.vapi.set_ipfix_exporter(
116 collector_address=self._test.pg0.remote_ip4,
117 src_address=self._test.pg0.local_ip4,
119 template_interval=self._timeout,
122 def _enable_disable_flowprobe_feature(self, is_add):
124 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
125 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
126 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
129 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
130 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
131 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
133 self._test.vapi.flowprobe_interface_add_del(
135 which=which_map[self._datapath],
136 direction=direction_map[self._direction],
137 sw_if_index=self._intf_obj.sw_if_index,
def enable_flowprobe_feature(self):
    """Enable the flowprobe feature through the shared add/del helper."""
    self._enable_disable_flowprobe_feature(is_add=True)
def disable_exporter(self):
    """Point the IPFIX exporter at collector 0.0.0.0 using the CLI.

    NOTE(review): presumably a 0.0.0.0 collector is how VPP switches
    exporting off -- confirm against the exporter CLI documentation.
    """
    self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
def disable_flowprobe_feature(self):
    """Disable the flowprobe feature through the shared add/del helper."""
    self._enable_disable_flowprobe_feature(is_add=False)
150 return "ipfix-collector-%s-%s" % (self._src, self.dst)
def query_vpp_config(self):
    """Return the locally tracked "configured" flag.

    The flag is set by add_vpp_config() and cleared by
    remove_vpp_config(); VPP itself is not queried here.
    """
    return self._configured
155 def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
157 self._test.assertIn(count, (1, 2, 3))
158 for _ in range(count):
159 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
160 self._test.assertTrue(p.haslayer(IPFIX))
161 self._test.assertTrue(p.haslayer(Template))
162 if decoder is not None:
163 templates.append(p[Template].templateID)
164 decoder.add_template(p.getlayer(Template))
165 if field_count_in is not None:
166 self._test.assertIn(p[Template].fieldCount, field_count_in)
170 class MethodHolder(VppTestCase):
171 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
175 max_number_of_packets = 10
181 Perform standard class setup (defined by class method setUpClass in
182 class VppTestCase) before running the test case, set test case related
183 variables and configure VPP.
185 super(MethodHolder, cls).setUpClass()
186 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
191 # Create pg interfaces
192 cls.create_pg_interfaces(range(9))
195 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
# Create BD with MAC learning and unknown unicast flooding enabled
# and put interfaces to this BD
199 cls.vapi.bridge_domain_add_del_v2(
200 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
202 cls.vapi.sw_interface_set_l2_bridge(
203 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
205 cls.vapi.sw_interface_set_l2_bridge(
206 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
209 # Set up all interfaces
210 for i in cls.pg_interfaces:
214 cls.pg0.configure_ipv4_neighbors()
215 cls.collector = cls.pg0
218 cls.pg1.resolve_arp()
220 cls.pg2.resolve_arp()
222 cls.pg3.resolve_arp()
224 cls.pg4.resolve_arp()
227 cls.pg8.configure_ipv4_neighbors()
230 cls.pg5.resolve_ndp()
231 cls.pg5.disable_ipv6_ra()
233 cls.pg6.resolve_ndp()
234 cls.pg6.disable_ipv6_ra()
236 super(MethodHolder, cls).tearDownClass()
def tearDownClass(cls):
    """Class-level teardown; defers entirely to the parent class."""
    super(MethodHolder, cls).tearDownClass()
244 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
246 """Create a packet stream to tickle the plugin
248 :param VppInterface src_if: Source interface for packet stream
:param VppInterface dst_if: Dst interface for packet stream
257 packets = random.randint(1, self.max_number_of_packets)
259 for p in range(0, packets):
261 pkt_size = random.choice(self.pg_if_packet_sizes)
262 info = self.create_packet_info(src_if, dst_if)
263 payload = self.info_to_payload(info)
264 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
266 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
268 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
269 p /= UDP(sport=1234, dport=4321)
272 self.extend_packet(p, pkt_size)
275 def verify_cflow_data(self, decoder, capture, cflow):
281 if cflow.haslayer(Data):
282 data = decoder.decode_data_set(cflow.getlayer(Set))
284 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
285 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
287 def send_packets(self, src_if=None, dst_if=None):
292 self.pg_enable_capture([dst_if])
293 src_if.add_stream(self.pkts)
295 return dst_if.get_capture(len(self.pkts))
297 def verify_cflow_data_detail(
302 data_set={1: "octets", 2: "packets"},
307 print(capture[0].show())
308 if cflow.haslayer(Data):
309 data = decoder.decode_data_set(cflow.getlayer(Set))
313 ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
315 ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
316 if data_set is not None:
318 # skip flow if ingress/egress interface is 0
319 if int(binascii.hexlify(record[10]), 16) == 0:
321 if int(binascii.hexlify(record[14]), 16) == 0:
324 for field in data_set:
325 value = data_set[field]
326 if value == "octets":
329 value += 40 # ??? is this correct
330 elif value == "packets":
332 elif value == "src_ip":
334 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
336 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
337 value = int(binascii.hexlify(ip), 16)
338 elif value == "dst_ip":
340 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
342 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
343 value = int(binascii.hexlify(ip), 16)
344 elif value == "sport":
345 value = int(capture[0][UDP].sport)
346 elif value == "dport":
347 value = int(capture[0][UDP].dport)
349 int(binascii.hexlify(record[field]), 16), value
351 if field_count is not None:
353 self.assertEqual(len(record), field_count)
355 def verify_cflow_data_notimer(self, decoder, capture, cflows):
358 if cflow.haslayer(Data):
359 data = decoder.decode_data_set(cflow.getlayer(Set))
361 raise Exception("No CFLOW data")
366 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
367 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
368 self.assertEqual(len(capture), idx)
370 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
371 """wait for CFLOW packet and verify its correctness
373 :param timeout: how long to wait
376 self.logger.info("IPFIX: Waiting for CFLOW packet")
377 # self.logger.debug(self.vapi.ppcli("show flow table"))
378 p = collector_intf.wait_for_packet(timeout=timeout)
379 self.assertEqual(p[Set].setID, set_id)
380 # self.logger.debug(self.vapi.ppcli("show flow table"))
381 self.logger.debug(ppp("IPFIX: Got packet:", p))
386 @tag_fixme_vpp_workers
387 @tag_fixme_ubuntu2204
389 class Flowprobe(MethodHolder):
390 """Template verification, timer tests"""
394 super(Flowprobe, cls).setUpClass()
def tearDownClass(cls):
    """Class-level teardown; defers entirely to the parent class."""
    super(Flowprobe, cls).tearDownClass()
401 """timer less than template timeout"""
402 self.logger.info("FFP_TEST_START_0001")
403 self.pg_enable_capture(self.pg_interfaces)
406 ipfix = VppCFLOW(test=self, active=2)
407 ipfix.add_vpp_config()
409 ipfix_decoder = IPFIXDecoder()
410 # template packet should arrive immediately
411 templates = ipfix.verify_templates(ipfix_decoder)
413 self.create_stream(packets=1)
415 capture = self.pg2.get_capture(1)
417 # make sure the one packet we expect actually showed up
418 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
419 self.verify_cflow_data(ipfix_decoder, capture, cflow)
421 ipfix.remove_vpp_config()
422 self.logger.info("FFP_TEST_FINISH_0001")
424 @unittest.skipUnless(
425 config.extended, "Test is unstable (assertion error, needs to be fixed"
428 """timer greater than template timeout [UNSTABLE, FIX ME]"""
429 self.logger.info("FFP_TEST_START_0002")
430 self.pg_enable_capture(self.pg_interfaces)
433 ipfix = VppCFLOW(test=self, timeout=3, active=4)
434 ipfix.add_vpp_config()
436 ipfix_decoder = IPFIXDecoder()
437 # template packet should arrive immediately
438 ipfix.verify_templates()
440 self.create_stream(packets=2)
442 capture = self.pg2.get_capture(2)
# the next set of template packets is due once the template interval
# (timeout=3 above) expires; wait up to timeout=5 for each of them
446 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
448 # make sure the one packet we expect actually showed up
449 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
450 self.verify_cflow_data(ipfix_decoder, capture, cflow)
452 ipfix.remove_vpp_config()
453 self.logger.info("FFP_TEST_FINISH_0002")
455 def test_cflow_packet(self):
456 """verify cflow packet fields"""
457 self.logger.info("FFP_TEST_START_0000")
458 self.pg_enable_capture(self.pg_interfaces)
462 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
464 ipfix.add_vpp_config()
466 route_9001 = VppIpRoute(
470 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
472 route_9001.add_vpp_config()
474 ipfix_decoder = IPFIXDecoder()
475 templates = ipfix.verify_templates(ipfix_decoder, count=1)
479 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
480 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
481 / TCP(sport=1234, dport=4321, flags=80)
486 nowUTC = int(time.time())
487 nowUNIX = nowUTC + 2208988800
488 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
490 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
491 self.collector.get_capture(2)
493 if cflow[0].haslayer(IPFIX):
494 self.assertEqual(cflow[IPFIX].version, 10)
495 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
496 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
497 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
498 if cflow.haslayer(Data):
499 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
501 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
503 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
505 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
507 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
509 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
511 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
512 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
513 # flow start timestamp
514 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
515 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
517 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
519 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
521 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
523 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
525 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
527 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
529 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
531 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
533 ipfix.remove_vpp_config()
534 self.logger.info("FFP_TEST_FINISH_0000")
536 def test_flow_entry_reuse(self):
537 """Verify flow entry reuse doesn't accumulate meta info"""
538 self.pg_enable_capture(self.pg_interfaces)
541 # enable ip4 datapath for an interface
542 # set active and passive timers
553 ipfix.add_vpp_config()
555 # template packet should arrive immediately
556 ipfix_decoder = IPFIXDecoder()
557 templates = ipfix.verify_templates(ipfix_decoder, count=1)
562 Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
563 / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
564 / TCP(sport=1234, dport=4321)
569 # send the tcp packet two times, each time with new set of flags
571 TCP_F_SYN | TCP_F_ACK,
572 TCP_F_RST | TCP_F_PSH,
575 self.pkts[0][TCP].flags = f
576 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
578 # verify meta info - packet/octet delta and tcp flags
579 cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
580 self.verify_cflow_data(ipfix_decoder, capture, cflow)
581 self.verify_cflow_data_detail(
586 IPFIX_TCP_FLAGS_ID: f,
587 IPFIX_SRC_TRANS_PORT_ID: 1234,
588 IPFIX_DST_TRANS_PORT_ID: 4321,
592 self.collector.get_capture(3)
595 ipfix.remove_vpp_config()
597 def test_interface_dump(self):
598 """Dump interfaces with IPFIX flow record generation enabled"""
599 self.logger.info("FFP_TEST_START_0003")
601 # Enable feature for 3 interfaces
602 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
603 ipfix1.add_vpp_config()
605 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
606 ipfix2.enable_flowprobe_feature()
608 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
609 ipfix3.enable_flowprobe_feature()
611 # When request "all", dump should contain all enabled interfaces
612 dump = self.vapi.flowprobe_interface_dump()
613 self.assertEqual(len(dump), 3)
615 # Verify 1st interface
616 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
618 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
622 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
625 # Verify 2nd interface
626 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
628 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
632 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
635 # Verify 3rd interface
636 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
638 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
642 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
645 # When request 2nd interface, dump should contain only the specified interface
646 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
647 self.assertEqual(len(dump), 1)
649 # Verify 2nd interface
650 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
652 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
656 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
659 # When request 99th interface, dump should be empty
660 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
661 self.assertEqual(len(dump), 0)
663 ipfix1.remove_vpp_config()
664 ipfix2.remove_vpp_config()
665 ipfix3.remove_vpp_config()
666 self.logger.info("FFP_TEST_FINISH_0003")
def test_get_params(self):
    """Get IPFIX flow record generation parameters"""
    self.logger.info("FFP_TEST_START_0004")

    # Configure the feature with explicit timers and all three record layers
    cflow = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
    cflow.add_vpp_config()

    # Read the parameters back and compare them with what was configured
    reply = self.vapi.flowprobe_get_params()
    self.assertEqual(reply.active_timer, 20)
    self.assertEqual(reply.passive_timer, 40)
    flags_enum = VppEnum.vl_api_flowprobe_record_flags_t
    expected_flags = (
        flags_enum.FLOWPROBE_RECORD_FLAG_L2
        | flags_enum.FLOWPROBE_RECORD_FLAG_L3
        | flags_enum.FLOWPROBE_RECORD_FLAG_L4
    )
    self.assertEqual(reply.record_flags, expected_flags)

    cflow.remove_vpp_config()
    self.logger.info("FFP_TEST_FINISH_0004")
689 class DatapathTestsHolder(object):
690 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
694 super(DatapathTestsHolder, cls).setUpClass()
def tearDownClass(cls):
    """Class-level teardown; passes the call on to the next class in the MRO."""
    super(DatapathTestsHolder, cls).tearDownClass()
700 def test_templatesL2(self):
701 """verify template on L2 datapath"""
702 self.logger.info("FFP_TEST_START_0000")
703 self.pg_enable_capture(self.pg_interfaces)
706 test=self, intf=self.intf1, layer="l2", direction=self.direction
708 ipfix.add_vpp_config()
710 # template packet should arrive immediately
711 self.vapi.ipfix_flush()
712 ipfix.verify_templates(timeout=3, count=1)
713 self.collector.get_capture(1)
715 ipfix.remove_vpp_config()
716 self.logger.info("FFP_TEST_FINISH_0000")
718 def test_L2onL2(self):
719 """L2 data on L2 datapath"""
720 self.logger.info("FFP_TEST_START_0001")
721 self.pg_enable_capture(self.pg_interfaces)
725 test=self, intf=self.intf1, layer="l2", direction=self.direction
727 ipfix.add_vpp_config()
729 ipfix_decoder = IPFIXDecoder()
730 # template packet should arrive immediately
731 templates = ipfix.verify_templates(ipfix_decoder, count=1)
733 self.create_stream(packets=1)
734 capture = self.send_packets()
736 # make sure the one packet we expect actually showed up
737 self.vapi.ipfix_flush()
738 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
739 self.verify_cflow_data_detail(
743 {2: "packets", 256: 8, 61: (self.direction == "tx")},
745 self.collector.get_capture(2)
747 ipfix.remove_vpp_config()
748 self.logger.info("FFP_TEST_FINISH_0001")
750 def test_L3onL2(self):
751 """L3 data on L2 datapath"""
752 self.logger.info("FFP_TEST_START_0002")
753 self.pg_enable_capture(self.pg_interfaces)
757 test=self, intf=self.intf1, layer="l3", direction=self.direction
759 ipfix.add_vpp_config()
761 ipfix_decoder = IPFIXDecoder()
762 # template packet should arrive immediately
763 templates = ipfix.verify_templates(ipfix_decoder, count=2)
765 self.create_stream(packets=1)
766 capture = self.send_packets()
768 # make sure the one packet we expect actually showed up
769 self.vapi.ipfix_flush()
770 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
771 self.verify_cflow_data_detail(
780 61: (self.direction == "tx"),
784 self.collector.get_capture(3)
786 ipfix.remove_vpp_config()
787 self.logger.info("FFP_TEST_FINISH_0002")
789 def test_L234onL2(self):
790 """L2/3/4 data on L2 datapath"""
791 self.pg_enable_capture(self.pg_interfaces)
795 test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
797 ipfix.add_vpp_config()
799 ipfix_decoder = IPFIXDecoder()
800 # template packet should arrive immediately
801 tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
802 tmpl_ip_field_count = (
803 TMPL_COMMON_FIELD_COUNT
804 + TMPL_L2_FIELD_COUNT
805 + TMPL_L3_FIELD_COUNT
806 + TMPL_L4_FIELD_COUNT
808 templates = ipfix.verify_templates(
811 field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
814 # verify IPv4 and IPv6 flows
815 for ip_ver in ("v4", "v6"):
816 self.create_stream(packets=1, ip_ver=ip_ver)
817 capture = self.send_packets()
819 # make sure the one packet we expect actually showed up
820 self.vapi.ipfix_flush()
821 cflow = self.wait_for_cflow_packet(
822 self.collector, templates[1 if ip_ver == "v4" else 2]
824 src_ip_id = 8 if ip_ver == "v4" else 27
825 dst_ip_id = 12 if ip_ver == "v4" else 28
826 self.verify_cflow_data_detail(
832 256: 8 if ip_ver == "v4" else 56710,
838 61: (self.direction == "tx"),
841 field_count=tmpl_ip_field_count,
847 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
852 capture = self.send_packets()
854 # make sure the one packet we expect actually showed up
855 self.vapi.ipfix_flush()
856 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
857 self.verify_cflow_data_detail(
861 {2: "packets", 256: 2440, 61: (self.direction == "tx")},
862 field_count=tmpl_l2_field_count,
865 self.collector.get_capture(6)
867 ipfix.remove_vpp_config()
869 def test_L4onL2(self):
870 """L4 data on L2 datapath"""
871 self.logger.info("FFP_TEST_START_0003")
872 self.pg_enable_capture(self.pg_interfaces)
876 test=self, intf=self.intf1, layer="l4", direction=self.direction
878 ipfix.add_vpp_config()
880 ipfix_decoder = IPFIXDecoder()
881 # template packet should arrive immediately
882 templates = ipfix.verify_templates(ipfix_decoder, count=2)
884 self.create_stream(packets=1)
885 capture = self.send_packets()
887 # make sure the one packet we expect actually showed up
888 self.vapi.ipfix_flush()
889 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
890 self.verify_cflow_data_detail(
894 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
897 self.collector.get_capture(3)
899 ipfix.remove_vpp_config()
900 self.logger.info("FFP_TEST_FINISH_0003")
902 def test_templatesIp4(self):
903 """verify templates on IP4 datapath"""
904 self.logger.info("FFP_TEST_START_0000")
906 self.pg_enable_capture(self.pg_interfaces)
909 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
911 ipfix.add_vpp_config()
913 # template packet should arrive immediately
914 self.vapi.ipfix_flush()
915 ipfix.verify_templates(timeout=3, count=1)
916 self.collector.get_capture(1)
918 ipfix.remove_vpp_config()
920 self.logger.info("FFP_TEST_FINISH_0000")
922 def test_L2onIP4(self):
923 """L2 data on IP4 datapath"""
924 self.logger.info("FFP_TEST_START_0001")
925 self.pg_enable_capture(self.pg_interfaces)
933 direction=self.direction,
935 ipfix.add_vpp_config()
937 ipfix_decoder = IPFIXDecoder()
938 # template packet should arrive immediately
939 templates = ipfix.verify_templates(ipfix_decoder, count=1)
941 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
942 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
944 # make sure the one packet we expect actually showed up
945 self.vapi.ipfix_flush()
946 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
947 self.verify_cflow_data_detail(
951 {2: "packets", 256: 8, 61: (self.direction == "tx")},
# expected one template and one cflow packet
955 self.collector.get_capture(2)
957 ipfix.remove_vpp_config()
958 self.logger.info("FFP_TEST_FINISH_0001")
960 def test_L3onIP4(self):
961 """L3 data on IP4 datapath"""
962 self.logger.info("FFP_TEST_START_0002")
963 self.pg_enable_capture(self.pg_interfaces)
971 direction=self.direction,
973 ipfix.add_vpp_config()
975 ipfix_decoder = IPFIXDecoder()
976 # template packet should arrive immediately
977 templates = ipfix.verify_templates(ipfix_decoder, count=1)
979 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
980 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
982 # make sure the one packet we expect actually showed up
983 self.vapi.ipfix_flush()
984 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
985 self.verify_cflow_data_detail(
994 61: (self.direction == "tx"),
# expected one template and one cflow packet
999 self.collector.get_capture(2)
1001 ipfix.remove_vpp_config()
1002 self.logger.info("FFP_TEST_FINISH_0002")
1004 def test_L4onIP4(self):
1005 """L4 data on IP4 datapath"""
1006 self.logger.info("FFP_TEST_START_0003")
1007 self.pg_enable_capture(self.pg_interfaces)
1015 direction=self.direction,
1017 ipfix.add_vpp_config()
1019 ipfix_decoder = IPFIXDecoder()
1020 # template packet should arrive immediately
1021 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1023 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
1024 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1026 # make sure the one packet we expect actually showed up
1027 self.vapi.ipfix_flush()
1028 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1029 self.verify_cflow_data_detail(
1033 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
# expected one template and one cflow packet
1037 self.collector.get_capture(2)
1039 ipfix.remove_vpp_config()
1040 self.logger.info("FFP_TEST_FINISH_0003")
1042 def test_templatesIP6(self):
1043 """verify templates on IP6 datapath"""
1044 self.logger.info("FFP_TEST_START_0000")
1045 self.pg_enable_capture(self.pg_interfaces)
1048 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
1050 ipfix.add_vpp_config()
1052 # template packet should arrive immediately
1053 ipfix.verify_templates(count=1)
1054 self.collector.get_capture(1)
1056 ipfix.remove_vpp_config()
1058 self.logger.info("FFP_TEST_FINISH_0000")
1060 def test_L2onIP6(self):
1061 """L2 data on IP6 datapath"""
1062 self.logger.info("FFP_TEST_START_0001")
1063 self.pg_enable_capture(self.pg_interfaces)
1071 direction=self.direction,
1073 ipfix.add_vpp_config()
1075 ipfix_decoder = IPFIXDecoder()
1076 # template packet should arrive immediately
1077 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1079 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1080 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1082 # make sure the one packet we expect actually showed up
1083 self.vapi.ipfix_flush()
1084 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1085 self.verify_cflow_data_detail(
1089 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
# expected one template and one cflow packet
1094 self.collector.get_capture(2)
1096 ipfix.remove_vpp_config()
1097 self.logger.info("FFP_TEST_FINISH_0001")
1099 def test_L3onIP6(self):
1100 """L3 data on IP6 datapath"""
1101 self.logger.info("FFP_TEST_START_0002")
1102 self.pg_enable_capture(self.pg_interfaces)
1110 direction=self.direction,
1112 ipfix.add_vpp_config()
1114 ipfix_decoder = IPFIXDecoder()
1115 # template packet should arrive immediately
1116 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1118 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1119 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1121 # make sure the one packet we expect actually showed up
1122 self.vapi.ipfix_flush()
1123 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1124 self.verify_cflow_data_detail(
1128 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
# expected one template and one cflow packet
1133 self.collector.get_capture(2)
1135 ipfix.remove_vpp_config()
1136 self.logger.info("FFP_TEST_FINISH_0002")
1138 def test_L4onIP6(self):
1139 """L4 data on IP6 datapath"""
1140 self.logger.info("FFP_TEST_START_0003")
1141 self.pg_enable_capture(self.pg_interfaces)
1149 direction=self.direction,
1151 ipfix.add_vpp_config()
1153 ipfix_decoder = IPFIXDecoder()
1154 # template packet should arrive immediately
1155 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1157 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1158 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1160 # make sure the one packet we expect actually showed up
1161 self.vapi.ipfix_flush()
1162 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1163 self.verify_cflow_data_detail(
1167 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
# expected one template and one cflow packet
1172 self.collector.get_capture(2)
1174 ipfix.remove_vpp_config()
1175 self.logger.info("FFP_TEST_FINISH_0003")
1177 def test_0001(self):
1178 """no timers, one CFLOW packet, 9 Flows inside"""
1179 self.logger.info("FFP_TEST_START_0001")
1180 self.pg_enable_capture(self.pg_interfaces)
1183 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1184 ipfix.add_vpp_config()
1186 ipfix_decoder = IPFIXDecoder()
1187 # template packet should arrive immediately
1188 templates = ipfix.verify_templates(ipfix_decoder)
1190 self.create_stream(packets=9)
1191 capture = self.send_packets()
1193 # make sure the one packet we expect actually showed up
1194 self.vapi.ipfix_flush()
1195 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1196 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1197 self.collector.get_capture(4)
1199 ipfix.remove_vpp_config()
1200 self.logger.info("FFP_TEST_FINISH_0001")
1202 def test_0002(self):
1203 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1204 self.logger.info("FFP_TEST_START_0002")
1205 self.pg_enable_capture(self.pg_interfaces)
1208 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1209 ipfix.add_vpp_config()
1211 ipfix_decoder = IPFIXDecoder()
1212 # template packet should arrive immediately
1213 self.vapi.ipfix_flush()
1214 templates = ipfix.verify_templates(ipfix_decoder)
1216 self.create_stream(packets=6)
1217 capture = self.send_packets()
1219 # make sure the one packet we expect actually showed up
1221 self.vapi.ipfix_flush()
1222 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1223 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1224 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1225 self.collector.get_capture(5)
1227 ipfix.remove_vpp_config()
1228 self.logger.info("FFP_TEST_FINISH_0002")
1231 @tag_fixme_vpp_workers
1232 class DatapathTx(MethodHolder, DatapathTestsHolder):
1233 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1240 def test_rewritten_traffic(self):
1241 """Rewritten traffic (from subif to ipfix if)"""
1242 self.pg_enable_capture(self.pg_interfaces)
1245 # prepare a sub-interface
1246 subif = VppDot1ADSubint(self, self.pg7, 0, 300, 400)
1250 # enable ip4 datapath for an interface
1256 direction=self.direction,
1258 ipfix.add_vpp_config()
1260 # template packet should arrive immediately
1261 ipfix_decoder = IPFIXDecoder()
1262 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1264 # forward some traffic through the ipfix interface
1269 [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
1271 route.add_vpp_config()
1273 # prepare an IPv4 packet (subif => ipfix interface)
1275 Ether(src=subif.remote_mac, dst=self.pg7.local_mac)
1276 / IP(src=subif.remote_ip4, dst="9.0.0.1")
1277 / UDP(sport=1234, dport=4321)
1278 / Raw(b"\xa5" * 123)
1281 subif.add_dot1ad_layer(pkt, 300, 400),
1285 capture = self.send_packets(self.pg7, self.pg8)
1287 # wait for a flow and verify it
1288 self.vapi.ipfix_flush()
1289 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1290 self.verify_cflow_data(ipfix_decoder, capture, cflow)
1291 self.verify_cflow_data_detail(
1296 IPFIX_SRC_IP4_ADDR_ID: "src_ip",
1297 IPFIX_DST_IP4_ADDR_ID: "dst_ip",
1298 IPFIX_SRC_TRANS_PORT_ID: "sport",
1299 IPFIX_DST_TRANS_PORT_ID: "dport",
1300 IPFIX_FLOW_DIRECTION_ID: (self.direction == "tx"),
1304 self.collector.get_capture(2)
1307 route.remove_vpp_config()
1308 subif.remove_vpp_config()
1309 ipfix.remove_vpp_config()
# RX-direction counterpart of the datapath tests: derives from MethodHolder
# and DatapathTestsHolder.
1312 @tag_fixme_vpp_workers
1313 class DatapathRx(MethodHolder, DatapathTestsHolder):
1314 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
# Skipped unless config.extended is set.
# Checks that nothing reaches the collector once the IPFIX exporter has been
# disabled.
1322 @unittest.skipUnless(config.extended, "part of extended tests")
1323 class DisableIPFIX(MethodHolder):
# class-level setup simply delegates to the parent class
1327 def setUpClass(cls):
1328 super(DisableIPFIX, cls).setUpClass()
# class-level teardown simply delegates to the parent class
1331 def tearDownClass(cls):
1332 super(DisableIPFIX, cls).tearDownClass()
1334 def test_0001(self):
1335 """disable IPFIX after first packets"""
1336 self.logger.info("FFP_TEST_START_0001")
1337 self.pg_enable_capture(self.pg_interfaces)
# VppCFLOW with only test=self, i.e. all other constructor parameters are
# left at their defaults
1340 ipfix = VppCFLOW(test=self)
1341 ipfix.add_vpp_config()
1343 ipfix_decoder = IPFIXDecoder()
1344 # template packet should arrive immediately
1345 templates = ipfix.verify_templates(ipfix_decoder)
1347 self.create_stream()
1350 # make sure the one packet we expect actually showed up
1351 self.vapi.ipfix_flush()
1352 self.wait_for_cflow_packet(self.collector, templates[1])
1353 self.collector.get_capture(4)
# disable the exporter, then enable capture on the collector interface only
1356 ipfix.disable_exporter()
1357 self.pg_enable_capture([self.collector])
1361 # make sure no packet arrived during the sleep(1) wait below
1362 self.vapi.ipfix_flush()
1363 self.sleep(1, "wait before verifying no packets sent")
1364 self.collector.assert_nothing_captured()
1366 ipfix.remove_vpp_config()
1367 self.logger.info("FFP_TEST_FINISH_0001")
# Skipped unless config.extended is set.
# Disables the IPFIX exporter, checks that the collector stays silent, then
# re-enables the exporter and inspects the packets captured afterwards.
1370 @unittest.skipUnless(config.extended, "part of extended tests")
1371 class ReenableIPFIX(MethodHolder):
1372 """Re-enable IPFIX"""
# class-level setup simply delegates to the parent class
1375 def setUpClass(cls):
1376 super(ReenableIPFIX, cls).setUpClass()
# class-level teardown simply delegates to the parent class
1379 def tearDownClass(cls):
1380 super(ReenableIPFIX, cls).tearDownClass()
1382 def test_0011(self):
1383 """disable IPFIX after first packets and re-enable after few packets"""
# NOTE(review): the log tags say 0001 although this method is test_0011 --
# confirm whether that is intentional
1384 self.logger.info("FFP_TEST_START_0001")
1385 self.pg_enable_capture(self.pg_interfaces)
1388 ipfix = VppCFLOW(test=self)
1389 ipfix.add_vpp_config()
1391 ipfix_decoder = IPFIXDecoder()
1392 # template packet should arrive immediately
1393 templates = ipfix.verify_templates(ipfix_decoder)
1395 self.create_stream(packets=5)
1398 # make sure the one packet we expect actually showed up
1399 self.vapi.ipfix_flush()
1400 self.wait_for_cflow_packet(self.collector, templates[1])
1401 self.collector.get_capture(4)
# disable the exporter, flush, then enable capture on the collector only
1404 ipfix.disable_exporter()
1405 self.vapi.ipfix_flush()
1406 self.pg_enable_capture([self.collector])
1410 # make sure no packet arrived in the active timer span
1411 self.vapi.ipfix_flush()
1412 self.sleep(1, "wait before verifying no packets sent")
1413 self.collector.assert_nothing_captured()
# presumably the 5 packets created by create_stream(packets=5) above
1414 self.pg2.get_capture(5)
# re-enable the exporter and collect the 4 packets expected afterwards
1417 ipfix.enable_exporter()
1419 capture = self.collector.get_capture(4)
1423 self.assertTrue(p.haslayer(IPFIX))
1424 if p.haslayer(Template):
# NOTE(review): unittest's assertTrue(expr, msg) treats the second argument
# as the failure message, so this only checks that nr_templates is truthy;
# assertEqual(nr_templates, 3) looks like the intent -- confirm
1426 self.assertTrue(nr_templates, 3)
1428 self.assertTrue(p.haslayer(IPFIX))
1429 if p.haslayer(Data):
# NOTE(review): same assertTrue/msg issue as above; this also re-checks
# nr_templates right after testing for Data layers -- presumably a
# data-packet counter was meant. Confirm.
1431 self.assertTrue(nr_templates, 1)
1433 ipfix.remove_vpp_config()
1434 self.logger.info("FFP_TEST_FINISH_0001")
# Skipped unless config.extended is set.
# Two tests around disabling the flowprobe feature on an interface: no export
# after disabling, and no leftover flows after a disable/re-enable cycle.
1437 @unittest.skipUnless(config.extended, "part of extended tests")
1438 class DisableFP(MethodHolder):
1439 """Disable Flowprobe feature"""
# class-level setup simply delegates to the parent class
1442 def setUpClass(cls):
1443 super(DisableFP, cls).setUpClass()
# class-level teardown simply delegates to the parent class
1446 def tearDownClass(cls):
1447 super(DisableFP, cls).tearDownClass()
1449 def test_0001(self):
1450 """disable flowprobe feature after first packets"""
1451 self.logger.info("FFP_TEST_START_0001")
1452 self.pg_enable_capture(self.pg_interfaces)
1454 ipfix = VppCFLOW(test=self)
1455 ipfix.add_vpp_config()
1457 ipfix_decoder = IPFIXDecoder()
1458 # template packet should arrive immediately
1459 templates = ipfix.verify_templates(ipfix_decoder)
1461 self.create_stream()
1464 # make sure the one packet we expect actually showed up
1465 self.vapi.ipfix_flush()
1466 self.wait_for_cflow_packet(self.collector, templates[1])
1467 self.collector.get_capture(4)
# disable the flowprobe feature, then enable capture on the collector only
1470 ipfix.disable_flowprobe_feature()
1471 self.pg_enable_capture([self.collector])
1475 # make sure no packet arrived in the active timer span
1476 self.vapi.ipfix_flush()
1477 self.sleep(1, "wait before verifying no packets sent")
1478 self.collector.assert_nothing_captured()
1480 # re-enable the flowprobe feature so that remove_vpp_config() doesn't
1481 # fail due to the feature missing on the interface.
1482 ipfix.enable_flowprobe_feature()
1484 ipfix.remove_vpp_config()
1485 self.logger.info("FFP_TEST_FINISH_0001")
# Disable/re-enable cycle on the pg3 -> pg4 path: after disabling, nothing may
# reach the collector; after re-enabling, a fresh flow is exported and verified.
1487 def test_no_leftover_flows_after_disabling(self):
1488 """disable flowprobe feature and expect no leftover flows"""
1489 self.pg_enable_capture(self.pg_interfaces)
1492 # enable ip4 datapath for an interface
1493 # set active and passive timers
1504 ipfix.add_vpp_config()
1506 # template packet should arrive immediately
1507 ipfix.verify_templates(count=1)
1509 # send some ip4 packets
1510 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
1511 self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1513 # disable feature for the interface
1514 # currently stored ip4 flows should be removed
1515 ipfix.disable_flowprobe_feature()
1517 # no leftover ip4 flows are expected
1518 self.pg_enable_capture([self.collector])
1519 self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
1520 self.collector.assert_nothing_captured()
1522 # re-enable feature for the interface
1523 ipfix.enable_flowprobe_feature()
1525 # template packet should arrive immediately
# a fresh decoder is created here and used for the checks that follow
1526 ipfix_decoder = IPFIXDecoder()
1527 self.vapi.ipfix_flush()
1528 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1530 # send some ip4 packets
1531 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
1532 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1534 # verify meta info - packet/octet delta
1535 self.vapi.ipfix_flush()
1536 cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=8)
1537 self.verify_cflow_data(ipfix_decoder, capture, cflow)
1539 self.collector.get_capture(2)
1542 ipfix.remove_vpp_config()
# Skipped unless config.extended is set.
# Disables the flowprobe feature, checks that the collector stays silent, then
# re-enables the feature and waits for templates and data again.
1545 @unittest.skipUnless(config.extended, "part of extended tests")
1546 class ReenableFP(MethodHolder):
1547 """Re-enable Flowprobe feature"""
# class-level setup simply delegates to the parent class
1550 def setUpClass(cls):
1551 super(ReenableFP, cls).setUpClass()
# class-level teardown simply delegates to the parent class
1554 def tearDownClass(cls):
1555 super(ReenableFP, cls).tearDownClass()
1557 def test_0001(self):
1558 """disable flowprobe feature after first packets and re-enable
1559 after few packets"""
1560 self.logger.info("FFP_TEST_START_0001")
1561 self.pg_enable_capture(self.pg_interfaces)
1564 ipfix = VppCFLOW(test=self)
1565 ipfix.add_vpp_config()
1567 ipfix_decoder = IPFIXDecoder()
1568 # template packet should arrive immediately
1569 self.vapi.ipfix_flush()
1570 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1572 self.create_stream()
1575 # make sure the one packet we expect actually showed up
1576 self.vapi.ipfix_flush()
# third positional argument 5: presumably the wait timeout (it is passed as
# timeout= by keyword elsewhere in this file) -- confirm
1577 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1578 self.collector.get_capture(4)
1580 # disable the flowprobe feature
1581 ipfix.disable_flowprobe_feature()
1582 self.pg_enable_capture([self.collector])
1586 # make sure no packet arrived in the active timer span
1587 self.vapi.ipfix_flush()
1588 self.sleep(5, "wait before verifying no packets sent")
1589 self.collector.assert_nothing_captured()
1591 # re-enable the flowprobe feature
1592 ipfix.enable_flowprobe_feature()
1593 self.vapi.ipfix_flush()
1594 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1598 # make sure the next packets (templates and data) we expect actually showed up
1600 self.vapi.ipfix_flush()
1601 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1602 self.collector.get_capture(4)
1604 ipfix.remove_vpp_config()
1605 self.logger.info("FFP_TEST_FINISH_0001")
# Script entry point: when executed directly, run the tests through
# unittest.main using VppTestRunner as the test runner.
1608 if __name__ == "__main__":
1609 unittest.main(testRunner=VppTestRunner)