2 from __future__ import print_function
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP
12 from scapy.layers.inet6 import IPv6
13 from scapy.contrib.lacp import SlowProtocol, LACP
15 from config import config
16 from framework import VppTestCase
17 from asfframework import (
18 tag_fixme_vpp_workers,
26 from vpp_object import VppObject
28 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_papi.macaddress import mac_ntop
31 from socket import inet_ntop
32 from vpp_papi import VppEnum
35 TMPL_COMMON_FIELD_COUNT = 6
36 TMPL_L2_FIELD_COUNT = 3
37 TMPL_L3_FIELD_COUNT = 4
38 TMPL_L4_FIELD_COUNT = 3
40 IPFIX_TCP_FLAGS_ID = 6
41 IPFIX_SRC_TRANS_PORT_ID = 7
42 IPFIX_DST_TRANS_PORT_ID = 11
54 class VppCFLOW(VppObject):
55 """CFLOW object for IPFIX exporter and Flowprobe feature"""
71 self._intf_obj = getattr(self._test, intf)
73 if passive == 0 or passive < active:
74 self._passive = active + 1
76 self._passive = passive
77 self._datapath = datapath # l2 ip4 ip6
78 self._collect = layer # l2 l3 l4
79 self._direction = direction # rx tx both
80 self._timeout = timeout
82 self._configured = False
84 def add_vpp_config(self):
85 self.enable_exporter()
89 if "l2" in self._collect.lower():
90 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
91 if "l3" in self._collect.lower():
92 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
93 if "l4" in self._collect.lower():
94 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
95 self._test.vapi.flowprobe_set_params(
96 record_flags=(l2_flag | l3_flag | l4_flag),
97 active_timer=self._active,
98 passive_timer=self._passive,
100 self.enable_flowprobe_feature()
101 self._test.vapi.cli("ipfix flush")
102 self._configured = True
def remove_vpp_config(self):
    """Undo the CFLOW configuration and mark this object as unconfigured.

    Runs ``disable_exporter`` and ``disable_flowprobe_feature`` in that
    order, issues the ``ipfix flush`` CLI command, and finally clears the
    ``_configured`` flag that ``query_vpp_config`` reports.
    """
    for undo_step in (self.disable_exporter, self.disable_flowprobe_feature):
        undo_step()
    flush_command = "ipfix flush"
    self._test.vapi.cli(flush_command)
    self._configured = False
110 def enable_exporter(self):
111 self._test.vapi.set_ipfix_exporter(
112 collector_address=self._test.pg0.remote_ip4,
113 src_address=self._test.pg0.local_ip4,
115 template_interval=self._timeout,
118 def _enable_disable_flowprobe_feature(self, is_add):
120 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
121 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
122 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
125 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
126 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
127 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
129 self._test.vapi.flowprobe_interface_add_del(
131 which=which_map[self._datapath],
132 direction=direction_map[self._direction],
133 sw_if_index=self._intf_obj.sw_if_index,
def enable_flowprobe_feature(self):
    """Enable the flowprobe feature by calling the shared add/del helper with ``is_add=True``."""
    add_feature = True
    self._enable_disable_flowprobe_feature(is_add=add_feature)
def disable_exporter(self):
    """Issue the CLI command that sets the IPFIX exporter collector to 0.0.0.0."""
    reset_collector_cmd = "set ipfix exporter collector 0.0.0.0"
    self._test.vapi.cli(reset_collector_cmd)
def disable_flowprobe_feature(self):
    """Disable the flowprobe feature by calling the shared add/del helper with ``is_add=False``."""
    add_feature = False
    self._enable_disable_flowprobe_feature(is_add=add_feature)
146 return "ipfix-collector-%s-%s" % (self._src, self.dst)
def query_vpp_config(self):
    """Report whether this object is currently marked as configured.

    Returns the locally tracked ``_configured`` flag; this method itself
    makes no call into VPP.
    """
    is_configured = self._configured
    return is_configured
151 def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
153 self._test.assertIn(count, (1, 2, 3))
154 for _ in range(count):
155 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
156 self._test.assertTrue(p.haslayer(IPFIX))
157 self._test.assertTrue(p.haslayer(Template))
158 if decoder is not None:
159 templates.append(p[Template].templateID)
160 decoder.add_template(p.getlayer(Template))
161 if field_count_in is not None:
162 self._test.assertIn(p[Template].fieldCount, field_count_in)
166 class MethodHolder(VppTestCase):
167 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
171 max_number_of_packets = 10
177 Perform standard class setup (defined by class method setUpClass in
178 class VppTestCase) before running the test case, set test case related
179 variables and configure VPP.
181 super(MethodHolder, cls).setUpClass()
182 if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
187 # Create pg interfaces
188 cls.create_pg_interfaces(range(9))
191 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
193 # Create BD with MAC learning and unknown unicast flooding enabled
194 # and put interfaces to this BD
195 cls.vapi.bridge_domain_add_del_v2(
196 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
198 cls.vapi.sw_interface_set_l2_bridge(
199 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
201 cls.vapi.sw_interface_set_l2_bridge(
202 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
205 # Set up all interfaces
206 for i in cls.pg_interfaces:
210 cls.pg0.configure_ipv4_neighbors()
211 cls.collector = cls.pg0
214 cls.pg1.resolve_arp()
216 cls.pg2.resolve_arp()
218 cls.pg3.resolve_arp()
220 cls.pg4.resolve_arp()
223 cls.pg8.configure_ipv4_neighbors()
226 cls.pg5.resolve_ndp()
227 cls.pg5.disable_ipv6_ra()
229 cls.pg6.resolve_ndp()
230 cls.pg6.disable_ipv6_ra()
232 super(MethodHolder, cls).tearDownClass()
236 def tearDownClass(cls):
237 super(MethodHolder, cls).tearDownClass()
240 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
242 """Create a packet stream to tickle the plugin
244 :param VppInterface src_if: Source interface for packet stream
245 :param VppInterface dst_if: Dst interface for packet stream
253 packets = random.randint(1, self.max_number_of_packets)
255 for p in range(0, packets):
257 pkt_size = random.choice(self.pg_if_packet_sizes)
258 info = self.create_packet_info(src_if, dst_if)
259 payload = self.info_to_payload(info)
260 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
262 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
264 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
265 p /= UDP(sport=1234, dport=4321)
268 self.extend_packet(p, pkt_size)
271 def verify_cflow_data(self, decoder, capture, cflow):
277 if cflow.haslayer(Data):
278 data = decoder.decode_data_set(cflow.getlayer(Set))
280 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
281 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
283 def send_packets(self, src_if=None, dst_if=None):
288 self.pg_enable_capture([dst_if])
289 src_if.add_stream(self.pkts)
291 return dst_if.get_capture(len(self.pkts))
293 def verify_cflow_data_detail(
298 data_set={1: "octets", 2: "packets"},
303 print(capture[0].show())
304 if cflow.haslayer(Data):
305 data = decoder.decode_data_set(cflow.getlayer(Set))
309 ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
311 ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
312 if data_set is not None:
314 # skip flow if ingress/egress interface is 0
315 if int(binascii.hexlify(record[10]), 16) == 0:
317 if int(binascii.hexlify(record[14]), 16) == 0:
320 for field in data_set:
321 value = data_set[field]
322 if value == "octets":
325 value += 40 # ??? is this correct
326 elif value == "packets":
328 elif value == "src_ip":
330 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
332 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
333 value = int(binascii.hexlify(ip), 16)
334 elif value == "dst_ip":
336 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
338 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
339 value = int(binascii.hexlify(ip), 16)
340 elif value == "sport":
341 value = int(capture[0][UDP].sport)
342 elif value == "dport":
343 value = int(capture[0][UDP].dport)
345 int(binascii.hexlify(record[field]), 16), value
347 if field_count is not None:
349 self.assertEqual(len(record), field_count)
351 def verify_cflow_data_notimer(self, decoder, capture, cflows):
354 if cflow.haslayer(Data):
355 data = decoder.decode_data_set(cflow.getlayer(Set))
357 raise Exception("No CFLOW data")
362 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
363 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
364 self.assertEqual(len(capture), idx)
366 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
367 """wait for CFLOW packet and verify its correctness
369 :param timeout: how long to wait
372 self.logger.info("IPFIX: Waiting for CFLOW packet")
373 # self.logger.debug(self.vapi.ppcli("show flow table"))
374 p = collector_intf.wait_for_packet(timeout=timeout)
375 self.assertEqual(p[Set].setID, set_id)
376 # self.logger.debug(self.vapi.ppcli("show flow table"))
377 self.logger.debug(ppp("IPFIX: Got packet:", p))
382 @tag_fixme_vpp_workers
383 @tag_fixme_ubuntu2204
385 class Flowprobe(MethodHolder):
386 """Template verification, timer tests"""
390 super(Flowprobe, cls).setUpClass()
393 def tearDownClass(cls):
394 super(Flowprobe, cls).tearDownClass()
397 """timer less than template timeout"""
398 self.logger.info("FFP_TEST_START_0001")
399 self.pg_enable_capture(self.pg_interfaces)
402 ipfix = VppCFLOW(test=self, active=2)
403 ipfix.add_vpp_config()
405 ipfix_decoder = IPFIXDecoder()
406 # template packet should arrive immediately
407 templates = ipfix.verify_templates(ipfix_decoder)
409 self.create_stream(packets=1)
411 capture = self.pg2.get_capture(1)
413 # make sure the one packet we expect actually showed up
414 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
415 self.verify_cflow_data(ipfix_decoder, capture, cflow)
417 ipfix.remove_vpp_config()
418 self.logger.info("FFP_TEST_FINISH_0001")
421 """timer greater than template timeout"""
422 self.logger.info("FFP_TEST_START_0002")
423 self.pg_enable_capture(self.pg_interfaces)
426 ipfix = VppCFLOW(test=self, timeout=3, active=4)
427 ipfix.add_vpp_config()
429 ipfix_decoder = IPFIXDecoder()
430 # template packet should arrive immediately
431 ipfix.verify_templates()
433 self.create_stream(packets=2)
435 capture = self.pg2.get_capture(2)
437 # next set of template packets is re-sent after the 3 s template timeout
438 # wait up to 5 s for the templates to arrive
439 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
441 # make sure the one packet we expect actually showed up
442 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
443 self.verify_cflow_data(ipfix_decoder, capture, cflow)
445 ipfix.remove_vpp_config()
446 self.logger.info("FFP_TEST_FINISH_0002")
448 def test_cflow_packet(self):
449 """verify cflow packet fields"""
450 self.logger.info("FFP_TEST_START_0000")
451 self.pg_enable_capture(self.pg_interfaces)
455 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
457 ipfix.add_vpp_config()
459 route_9001 = VppIpRoute(
463 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
465 route_9001.add_vpp_config()
467 ipfix_decoder = IPFIXDecoder()
468 templates = ipfix.verify_templates(ipfix_decoder, count=1)
472 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
473 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
474 / TCP(sport=1234, dport=4321, flags=80)
479 nowUTC = int(time.time())
480 nowUNIX = nowUTC + 2208988800
481 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
483 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
484 self.collector.get_capture(2)
486 if cflow[0].haslayer(IPFIX):
487 self.assertEqual(cflow[IPFIX].version, 10)
488 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
489 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
490 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
491 if cflow.haslayer(Data):
492 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
494 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
496 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
498 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
500 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
502 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
504 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
505 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
506 # flow start timestamp
507 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
508 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
510 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
512 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
514 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
516 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
518 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
520 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
522 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
524 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
526 ipfix.remove_vpp_config()
527 self.logger.info("FFP_TEST_FINISH_0000")
529 def test_flow_entry_reuse(self):
530 """Verify flow entry reuse doesn't accumulate meta info"""
531 self.pg_enable_capture(self.pg_interfaces)
534 # enable ip4 datapath for an interface
535 # set active and passive timers
546 ipfix.add_vpp_config()
548 # template packet should arrive immediately
549 ipfix_decoder = IPFIXDecoder()
550 templates = ipfix.verify_templates(ipfix_decoder, count=1)
555 Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
556 / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
557 / TCP(sport=1234, dport=4321)
562 # send the tcp packet two times, each time with new set of flags
564 TCP_F_SYN | TCP_F_ACK,
565 TCP_F_RST | TCP_F_PSH,
568 self.pkts[0][TCP].flags = f
569 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
571 # verify meta info - packet/octet delta and tcp flags
572 cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
573 self.verify_cflow_data(ipfix_decoder, capture, cflow)
574 self.verify_cflow_data_detail(
579 IPFIX_TCP_FLAGS_ID: f,
580 IPFIX_SRC_TRANS_PORT_ID: 1234,
581 IPFIX_DST_TRANS_PORT_ID: 4321,
585 self.collector.get_capture(3)
588 ipfix.remove_vpp_config()
590 def test_interface_dump(self):
591 """Dump interfaces with IPFIX flow record generation enabled"""
592 self.logger.info("FFP_TEST_START_0003")
594 # Enable feature for 3 interfaces
595 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
596 ipfix1.add_vpp_config()
598 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
599 ipfix2.enable_flowprobe_feature()
601 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
602 ipfix3.enable_flowprobe_feature()
604 # When request "all", dump should contain all enabled interfaces
605 dump = self.vapi.flowprobe_interface_dump()
606 self.assertEqual(len(dump), 3)
608 # Verify 1st interface
609 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
611 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
615 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
618 # Verify 2nd interface
619 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
621 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
625 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
628 # Verify 3rd interface
629 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
631 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
635 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
638 # When request 2nd interface, dump should contain only the specified interface
639 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
640 self.assertEqual(len(dump), 1)
642 # Verify 2nd interface
643 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
645 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
649 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
652 # When request 99th interface, dump should be empty
653 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
654 self.assertEqual(len(dump), 0)
656 ipfix1.remove_vpp_config()
657 ipfix2.remove_vpp_config()
658 ipfix3.remove_vpp_config()
659 self.logger.info("FFP_TEST_FINISH_0003")
def test_get_params(self):
    """Get IPFIX flow record generation parameters"""
    self.logger.info("FFP_TEST_START_0004")

    # Configure flowprobe with explicit timers and all three record layers
    cflow = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
    cflow.add_vpp_config()

    # The parameters read back must match what was configured above
    reply = self.vapi.flowprobe_get_params()
    self.assertEqual(reply.active_timer, 20)
    self.assertEqual(reply.passive_timer, 40)
    flag_enum = VppEnum.vl_api_flowprobe_record_flags_t
    expected_flags = (
        flag_enum.FLOWPROBE_RECORD_FLAG_L2
        | flag_enum.FLOWPROBE_RECORD_FLAG_L3
        | flag_enum.FLOWPROBE_RECORD_FLAG_L4
    )
    self.assertEqual(reply.record_flags, expected_flags)

    cflow.remove_vpp_config()
    self.logger.info("FFP_TEST_FINISH_0004")
682 class DatapathTestsHolder(object):
683 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
687 super(DatapathTestsHolder, cls).setUpClass()
690 def tearDownClass(cls):
691 super(DatapathTestsHolder, cls).tearDownClass()
693 def test_templatesL2(self):
694 """verify template on L2 datapath"""
695 self.logger.info("FFP_TEST_START_0000")
696 self.pg_enable_capture(self.pg_interfaces)
699 test=self, intf=self.intf1, layer="l2", direction=self.direction
701 ipfix.add_vpp_config()
703 # template packet should arrive immediately
704 self.vapi.ipfix_flush()
705 ipfix.verify_templates(timeout=3, count=1)
706 self.collector.get_capture(1)
708 ipfix.remove_vpp_config()
709 self.logger.info("FFP_TEST_FINISH_0000")
711 def test_L2onL2(self):
712 """L2 data on L2 datapath"""
713 self.logger.info("FFP_TEST_START_0001")
714 self.pg_enable_capture(self.pg_interfaces)
718 test=self, intf=self.intf1, layer="l2", direction=self.direction
720 ipfix.add_vpp_config()
722 ipfix_decoder = IPFIXDecoder()
723 # template packet should arrive immediately
724 templates = ipfix.verify_templates(ipfix_decoder, count=1)
726 self.create_stream(packets=1)
727 capture = self.send_packets()
729 # make sure the one packet we expect actually showed up
730 self.vapi.ipfix_flush()
731 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
732 self.verify_cflow_data_detail(
736 {2: "packets", 256: 8, 61: (self.direction == "tx")},
738 self.collector.get_capture(2)
740 ipfix.remove_vpp_config()
741 self.logger.info("FFP_TEST_FINISH_0001")
743 def test_L3onL2(self):
744 """L3 data on L2 datapath"""
745 self.logger.info("FFP_TEST_START_0002")
746 self.pg_enable_capture(self.pg_interfaces)
750 test=self, intf=self.intf1, layer="l3", direction=self.direction
752 ipfix.add_vpp_config()
754 ipfix_decoder = IPFIXDecoder()
755 # template packet should arrive immediately
756 templates = ipfix.verify_templates(ipfix_decoder, count=2)
758 self.create_stream(packets=1)
759 capture = self.send_packets()
761 # make sure the one packet we expect actually showed up
762 self.vapi.ipfix_flush()
763 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
764 self.verify_cflow_data_detail(
773 61: (self.direction == "tx"),
777 self.collector.get_capture(3)
779 ipfix.remove_vpp_config()
780 self.logger.info("FFP_TEST_FINISH_0002")
782 def test_L234onL2(self):
783 """L2/3/4 data on L2 datapath"""
784 self.pg_enable_capture(self.pg_interfaces)
788 test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
790 ipfix.add_vpp_config()
792 ipfix_decoder = IPFIXDecoder()
793 # template packet should arrive immediately
794 tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
795 tmpl_ip_field_count = (
796 TMPL_COMMON_FIELD_COUNT
797 + TMPL_L2_FIELD_COUNT
798 + TMPL_L3_FIELD_COUNT
799 + TMPL_L4_FIELD_COUNT
801 templates = ipfix.verify_templates(
804 field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
807 # verify IPv4 and IPv6 flows
808 for ip_ver in ("v4", "v6"):
809 self.create_stream(packets=1, ip_ver=ip_ver)
810 capture = self.send_packets()
812 # make sure the one packet we expect actually showed up
813 self.vapi.ipfix_flush()
814 cflow = self.wait_for_cflow_packet(
815 self.collector, templates[1 if ip_ver == "v4" else 2]
817 src_ip_id = 8 if ip_ver == "v4" else 27
818 dst_ip_id = 12 if ip_ver == "v4" else 28
819 self.verify_cflow_data_detail(
825 256: 8 if ip_ver == "v4" else 56710,
831 61: (self.direction == "tx"),
834 field_count=tmpl_ip_field_count,
840 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
845 capture = self.send_packets()
847 # make sure the one packet we expect actually showed up
848 self.vapi.ipfix_flush()
849 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
850 self.verify_cflow_data_detail(
854 {2: "packets", 256: 2440, 61: (self.direction == "tx")},
855 field_count=tmpl_l2_field_count,
858 self.collector.get_capture(6)
860 ipfix.remove_vpp_config()
862 def test_L4onL2(self):
863 """L4 data on L2 datapath"""
864 self.logger.info("FFP_TEST_START_0003")
865 self.pg_enable_capture(self.pg_interfaces)
869 test=self, intf=self.intf1, layer="l4", direction=self.direction
871 ipfix.add_vpp_config()
873 ipfix_decoder = IPFIXDecoder()
874 # template packet should arrive immediately
875 templates = ipfix.verify_templates(ipfix_decoder, count=2)
877 self.create_stream(packets=1)
878 capture = self.send_packets()
880 # make sure the one packet we expect actually showed up
881 self.vapi.ipfix_flush()
882 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
883 self.verify_cflow_data_detail(
887 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
890 self.collector.get_capture(3)
892 ipfix.remove_vpp_config()
893 self.logger.info("FFP_TEST_FINISH_0003")
895 def test_templatesIp4(self):
896 """verify templates on IP4 datapath"""
897 self.logger.info("FFP_TEST_START_0000")
899 self.pg_enable_capture(self.pg_interfaces)
902 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
904 ipfix.add_vpp_config()
906 # template packet should arrive immediately
907 self.vapi.ipfix_flush()
908 ipfix.verify_templates(timeout=3, count=1)
909 self.collector.get_capture(1)
911 ipfix.remove_vpp_config()
913 self.logger.info("FFP_TEST_FINISH_0000")
915 def test_L2onIP4(self):
916 """L2 data on IP4 datapath"""
917 self.logger.info("FFP_TEST_START_0001")
918 self.pg_enable_capture(self.pg_interfaces)
926 direction=self.direction,
928 ipfix.add_vpp_config()
930 ipfix_decoder = IPFIXDecoder()
931 # template packet should arrive immediately
932 templates = ipfix.verify_templates(ipfix_decoder, count=1)
934 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
935 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
937 # make sure the one packet we expect actually showed up
938 self.vapi.ipfix_flush()
939 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
940 self.verify_cflow_data_detail(
944 {2: "packets", 256: 8, 61: (self.direction == "tx")},
947 # expected one template and one cflow packet
948 self.collector.get_capture(2)
950 ipfix.remove_vpp_config()
951 self.logger.info("FFP_TEST_FINISH_0001")
953 def test_L3onIP4(self):
954 """L3 data on IP4 datapath"""
955 self.logger.info("FFP_TEST_START_0002")
956 self.pg_enable_capture(self.pg_interfaces)
964 direction=self.direction,
966 ipfix.add_vpp_config()
968 ipfix_decoder = IPFIXDecoder()
969 # template packet should arrive immediately
970 templates = ipfix.verify_templates(ipfix_decoder, count=1)
972 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
973 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
975 # make sure the one packet we expect actually showed up
976 self.vapi.ipfix_flush()
977 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
978 self.verify_cflow_data_detail(
987 61: (self.direction == "tx"),
991 # expected one template and one cflow packet
992 self.collector.get_capture(2)
994 ipfix.remove_vpp_config()
995 self.logger.info("FFP_TEST_FINISH_0002")
997 def test_L4onIP4(self):
998 """L4 data on IP4 datapath"""
999 self.logger.info("FFP_TEST_START_0003")
1000 self.pg_enable_capture(self.pg_interfaces)
1008 direction=self.direction,
1010 ipfix.add_vpp_config()
1012 ipfix_decoder = IPFIXDecoder()
1013 # template packet should arrive immediately
1014 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1016 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
1017 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1019 # make sure the one packet we expect actually showed up
1020 self.vapi.ipfix_flush()
1021 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1022 self.verify_cflow_data_detail(
1026 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1029 # expected one template and one cflow packet
1030 self.collector.get_capture(2)
1032 ipfix.remove_vpp_config()
1033 self.logger.info("FFP_TEST_FINISH_0003")
1035 def test_templatesIP6(self):
1036 """verify templates on IP6 datapath"""
1037 self.logger.info("FFP_TEST_START_0000")
1038 self.pg_enable_capture(self.pg_interfaces)
1041 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
1043 ipfix.add_vpp_config()
1045 # template packet should arrive immediately
1046 ipfix.verify_templates(count=1)
1047 self.collector.get_capture(1)
1049 ipfix.remove_vpp_config()
1051 self.logger.info("FFP_TEST_FINISH_0000")
1053 def test_L2onIP6(self):
1054 """L2 data on IP6 datapath"""
1055 self.logger.info("FFP_TEST_START_0001")
1056 self.pg_enable_capture(self.pg_interfaces)
1064 direction=self.direction,
1066 ipfix.add_vpp_config()
1068 ipfix_decoder = IPFIXDecoder()
1069 # template packet should arrive immediately
1070 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1072 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1073 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1075 # make sure the one packet we expect actually showed up
1076 self.vapi.ipfix_flush()
1077 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1078 self.verify_cflow_data_detail(
1082 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1086 # expected one template and one cflow packet
1087 self.collector.get_capture(2)
1089 ipfix.remove_vpp_config()
1090 self.logger.info("FFP_TEST_FINISH_0001")
1092 def test_L3onIP6(self):
1093 """L3 data on IP6 datapath"""
1094 self.logger.info("FFP_TEST_START_0002")
1095 self.pg_enable_capture(self.pg_interfaces)
1103 direction=self.direction,
1105 ipfix.add_vpp_config()
1107 ipfix_decoder = IPFIXDecoder()
1108 # template packet should arrive immediately
1109 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1111 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1112 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1114 # make sure the one packet we expect actually showed up
1115 self.vapi.ipfix_flush()
1116 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1117 self.verify_cflow_data_detail(
1121 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1125 # expected one template and one cflow packet
1126 self.collector.get_capture(2)
1128 ipfix.remove_vpp_config()
1129 self.logger.info("FFP_TEST_FINISH_0002")
1131 def test_L4onIP6(self):
1132 """L4 data on IP6 datapath"""
1133 self.logger.info("FFP_TEST_START_0003")
1134 self.pg_enable_capture(self.pg_interfaces)
1142 direction=self.direction,
1144 ipfix.add_vpp_config()
1146 ipfix_decoder = IPFIXDecoder()
1147 # template packet should arrive immediately
1148 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1150 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1151 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1153 # make sure the one packet we expect actually showed up
1154 self.vapi.ipfix_flush()
1155 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1156 self.verify_cflow_data_detail(
1160 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1164 # expected one template and one cflow packet
1165 self.collector.get_capture(2)
1167 ipfix.remove_vpp_config()
1168 self.logger.info("FFP_TEST_FINISH_0003")
1170 def test_0001(self):
1171 """no timers, one CFLOW packet, 9 Flows inside"""
1172 self.logger.info("FFP_TEST_START_0001")
1173 self.pg_enable_capture(self.pg_interfaces)
1176 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1177 ipfix.add_vpp_config()
1179 ipfix_decoder = IPFIXDecoder()
1180 # template packet should arrive immediately
1181 templates = ipfix.verify_templates(ipfix_decoder)
1183 self.create_stream(packets=9)
1184 capture = self.send_packets()
1186 # make sure the one packet we expect actually showed up
1187 self.vapi.ipfix_flush()
1188 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1189 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1190 self.collector.get_capture(4)
1192 ipfix.remove_vpp_config()
1193 self.logger.info("FFP_TEST_FINISH_0001")
1195 def test_0002(self):
1196 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1197 self.logger.info("FFP_TEST_START_0002")
1198 self.pg_enable_capture(self.pg_interfaces)
1201 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1202 ipfix.add_vpp_config()
1204 ipfix_decoder = IPFIXDecoder()
1205 # template packet should arrive immediately
1206 self.vapi.ipfix_flush()
1207 templates = ipfix.verify_templates(ipfix_decoder)
1209 self.create_stream(packets=6)
1210 capture = self.send_packets()
1212 # make sure the one packet we expect actually showed up
1214 self.vapi.ipfix_flush()
1215 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1216 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1217 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1218 self.collector.get_capture(5)
1220 ipfix.remove_vpp_config()
1221 self.logger.info("FFP_TEST_FINISH_0002")
1224 @tag_fixme_vpp_workers
1225 class DatapathTx(MethodHolder, DatapathTestsHolder):
1226 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1234 @tag_fixme_vpp_workers
1235 class DatapathRx(MethodHolder, DatapathTestsHolder):
1236 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1244 @unittest.skipUnless(config.extended, "part of extended tests")
1245 class DisableIPFIX(MethodHolder):
1249 def setUpClass(cls):
1250 super(DisableIPFIX, cls).setUpClass()
1253 def tearDownClass(cls):
1254 super(DisableIPFIX, cls).tearDownClass()
1256 def test_0001(self):
1257 """disable IPFIX after first packets"""
1258 self.logger.info("FFP_TEST_START_0001")
1259 self.pg_enable_capture(self.pg_interfaces)
1262 ipfix = VppCFLOW(test=self)
1263 ipfix.add_vpp_config()
1265 ipfix_decoder = IPFIXDecoder()
1266 # template packet should arrive immediately
1267 templates = ipfix.verify_templates(ipfix_decoder)
1269 self.create_stream()
1272 # make sure the one packet we expect actually showed up
1273 self.vapi.ipfix_flush()
1274 self.wait_for_cflow_packet(self.collector, templates[1])
1275 self.collector.get_capture(4)
1278 ipfix.disable_exporter()
1279 self.pg_enable_capture([self.collector])
1283 # make sure no packet arrived after the exporter was disabled
1284 self.vapi.ipfix_flush()
1285 self.sleep(1, "wait before verifying no packets sent")
1286 self.collector.assert_nothing_captured()
1288 ipfix.remove_vpp_config()
1289 self.logger.info("FFP_TEST_FINISH_0001")
# Extended-only test case: unittest.skipUnless skips it unless
# config.extended is true. It disables the IPFIX exporter, checks that
# nothing is exported, then re-enables it and inspects what the collector
# receives.
# NOTE(review): this excerpt carries no indentation and skips some lines
# (the embedded line numbers are not contiguous); in particular the loop
# headers binding `p` and the statements that initialise/increment the
# counters in test_0011 are not visible here.
1292 @unittest.skipUnless(config.extended, "part of extended tests")
1293 class ReenableIPFIX(MethodHolder):
1294 """Re-enable IPFIX"""
# Class-level fixture: defers entirely to the parent setUpClass.
1297 def setUpClass(cls):
1298 super(ReenableIPFIX, cls).setUpClass()
# Class-level cleanup: defers entirely to the parent tearDownClass.
1301 def tearDownClass(cls):
1302 super(ReenableIPFIX, cls).tearDownClass()
# Scenario: export works -> exporter disabled (nothing captured) ->
# exporter re-enabled (templates and data expected again).
1304 def test_0011(self):
1305 """disable IPFIX after first packets and re-enable after few packets"""
# NOTE(review): the start/finish log markers say 0001 although this method
# is test_0011 - confirm whether that is intentional.
1306 self.logger.info("FFP_TEST_START_0001")
1307 self.pg_enable_capture(self.pg_interfaces)
# VppCFLOW with default arguments; add_vpp_config() enables the exporter,
# sets the flowprobe parameters, enables the feature and flushes IPFIX.
1310 ipfix = VppCFLOW(test=self)
1311 ipfix.add_vpp_config()
1313 ipfix_decoder = IPFIXDecoder()
1314 # template packet should arrive immediately
1315 templates = ipfix.verify_templates(ipfix_decoder)
1317 self.create_stream(packets=5)
1320 # make sure the one packet we expect actually showed up
1321 self.vapi.ipfix_flush()
1322 self.wait_for_cflow_packet(self.collector, templates[1])
1323 self.collector.get_capture(4)
# stop exporting, flush, then re-arm capture on the collector interface only
1326 ipfix.disable_exporter()
1327 self.vapi.ipfix_flush()
1328 self.pg_enable_capture([self.collector])
1332 # make sure no one packet arrived in active timer span
1333 self.vapi.ipfix_flush()
1334 self.sleep(1, "wait before verifying no packets sent")
1335 self.collector.assert_nothing_captured()
# presumably the 5 stream packets are still expected on pg2 while export
# is off - confirm against the (not visible) send step
1336 self.pg2.get_capture(5)
# turn the exporter back on and collect what the collector receives
1339 ipfix.enable_exporter()
1341 capture = self.collector.get_capture(4)
1345 self.assertTrue(p.haslayer(IPFIX))
1346 if p.haslayer(Template):
# NOTE(review): assertTrue(expr, msg) uses its second argument as the
# failure message, so this only checks that nr_templates is truthy; it does
# not compare against 3. assertEqual was probably intended - confirm.
1348 self.assertTrue(nr_templates, 3)
1350 self.assertTrue(p.haslayer(IPFIX))
1351 if p.haslayer(Data):
# NOTE(review): same assertTrue/msg issue as above. This check follows the
# Data-layer test yet still inspects nr_templates; a separate data counter
# may have been intended - confirm against the lines not visible here.
1353 self.assertTrue(nr_templates, 1)
# cleanup: remove_vpp_config() disables the exporter and the flowprobe
# feature, then flushes IPFIX
1355 ipfix.remove_vpp_config()
1356 self.logger.info("FFP_TEST_FINISH_0001")
# Extended-only test case: unittest.skipUnless skips it unless
# config.extended is true. Both tests disable the flowprobe feature and
# check that the collector then captures nothing.
# NOTE(review): this excerpt carries no indentation and skips some lines
# (the embedded line numbers are not contiguous), so e.g. any decorator on
# setUpClass/tearDownClass and the construction of `ipfix` in
# test_no_leftover_flows_after_disabling are not visible here.
1359 @unittest.skipUnless(config.extended, "part of extended tests")
1360 class DisableFP(MethodHolder):
1361 """Disable Flowprobe feature"""
# Class-level fixture: defers entirely to the parent setUpClass.
1364 def setUpClass(cls):
1365 super(DisableFP, cls).setUpClass()
# Class-level cleanup: defers entirely to the parent tearDownClass.
1368 def tearDownClass(cls):
1369 super(DisableFP, cls).tearDownClass()
# Scenario: confirm export works, disable the flowprobe feature, then
# confirm the collector captures nothing.
1371 def test_0001(self):
1372 """disable flowprobe feature after first packets"""
1373 self.logger.info("FFP_TEST_START_0001")
1374 self.pg_enable_capture(self.pg_interfaces)
# VppCFLOW with default arguments; add_vpp_config() enables the exporter,
# sets the flowprobe parameters, enables the feature and flushes IPFIX.
1376 ipfix = VppCFLOW(test=self)
1377 ipfix.add_vpp_config()
1379 ipfix_decoder = IPFIXDecoder()
1380 # template packet should arrive immediately
1381 templates = ipfix.verify_templates(ipfix_decoder)
1383 self.create_stream()
1386 # make sure the one packet we expect actually showed up
1387 self.vapi.ipfix_flush()
1388 self.wait_for_cflow_packet(self.collector, templates[1])
1389 self.collector.get_capture(4)
# switch the feature off, then re-arm capture on the collector only
1392 ipfix.disable_flowprobe_feature()
1393 self.pg_enable_capture([self.collector])
1397 # make sure no one packet arrived in active timer span
1398 self.vapi.ipfix_flush()
1399 self.sleep(1, "wait before verifying no packets sent")
1400 self.collector.assert_nothing_captured()
1402 # enable FPP feature so the remove_vpp_config() doesn't fail
1403 # due to missing feature on interface.
1404 ipfix.enable_flowprobe_feature()
1406 ipfix.remove_vpp_config()
1407 self.logger.info("FFP_TEST_FINISH_0001")
# Scenario: send traffic, disable the feature and expect no exported flows
# during the wait; then re-enable the feature and verify a fresh flow record
# against the packets that were sent.
1409 def test_no_leftover_flows_after_disabling(self):
1410 """disable flowprobe feature and expect no leftover flows"""
1411 self.pg_enable_capture(self.pg_interfaces)
1414 # enable ip4 datapath for an interface
1415 # set active and passive timers
# NOTE(review): the statement that creates `ipfix` (and its timer/datapath
# arguments) is not visible in this excerpt.
1426 ipfix.add_vpp_config()
1428 # template packet should arrive immediately
1429 ipfix.verify_templates(count=1)
1431 # send some ip4 packets
1432 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
1433 self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1435 # disable feature for the interface
1436 # currently stored ip4 flows should be removed
1437 ipfix.disable_flowprobe_feature()
1439 # no leftover ip4 flows are expected
1440 self.pg_enable_capture([self.collector])
# the 12 here is described by the message as three passive intervals; the
# passive timer value itself is set in lines not visible here
1441 self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
1442 self.collector.assert_nothing_captured()
1444 # re-enable feature for the interface
1445 ipfix.enable_flowprobe_feature()
1447 # template packet should arrive immediately
1448 ipfix_decoder = IPFIXDecoder()
1449 self.vapi.ipfix_flush()
1450 templates = ipfix.verify_templates(ipfix_decoder, count=1)
1452 # send some ip4 packets
1453 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
# keep the result of send_packets() so the exported record can be checked
# against it below
1454 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1456 # verify meta info - packet/octet delta
1457 self.vapi.ipfix_flush()
1458 cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=8)
1459 self.verify_cflow_data(ipfix_decoder, capture, cflow)
1461 self.collector.get_capture(2)
# cleanup: remove_vpp_config() disables the exporter and the flowprobe
# feature, then flushes IPFIX
1464 ipfix.remove_vpp_config()
# Extended-only test case: unittest.skipUnless skips it unless
# config.extended is true. It disables the flowprobe feature, checks that
# nothing is exported, then re-enables it and expects templates and data
# again.
# NOTE(review): this excerpt carries no indentation and skips some lines
# (the embedded line numbers are not contiguous), so e.g. any decorator on
# setUpClass/tearDownClass and any send step after create_stream() are not
# visible here.
1467 @unittest.skipUnless(config.extended, "part of extended tests")
1468 class ReenableFP(MethodHolder):
1469 """Re-enable Flowprobe feature"""
# Class-level fixture: defers entirely to the parent setUpClass.
1472 def setUpClass(cls):
1473 super(ReenableFP, cls).setUpClass()
# Class-level cleanup: defers entirely to the parent tearDownClass.
1476 def tearDownClass(cls):
1477 super(ReenableFP, cls).tearDownClass()
# Scenario: export works -> feature disabled (nothing captured) ->
# feature re-enabled (templates verified and a data packet awaited again).
1479 def test_0001(self):
1480 """disable flowprobe feature after first packets and re-enable
1481 after few packets"""
1482 self.logger.info("FFP_TEST_START_0001")
1483 self.pg_enable_capture(self.pg_interfaces)
# VppCFLOW with default arguments; add_vpp_config() enables the exporter,
# sets the flowprobe parameters, enables the feature and flushes IPFIX.
1486 ipfix = VppCFLOW(test=self)
1487 ipfix.add_vpp_config()
1489 ipfix_decoder = IPFIXDecoder()
1490 # template packet should arrive immediately
1491 self.vapi.ipfix_flush()
1492 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1494 self.create_stream()
1497 # make sure the one packet we expect actually showed up
1498 self.vapi.ipfix_flush()
# third positional argument 5 is presumably the timeout (it is passed as
# timeout=... by keyword elsewhere in this file) - confirm the signature
1499 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1500 self.collector.get_capture(4)
1502 # disable FPP feature
1503 ipfix.disable_flowprobe_feature()
1504 self.pg_enable_capture([self.collector])
1508 # make sure no one packet arrived in active timer span
1509 self.vapi.ipfix_flush()
1510 self.sleep(5, "wait before verifying no packets sent")
1511 self.collector.assert_nothing_captured()
1513 # enable FPP feature
1514 ipfix.enable_flowprobe_feature()
1515 self.vapi.ipfix_flush()
# templates are verified again after re-enabling and the result replaces
# the earlier `templates` value
1516 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1520 # make sure the next packets (templates and data) we expect actually
# NOTE(review): the comment above is cut off in this excerpt; its
# continuation line is not visible here.
1522 self.vapi.ipfix_flush()
1523 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1524 self.collector.get_capture(4)
# cleanup: remove_vpp_config() disables the exporter and the flowprobe
# feature, then flushes IPFIX
1526 ipfix.remove_vpp_config()
1527 self.logger.info("FFP_TEST_FINISH_0001")
# Script entry point: when the module is executed directly, run the tests
# through unittest.main using VppTestRunner as the test runner.
1530 if __name__ == "__main__":
1531 unittest.main(testRunner=VppTestRunner)