2 from __future__ import print_function
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
15 from config import config
16 from framework import tag_fixme_vpp_workers
17 from framework import VppTestCase, VppTestRunner
18 from framework import tag_run_solo
19 from vpp_object import VppObject
20 from vpp_pg_interface import CaptureTimeoutError
22 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
23 from vpp_ip_route import VppIpRoute, VppRoutePath
24 from vpp_papi.macaddress import mac_ntop
25 from socket import inet_ntop
26 from vpp_papi import VppEnum
29 class VppCFLOW(VppObject):
30 """CFLOW object for IPFIX exporter and Flowprobe feature"""
46 self._intf_obj = getattr(self._test, intf)
48 if passive == 0 or passive < active:
49 self._passive = active + 1
51 self._passive = passive
52 self._datapath = datapath # l2 ip4 ip6
53 self._collect = layer # l2 l3 l4
54 self._direction = direction # rx tx both
55 self._timeout = timeout
57 self._configured = False
59 def add_vpp_config(self):
60 self.enable_exporter()
64 if "l2" in self._collect.lower():
65 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
66 if "l3" in self._collect.lower():
67 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
68 if "l4" in self._collect.lower():
69 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
70 self._test.vapi.flowprobe_set_params(
71 record_flags=(l2_flag | l3_flag | l4_flag),
72 active_timer=self._active,
73 passive_timer=self._passive,
75 self.enable_flowprobe_feature()
76 self._test.vapi.cli("ipfix flush")
77 self._configured = True
79 def remove_vpp_config(self):
80 self.disable_exporter()
81 self.disable_flowprobe_feature()
82 self._test.vapi.cli("ipfix flush")
83 self._configured = False
85 def enable_exporter(self):
86 self._test.vapi.set_ipfix_exporter(
87 collector_address=self._test.pg0.remote_ip4,
88 src_address=self._test.pg0.local_ip4,
90 template_interval=self._timeout,
93 def _enable_disable_flowprobe_feature(self, is_add):
95 "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
96 "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
97 "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
100 "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
101 "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
102 "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
104 self._test.vapi.flowprobe_interface_add_del(
106 which=which_map[self._datapath],
107 direction=direction_map[self._direction],
108 sw_if_index=self._intf_obj.sw_if_index,
111 def enable_flowprobe_feature(self):
112 self._enable_disable_flowprobe_feature(is_add=True)
114 def disable_exporter(self):
115 self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
117 def disable_flowprobe_feature(self):
118 self._enable_disable_flowprobe_feature(is_add=False)
121 return "ipfix-collector-%s-%s" % (self._src, self.dst)
123 def query_vpp_config(self):
124 return self._configured
126 def verify_templates(self, decoder=None, timeout=1, count=3):
128 self._test.assertIn(count, (1, 2, 3))
129 for _ in range(count):
130 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
131 self._test.assertTrue(p.haslayer(IPFIX))
132 if decoder is not None and p.haslayer(Template):
133 templates.append(p[Template].templateID)
134 decoder.add_template(p.getlayer(Template))
138 class MethodHolder(VppTestCase):
139 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
143 max_number_of_packets = 10
149 Perform standard class setup (defined by class method setUpClass in
150 class VppTestCase) before running the test case, set test case related
151 variables and configure VPP.
153 super(MethodHolder, cls).setUpClass()
155 # Create pg interfaces
156 cls.create_pg_interfaces(range(9))
159 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
161 # Create BD with MAC learning and unknown unicast flooding disabled
162 # and put interfaces to this BD
163 cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
164 cls.vapi.sw_interface_set_l2_bridge(
165 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
167 cls.vapi.sw_interface_set_l2_bridge(
168 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
171 # Set up all interfaces
172 for i in cls.pg_interfaces:
176 cls.pg0.configure_ipv4_neighbors()
177 cls.collector = cls.pg0
180 cls.pg1.resolve_arp()
182 cls.pg2.resolve_arp()
184 cls.pg3.resolve_arp()
186 cls.pg4.resolve_arp()
189 cls.pg8.configure_ipv4_neighbors()
192 cls.pg5.resolve_ndp()
193 cls.pg5.disable_ipv6_ra()
195 cls.pg6.resolve_ndp()
196 cls.pg6.disable_ipv6_ra()
198 super(MethodHolder, cls).tearDownClass()
202 def tearDownClass(cls):
203 super(MethodHolder, cls).tearDownClass()
206 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
208 """Create a packet stream to tickle the plugin
210 :param VppInterface src_if: Source interface for packet stream
211 :param VppInterface src_if: Dst interface for packet stream
219 packets = random.randint(1, self.max_number_of_packets)
221 for p in range(0, packets):
223 pkt_size = random.choice(self.pg_if_packet_sizes)
224 info = self.create_packet_info(src_if, dst_if)
225 payload = self.info_to_payload(info)
226 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
228 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
230 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
231 p /= UDP(sport=1234, dport=4321)
234 self.extend_packet(p, pkt_size)
237 def verify_cflow_data(self, decoder, capture, cflow):
243 if cflow.haslayer(Data):
244 data = decoder.decode_data_set(cflow.getlayer(Set))
246 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
247 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
249 def send_packets(self, src_if=None, dst_if=None):
254 self.pg_enable_capture([dst_if])
255 src_if.add_stream(self.pkts)
257 return dst_if.get_capture(len(self.pkts))
259 def verify_cflow_data_detail(
260 self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
263 print(capture[0].show())
264 if cflow.haslayer(Data):
265 data = decoder.decode_data_set(cflow.getlayer(Set))
269 ip_layer = capture[0][IP]
271 ip_layer = capture[0][IPv6]
272 if data_set is not None:
274 # skip flow if ingress/egress interface is 0
275 if int(binascii.hexlify(record[10]), 16) == 0:
277 if int(binascii.hexlify(record[14]), 16) == 0:
280 for field in data_set:
281 value = data_set[field]
282 if value == "octets":
285 value += 40 # ??? is this correct
286 elif value == "packets":
288 elif value == "src_ip":
290 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
292 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
293 value = int(binascii.hexlify(ip), 16)
294 elif value == "dst_ip":
296 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
298 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
299 value = int(binascii.hexlify(ip), 16)
300 elif value == "sport":
301 value = int(capture[0][UDP].sport)
302 elif value == "dport":
303 value = int(capture[0][UDP].dport)
305 int(binascii.hexlify(record[field]), 16), value
308 def verify_cflow_data_notimer(self, decoder, capture, cflows):
311 if cflow.haslayer(Data):
312 data = decoder.decode_data_set(cflow.getlayer(Set))
314 raise Exception("No CFLOW data")
319 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
320 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
321 self.assertEqual(len(capture), idx)
323 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
324 """wait for CFLOW packet and verify its correctness
326 :param timeout: how long to wait
329 self.logger.info("IPFIX: Waiting for CFLOW packet")
330 # self.logger.debug(self.vapi.ppcli("show flow table"))
331 p = collector_intf.wait_for_packet(timeout=timeout)
332 self.assertEqual(p[Set].setID, set_id)
333 # self.logger.debug(self.vapi.ppcli("show flow table"))
334 self.logger.debug(ppp("IPFIX: Got packet:", p))
339 @tag_fixme_vpp_workers
340 class Flowprobe(MethodHolder):
341 """Template verification, timer tests"""
345 super(Flowprobe, cls).setUpClass()
348 def tearDownClass(cls):
349 super(Flowprobe, cls).tearDownClass()
352 """timer less than template timeout"""
353 self.logger.info("FFP_TEST_START_0001")
354 self.pg_enable_capture(self.pg_interfaces)
357 ipfix = VppCFLOW(test=self, active=2)
358 ipfix.add_vpp_config()
360 ipfix_decoder = IPFIXDecoder()
361 # template packet should arrive immediately
362 templates = ipfix.verify_templates(ipfix_decoder)
364 self.create_stream(packets=1)
366 capture = self.pg2.get_capture(1)
368 # make sure the one packet we expect actually showed up
369 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
370 self.verify_cflow_data(ipfix_decoder, capture, cflow)
372 ipfix.remove_vpp_config()
373 self.logger.info("FFP_TEST_FINISH_0001")
376 """timer greater than template timeout"""
377 self.logger.info("FFP_TEST_START_0002")
378 self.pg_enable_capture(self.pg_interfaces)
381 ipfix = VppCFLOW(test=self, timeout=3, active=4)
382 ipfix.add_vpp_config()
384 ipfix_decoder = IPFIXDecoder()
385 # template packet should arrive immediately
386 ipfix.verify_templates()
388 self.create_stream(packets=2)
390 capture = self.pg2.get_capture(2)
392 # next set of template packet should arrive after 20 seconds
393 # template packet should arrive within 20 s
394 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
396 # make sure the one packet we expect actually showed up
397 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
398 self.verify_cflow_data(ipfix_decoder, capture, cflow)
400 ipfix.remove_vpp_config()
401 self.logger.info("FFP_TEST_FINISH_0002")
403 def test_cflow_packet(self):
404 """verify cflow packet fields"""
405 self.logger.info("FFP_TEST_START_0000")
406 self.pg_enable_capture(self.pg_interfaces)
410 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
412 ipfix.add_vpp_config()
414 route_9001 = VppIpRoute(
418 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
420 route_9001.add_vpp_config()
422 ipfix_decoder = IPFIXDecoder()
423 templates = ipfix.verify_templates(ipfix_decoder, count=1)
427 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
428 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
429 / TCP(sport=1234, dport=4321, flags=80)
434 nowUTC = int(time.time())
435 nowUNIX = nowUTC + 2208988800
436 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
438 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
439 self.collector.get_capture(2)
441 if cflow[0].haslayer(IPFIX):
442 self.assertEqual(cflow[IPFIX].version, 10)
443 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
444 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
445 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
446 if cflow.haslayer(Data):
447 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
449 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
451 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
453 self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
455 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
457 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
459 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
460 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
461 # flow start timestamp
462 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
463 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
465 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
467 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
469 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
471 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
473 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
475 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
477 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
479 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
481 ipfix.remove_vpp_config()
482 self.logger.info("FFP_TEST_FINISH_0000")
484 def test_interface_dump(self):
485 """Dump interfaces with IPFIX flow record generation enabled"""
486 self.logger.info("FFP_TEST_START_0003")
488 # Enable feature for 3 interfaces
489 ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
490 ipfix1.add_vpp_config()
492 ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
493 ipfix2.enable_flowprobe_feature()
495 ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
496 ipfix3.enable_flowprobe_feature()
498 # When request "all", dump should contain all enabled interfaces
499 dump = self.vapi.flowprobe_interface_dump()
500 self.assertEqual(len(dump), 3)
502 # Verify 1st interface
503 self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
505 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
509 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
512 # Verify 2nd interface
513 self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
515 dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
519 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
522 # Verify 3rd interface
523 self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
525 dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
529 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
532 # When request 2nd interface, dump should contain only the specified interface
533 dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
534 self.assertEqual(len(dump), 1)
536 # Verify 2nd interface
537 self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
539 dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
543 VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
546 # When request 99th interface, dump should be empty
547 dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
548 self.assertEqual(len(dump), 0)
550 ipfix1.remove_vpp_config()
551 ipfix2.remove_vpp_config()
552 ipfix3.remove_vpp_config()
553 self.logger.info("FFP_TEST_FINISH_0003")
555 def test_get_params(self):
556 """Get IPFIX flow record generation parameters"""
557 self.logger.info("FFP_TEST_START_0004")
559 # Enable feature for an interface with custom parameters
560 ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
561 ipfix.add_vpp_config()
563 # Get and verify parameters
564 params = self.vapi.flowprobe_get_params()
565 self.assertEqual(params.active_timer, 20)
566 self.assertEqual(params.passive_timer, 40)
567 record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
568 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
569 record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
570 self.assertEqual(params.record_flags, record_flags)
572 ipfix.remove_vpp_config()
573 self.logger.info("FFP_TEST_FINISH_0004")
576 class DatapathTestsHolder(object):
577 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
581 super(DatapathTestsHolder, cls).setUpClass()
584 def tearDownClass(cls):
585 super(DatapathTestsHolder, cls).tearDownClass()
587 def test_templatesL2(self):
588 """verify template on L2 datapath"""
589 self.logger.info("FFP_TEST_START_0000")
590 self.pg_enable_capture(self.pg_interfaces)
593 test=self, intf=self.intf1, layer="l2", direction=self.direction
595 ipfix.add_vpp_config()
597 # template packet should arrive immediately
598 self.vapi.ipfix_flush()
599 ipfix.verify_templates(timeout=3, count=1)
600 self.collector.get_capture(1)
602 ipfix.remove_vpp_config()
603 self.logger.info("FFP_TEST_FINISH_0000")
605 def test_L2onL2(self):
606 """L2 data on L2 datapath"""
607 self.logger.info("FFP_TEST_START_0001")
608 self.pg_enable_capture(self.pg_interfaces)
612 test=self, intf=self.intf1, layer="l2", direction=self.direction
614 ipfix.add_vpp_config()
616 ipfix_decoder = IPFIXDecoder()
617 # template packet should arrive immediately
618 templates = ipfix.verify_templates(ipfix_decoder, count=1)
620 self.create_stream(packets=1)
621 capture = self.send_packets()
623 # make sure the one packet we expect actually showed up
624 self.vapi.ipfix_flush()
625 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
626 self.verify_cflow_data_detail(
630 {2: "packets", 256: 8, 61: (self.direction == "tx")},
632 self.collector.get_capture(2)
634 ipfix.remove_vpp_config()
635 self.logger.info("FFP_TEST_FINISH_0001")
637 def test_L3onL2(self):
638 """L3 data on L2 datapath"""
639 self.logger.info("FFP_TEST_START_0002")
640 self.pg_enable_capture(self.pg_interfaces)
644 test=self, intf=self.intf1, layer="l3", direction=self.direction
646 ipfix.add_vpp_config()
648 ipfix_decoder = IPFIXDecoder()
649 # template packet should arrive immediately
650 templates = ipfix.verify_templates(ipfix_decoder, count=2)
652 self.create_stream(packets=1)
653 capture = self.send_packets()
655 # make sure the one packet we expect actually showed up
656 self.vapi.ipfix_flush()
657 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
658 self.verify_cflow_data_detail(
667 61: (self.direction == "tx"),
671 self.collector.get_capture(3)
673 ipfix.remove_vpp_config()
674 self.logger.info("FFP_TEST_FINISH_0002")
676 def test_L4onL2(self):
677 """L4 data on L2 datapath"""
678 self.logger.info("FFP_TEST_START_0003")
679 self.pg_enable_capture(self.pg_interfaces)
683 test=self, intf=self.intf1, layer="l4", direction=self.direction
685 ipfix.add_vpp_config()
687 ipfix_decoder = IPFIXDecoder()
688 # template packet should arrive immediately
689 templates = ipfix.verify_templates(ipfix_decoder, count=2)
691 self.create_stream(packets=1)
692 capture = self.send_packets()
694 # make sure the one packet we expect actually showed up
695 self.vapi.ipfix_flush()
696 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
697 self.verify_cflow_data_detail(
701 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
704 self.collector.get_capture(3)
706 ipfix.remove_vpp_config()
707 self.logger.info("FFP_TEST_FINISH_0003")
709 def test_templatesIp4(self):
710 """verify templates on IP4 datapath"""
711 self.logger.info("FFP_TEST_START_0000")
713 self.pg_enable_capture(self.pg_interfaces)
716 test=self, intf=self.intf1, datapath="ip4", direction=self.direction
718 ipfix.add_vpp_config()
720 # template packet should arrive immediately
721 self.vapi.ipfix_flush()
722 ipfix.verify_templates(timeout=3, count=1)
723 self.collector.get_capture(1)
725 ipfix.remove_vpp_config()
727 self.logger.info("FFP_TEST_FINISH_0000")
729 def test_L2onIP4(self):
730 """L2 data on IP4 datapath"""
731 self.logger.info("FFP_TEST_START_0001")
732 self.pg_enable_capture(self.pg_interfaces)
740 direction=self.direction,
742 ipfix.add_vpp_config()
744 ipfix_decoder = IPFIXDecoder()
745 # template packet should arrive immediately
746 templates = ipfix.verify_templates(ipfix_decoder, count=1)
748 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
749 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
751 # make sure the one packet we expect actually showed up
752 self.vapi.ipfix_flush()
753 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
754 self.verify_cflow_data_detail(
758 {2: "packets", 256: 8, 61: (self.direction == "tx")},
761 # expected two templates and one cflow packet
762 self.collector.get_capture(2)
764 ipfix.remove_vpp_config()
765 self.logger.info("FFP_TEST_FINISH_0001")
767 def test_L3onIP4(self):
768 """L3 data on IP4 datapath"""
769 self.logger.info("FFP_TEST_START_0002")
770 self.pg_enable_capture(self.pg_interfaces)
778 direction=self.direction,
780 ipfix.add_vpp_config()
782 ipfix_decoder = IPFIXDecoder()
783 # template packet should arrive immediately
784 templates = ipfix.verify_templates(ipfix_decoder, count=1)
786 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
787 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
789 # make sure the one packet we expect actually showed up
790 self.vapi.ipfix_flush()
791 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
792 self.verify_cflow_data_detail(
801 61: (self.direction == "tx"),
805 # expected two templates and one cflow packet
806 self.collector.get_capture(2)
808 ipfix.remove_vpp_config()
809 self.logger.info("FFP_TEST_FINISH_0002")
811 def test_L4onIP4(self):
812 """L4 data on IP4 datapath"""
813 self.logger.info("FFP_TEST_START_0003")
814 self.pg_enable_capture(self.pg_interfaces)
822 direction=self.direction,
824 ipfix.add_vpp_config()
826 ipfix_decoder = IPFIXDecoder()
827 # template packet should arrive immediately
828 templates = ipfix.verify_templates(ipfix_decoder, count=1)
830 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
831 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
833 # make sure the one packet we expect actually showed up
834 self.vapi.ipfix_flush()
835 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
836 self.verify_cflow_data_detail(
840 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
843 # expected two templates and one cflow packet
844 self.collector.get_capture(2)
846 ipfix.remove_vpp_config()
847 self.logger.info("FFP_TEST_FINISH_0003")
849 def test_templatesIP6(self):
850 """verify templates on IP6 datapath"""
851 self.logger.info("FFP_TEST_START_0000")
852 self.pg_enable_capture(self.pg_interfaces)
855 test=self, intf=self.intf1, datapath="ip6", direction=self.direction
857 ipfix.add_vpp_config()
859 # template packet should arrive immediately
860 ipfix.verify_templates(count=1)
861 self.collector.get_capture(1)
863 ipfix.remove_vpp_config()
865 self.logger.info("FFP_TEST_FINISH_0000")
867 def test_L2onIP6(self):
868 """L2 data on IP6 datapath"""
869 self.logger.info("FFP_TEST_START_0001")
870 self.pg_enable_capture(self.pg_interfaces)
878 direction=self.direction,
880 ipfix.add_vpp_config()
882 ipfix_decoder = IPFIXDecoder()
883 # template packet should arrive immediately
884 templates = ipfix.verify_templates(ipfix_decoder, count=1)
886 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
887 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
889 # make sure the one packet we expect actually showed up
890 self.vapi.ipfix_flush()
891 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
892 self.verify_cflow_data_detail(
896 {2: "packets", 256: 56710, 61: (self.direction == "tx")},
900 # expected two templates and one cflow packet
901 self.collector.get_capture(2)
903 ipfix.remove_vpp_config()
904 self.logger.info("FFP_TEST_FINISH_0001")
906 def test_L3onIP6(self):
907 """L3 data on IP6 datapath"""
908 self.logger.info("FFP_TEST_START_0002")
909 self.pg_enable_capture(self.pg_interfaces)
917 direction=self.direction,
919 ipfix.add_vpp_config()
921 ipfix_decoder = IPFIXDecoder()
922 # template packet should arrive immediately
923 templates = ipfix.verify_templates(ipfix_decoder, count=1)
925 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
926 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
928 # make sure the one packet we expect actually showed up
929 self.vapi.ipfix_flush()
930 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
931 self.verify_cflow_data_detail(
935 {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
939 # expected two templates and one cflow packet
940 self.collector.get_capture(2)
942 ipfix.remove_vpp_config()
943 self.logger.info("FFP_TEST_FINISH_0002")
945 def test_L4onIP6(self):
946 """L4 data on IP6 datapath"""
947 self.logger.info("FFP_TEST_START_0003")
948 self.pg_enable_capture(self.pg_interfaces)
956 direction=self.direction,
958 ipfix.add_vpp_config()
960 ipfix_decoder = IPFIXDecoder()
961 # template packet should arrive immediately
962 templates = ipfix.verify_templates(ipfix_decoder, count=1)
964 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
965 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
967 # make sure the one packet we expect actually showed up
968 self.vapi.ipfix_flush()
969 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
970 self.verify_cflow_data_detail(
974 {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
978 # expected two templates and one cflow packet
979 self.collector.get_capture(2)
981 ipfix.remove_vpp_config()
982 self.logger.info("FFP_TEST_FINISH_0003")
985 """no timers, one CFLOW packet, 9 Flows inside"""
986 self.logger.info("FFP_TEST_START_0001")
987 self.pg_enable_capture(self.pg_interfaces)
990 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
991 ipfix.add_vpp_config()
993 ipfix_decoder = IPFIXDecoder()
994 # template packet should arrive immediately
995 templates = ipfix.verify_templates(ipfix_decoder)
997 self.create_stream(packets=9)
998 capture = self.send_packets()
1000 # make sure the one packet we expect actually showed up
1001 self.vapi.ipfix_flush()
1002 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1003 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1004 self.collector.get_capture(4)
1006 ipfix.remove_vpp_config()
1007 self.logger.info("FFP_TEST_FINISH_0001")
1009 def test_0002(self):
1010 """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1011 self.logger.info("FFP_TEST_START_0002")
1012 self.pg_enable_capture(self.pg_interfaces)
1015 ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1016 ipfix.add_vpp_config()
1018 ipfix_decoder = IPFIXDecoder()
1019 # template packet should arrive immediately
1020 self.vapi.ipfix_flush()
1021 templates = ipfix.verify_templates(ipfix_decoder)
1023 self.create_stream(packets=6)
1024 capture = self.send_packets()
1026 # make sure the one packet we expect actually showed up
1028 self.vapi.ipfix_flush()
1029 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1030 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1031 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1032 self.collector.get_capture(5)
1034 ipfix.remove_vpp_config()
1035 self.logger.info("FFP_TEST_FINISH_0002")
1038 @tag_fixme_vpp_workers
1039 class DatapathTx(MethodHolder, DatapathTestsHolder):
1040 """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1048 @tag_fixme_vpp_workers
1049 class DatapathRx(MethodHolder, DatapathTestsHolder):
1050 """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1058 @unittest.skipUnless(config.extended, "part of extended tests")
1059 class DisableIPFIX(MethodHolder):
1063 def setUpClass(cls):
1064 super(DisableIPFIX, cls).setUpClass()
1067 def tearDownClass(cls):
1068 super(DisableIPFIX, cls).tearDownClass()
1070 def test_0001(self):
1071 """disable IPFIX after first packets"""
1072 self.logger.info("FFP_TEST_START_0001")
1073 self.pg_enable_capture(self.pg_interfaces)
1076 ipfix = VppCFLOW(test=self)
1077 ipfix.add_vpp_config()
1079 ipfix_decoder = IPFIXDecoder()
1080 # template packet should arrive immediately
1081 templates = ipfix.verify_templates(ipfix_decoder)
1083 self.create_stream()
1086 # make sure the one packet we expect actually showed up
1087 self.vapi.ipfix_flush()
1088 self.wait_for_cflow_packet(self.collector, templates[1])
1089 self.collector.get_capture(4)
1092 ipfix.disable_exporter()
1093 self.pg_enable_capture([self.collector])
1097 # make sure no one packet arrived in 1 minute
1098 self.vapi.ipfix_flush()
1099 self.sleep(1, "wait before verifying no packets sent")
1100 self.collector.assert_nothing_captured()
1102 ipfix.remove_vpp_config()
1103 self.logger.info("FFP_TEST_FINISH_0001")
1106 @unittest.skipUnless(config.extended, "part of extended tests")
1107 class ReenableIPFIX(MethodHolder):
1108 """Re-enable IPFIX"""
1111 def setUpClass(cls):
1112 super(ReenableIPFIX, cls).setUpClass()
1115 def tearDownClass(cls):
1116 super(ReenableIPFIX, cls).tearDownClass()
1118 def test_0011(self):
1119 """disable IPFIX after first packets and re-enable after few packets"""
1120 self.logger.info("FFP_TEST_START_0001")
1121 self.pg_enable_capture(self.pg_interfaces)
1124 ipfix = VppCFLOW(test=self)
1125 ipfix.add_vpp_config()
1127 ipfix_decoder = IPFIXDecoder()
1128 # template packet should arrive immediately
1129 templates = ipfix.verify_templates(ipfix_decoder)
1131 self.create_stream(packets=5)
1134 # make sure the one packet we expect actually showed up
1135 self.vapi.ipfix_flush()
1136 self.wait_for_cflow_packet(self.collector, templates[1])
1137 self.collector.get_capture(4)
1140 ipfix.disable_exporter()
1141 self.vapi.ipfix_flush()
1142 self.pg_enable_capture([self.collector])
1146 # make sure no one packet arrived in active timer span
1147 self.vapi.ipfix_flush()
1148 self.sleep(1, "wait before verifying no packets sent")
1149 self.collector.assert_nothing_captured()
1150 self.pg2.get_capture(5)
1153 ipfix.enable_exporter()
1155 capture = self.collector.get_capture(4)
1159 self.assertTrue(p.haslayer(IPFIX))
1160 if p.haslayer(Template):
1162 self.assertTrue(nr_templates, 3)
1164 self.assertTrue(p.haslayer(IPFIX))
1165 if p.haslayer(Data):
1167 self.assertTrue(nr_templates, 1)
1169 ipfix.remove_vpp_config()
1170 self.logger.info("FFP_TEST_FINISH_0001")
1173 @unittest.skipUnless(config.extended, "part of extended tests")
1174 class DisableFP(MethodHolder):
1175 """Disable Flowprobe feature"""
1178 def setUpClass(cls):
1179 super(DisableFP, cls).setUpClass()
1182 def tearDownClass(cls):
1183 super(DisableFP, cls).tearDownClass()
1185 def test_0001(self):
1186 """disable flowprobe feature after first packets"""
1187 self.logger.info("FFP_TEST_START_0001")
1188 self.pg_enable_capture(self.pg_interfaces)
1190 ipfix = VppCFLOW(test=self)
1191 ipfix.add_vpp_config()
1193 ipfix_decoder = IPFIXDecoder()
1194 # template packet should arrive immediately
1195 templates = ipfix.verify_templates(ipfix_decoder)
1197 self.create_stream()
1200 # make sure the one packet we expect actually showed up
1201 self.vapi.ipfix_flush()
1202 self.wait_for_cflow_packet(self.collector, templates[1])
1203 self.collector.get_capture(4)
1206 ipfix.disable_flowprobe_feature()
1207 self.pg_enable_capture([self.collector])
1211 # make sure no one packet arrived in active timer span
1212 self.vapi.ipfix_flush()
1213 self.sleep(1, "wait before verifying no packets sent")
1214 self.collector.assert_nothing_captured()
1216 ipfix.remove_vpp_config()
1217 self.logger.info("FFP_TEST_FINISH_0001")
1220 @unittest.skipUnless(config.extended, "part of extended tests")
1221 class ReenableFP(MethodHolder):
1222 """Re-enable Flowprobe feature"""
1225 def setUpClass(cls):
1226 super(ReenableFP, cls).setUpClass()
1229 def tearDownClass(cls):
1230 super(ReenableFP, cls).tearDownClass()
1232 def test_0001(self):
1233 """disable flowprobe feature after first packets and re-enable
1234 after few packets"""
1235 self.logger.info("FFP_TEST_START_0001")
1236 self.pg_enable_capture(self.pg_interfaces)
1239 ipfix = VppCFLOW(test=self)
1240 ipfix.add_vpp_config()
1242 ipfix_decoder = IPFIXDecoder()
1243 # template packet should arrive immediately
1244 self.vapi.ipfix_flush()
1245 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1247 self.create_stream()
1250 # make sure the one packet we expect actually showed up
1251 self.vapi.ipfix_flush()
1252 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1253 self.collector.get_capture(4)
1255 # disable FPP feature
1256 ipfix.disable_flowprobe_feature()
1257 self.pg_enable_capture([self.collector])
1261 # make sure no one packet arrived in active timer span
1262 self.vapi.ipfix_flush()
1263 self.sleep(5, "wait before verifying no packets sent")
1264 self.collector.assert_nothing_captured()
1266 # enable FPP feature
1267 ipfix.enable_flowprobe_feature()
1268 self.vapi.ipfix_flush()
1269 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1273 # make sure the next packets (templates and data) we expect actually
1275 self.vapi.ipfix_flush()
1276 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1277 self.collector.get_capture(4)
1279 ipfix.remove_vpp_config()
1280 self.logger.info("FFP_TEST_FINISH_0001")
1283 if __name__ == "__main__":
1284 unittest.main(testRunner=VppTestRunner)