2 from __future__ import print_function
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
15 from config import config
16 from framework import tag_fixme_vpp_workers
17 from framework import VppTestCase, VppTestRunner
18 from framework import tag_run_solo
19 from vpp_object import VppObject
20 from vpp_pg_interface import CaptureTimeoutError
22 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
23 from vpp_ip_route import VppIpRoute, VppRoutePath
24 from vpp_papi.macaddress import mac_ntop
25 from socket import inet_ntop
26 from vpp_papi import VppEnum
29 class VppCFLOW(VppObject):
30 """CFLOW object for IPFIX exporter and Flowprobe feature"""
46 if passive == 0 or passive < active:
47 self._passive = active + 1
49 self._passive = passive
50 self._datapath = datapath # l2 ip4 ip6
51 self._collect = layer # l2 l3 l4
52 self._timeout = timeout
54 self._configured = False
56 def add_vpp_config(self):
57 self.enable_exporter()
61 if "l2" in self._collect.lower():
62 l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
63 if "l3" in self._collect.lower():
64 l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
65 if "l4" in self._collect.lower():
66 l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
67 self._test.vapi.flowprobe_params(
68 record_flags=(l2_flag | l3_flag | l4_flag),
69 active_timer=self._active,
70 passive_timer=self._passive,
72 self.enable_flowprobe_feature()
73 self._test.vapi.cli("ipfix flush")
74 self._configured = True
def remove_vpp_config(self):
    """Undo the configuration applied by this object.

    Disables the exporter, disables the flowprobe feature, issues an
    "ipfix flush" through the CLI and finally records that the object is
    no longer configured (see ``query_vpp_config``).
    """
    self.disable_exporter()
    self.disable_flowprobe_feature()
    self._test.vapi.cli("ipfix flush")
    self._configured = False
82 def enable_exporter(self):
83 self._test.vapi.set_ipfix_exporter(
84 collector_address=self._test.pg0.remote_ip4,
85 src_address=self._test.pg0.local_ip4,
87 template_interval=self._timeout,
def enable_flowprobe_feature(self):
    """Turn the flowprobe feature on for this object's interface.

    The CLI command is assembled from ``self._intf`` and
    ``self._datapath`` and executed through ``ppcli``.
    """
    command = "flowprobe feature add-del %s %s" % (self._intf, self._datapath)
    self._test.vapi.ppcli(command)
def disable_exporter(self):
    """Point the IPFIX exporter at collector 0.0.0.0 through the CLI.

    NOTE(review): this is used as the "exporter off" switch by the tests;
    the exact VPP semantics of a 0.0.0.0 collector are not visible here.
    """
    self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
98 def disable_flowprobe_feature(self):
100 "flowprobe feature add-del %s %s disable" % (self._intf, self._datapath)
104 return "ipfix-collector-%s-%s" % (self._src, self.dst)
def query_vpp_config(self):
    """Return the locally tracked configured flag.

    The flag is set by ``add_vpp_config`` and cleared by
    ``remove_vpp_config``; VPP itself is not queried.
    """
    return self._configured
109 def verify_templates(self, decoder=None, timeout=1, count=3):
111 self._test.assertIn(count, (1, 2, 3))
112 for _ in range(count):
113 p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
114 self._test.assertTrue(p.haslayer(IPFIX))
115 if decoder is not None and p.haslayer(Template):
116 templates.append(p[Template].templateID)
117 decoder.add_template(p.getlayer(Template))
121 class MethodHolder(VppTestCase):
122 """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
126 max_number_of_packets = 10
132 Perform standard class setup (defined by class method setUpClass in
133 class VppTestCase) before running the test case, set test case related
134 variables and configure VPP.
136 super(MethodHolder, cls).setUpClass()
138 # Create pg interfaces
139 cls.create_pg_interfaces(range(9))
142 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
144 # Create BD with MAC learning and unknown unicast flooding enabled
145 # and put interfaces to this BD
146 cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
147 cls.vapi.sw_interface_set_l2_bridge(
148 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
150 cls.vapi.sw_interface_set_l2_bridge(
151 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
154 # Set up all interfaces
155 for i in cls.pg_interfaces:
159 cls.pg0.configure_ipv4_neighbors()
160 cls.collector = cls.pg0
163 cls.pg1.resolve_arp()
165 cls.pg2.resolve_arp()
167 cls.pg3.resolve_arp()
169 cls.pg4.resolve_arp()
172 cls.pg8.configure_ipv4_neighbors()
175 cls.pg5.resolve_ndp()
176 cls.pg5.disable_ipv6_ra()
178 cls.pg6.resolve_ndp()
179 cls.pg6.disable_ipv6_ra()
181 super(MethodHolder, cls).tearDownClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; nothing extra is done."""
    super(MethodHolder, cls).tearDownClass()
189 self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
191 """Create a packet stream to tickle the plugin
193 :param VppInterface src_if: Source interface for packet stream
194 :param VppInterface dst_if: Dst interface for packet stream
202 packets = random.randint(1, self.max_number_of_packets)
204 for p in range(0, packets):
206 pkt_size = random.choice(self.pg_if_packet_sizes)
207 info = self.create_packet_info(src_if, dst_if)
208 payload = self.info_to_payload(info)
209 p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
211 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
213 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
214 p /= UDP(sport=1234, dport=4321)
217 self.extend_packet(p, pkt_size)
220 def verify_cflow_data(self, decoder, capture, cflow):
226 if cflow.haslayer(Data):
227 data = decoder.decode_data_set(cflow.getlayer(Set))
229 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
230 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
232 def send_packets(self, src_if=None, dst_if=None):
237 self.pg_enable_capture([dst_if])
238 src_if.add_stream(self.pkts)
240 return dst_if.get_capture(len(self.pkts))
242 def verify_cflow_data_detail(
243 self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
246 print(capture[0].show())
247 if cflow.haslayer(Data):
248 data = decoder.decode_data_set(cflow.getlayer(Set))
252 ip_layer = capture[0][IP]
254 ip_layer = capture[0][IPv6]
255 if data_set is not None:
257 # skip flow if ingress/egress interface is 0
258 if int(binascii.hexlify(record[10]), 16) == 0:
260 if int(binascii.hexlify(record[14]), 16) == 0:
263 for field in data_set:
264 if field not in record.keys():
266 value = data_set[field]
267 if value == "octets":
270 value += 40 # ??? is this correct
271 elif value == "packets":
273 elif value == "src_ip":
275 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
277 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
278 value = int(binascii.hexlify(ip), 16)
279 elif value == "dst_ip":
281 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
283 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
284 value = int(binascii.hexlify(ip), 16)
285 elif value == "sport":
286 value = int(capture[0][UDP].sport)
287 elif value == "dport":
288 value = int(capture[0][UDP].dport)
290 int(binascii.hexlify(record[field]), 16), value
293 def verify_cflow_data_notimer(self, decoder, capture, cflows):
296 if cflow.haslayer(Data):
297 data = decoder.decode_data_set(cflow.getlayer(Set))
299 raise Exception("No CFLOW data")
304 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
305 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
306 self.assertEqual(len(capture), idx)
308 def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
309 """wait for CFLOW packet and verify its correctness
311 :param timeout: how long to wait
314 self.logger.info("IPFIX: Waiting for CFLOW packet")
315 # self.logger.debug(self.vapi.ppcli("show flow table"))
316 p = collector_intf.wait_for_packet(timeout=timeout)
317 self.assertEqual(p[Set].setID, set_id)
318 # self.logger.debug(self.vapi.ppcli("show flow table"))
319 self.logger.debug(ppp("IPFIX: Got packet:", p))
324 @tag_fixme_vpp_workers
325 class Flowprobe(MethodHolder):
326 """Template verification, timer tests"""
330 super(Flowprobe, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; nothing extra is done."""
    super(Flowprobe, cls).tearDownClass()
337 """timer less than template timeout"""
338 self.logger.info("FFP_TEST_START_0001")
339 self.pg_enable_capture(self.pg_interfaces)
342 ipfix = VppCFLOW(test=self, active=2)
343 ipfix.add_vpp_config()
345 ipfix_decoder = IPFIXDecoder()
346 # template packet should arrive immediately
347 templates = ipfix.verify_templates(ipfix_decoder)
349 self.create_stream(packets=1)
351 capture = self.pg2.get_capture(1)
353 # make sure the one packet we expect actually showed up
354 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
355 self.verify_cflow_data(ipfix_decoder, capture, cflow)
357 ipfix.remove_vpp_config()
358 self.logger.info("FFP_TEST_FINISH_0001")
361 """timer greater than template timeout"""
362 self.logger.info("FFP_TEST_START_0002")
363 self.pg_enable_capture(self.pg_interfaces)
366 ipfix = VppCFLOW(test=self, timeout=3, active=4)
367 ipfix.add_vpp_config()
369 ipfix_decoder = IPFIXDecoder()
370 # template packet should arrive immediately
371 ipfix.verify_templates()
373 self.create_stream(packets=2)
375 capture = self.pg2.get_capture(2)
377 # the next set of template packets is re-sent once the template
378 # interval (timeout=3) expires; allow up to 5 s for it to arrive
379 templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
381 # make sure the one packet we expect actually showed up
382 cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
383 self.verify_cflow_data(ipfix_decoder, capture, cflow)
385 ipfix.remove_vpp_config()
386 self.logger.info("FFP_TEST_FINISH_0002")
388 def test_cflow_packet(self):
389 """verify cflow packet fields"""
390 self.logger.info("FFP_TEST_START_0000")
391 self.pg_enable_capture(self.pg_interfaces)
395 test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
397 ipfix.add_vpp_config()
399 route_9001 = VppIpRoute(
403 [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
405 route_9001.add_vpp_config()
407 ipfix_decoder = IPFIXDecoder()
408 templates = ipfix.verify_templates(ipfix_decoder, count=1)
412 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
413 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
414 / TCP(sport=1234, dport=4321, flags=80)
419 nowUTC = int(time.time())
420 nowUNIX = nowUTC + 2208988800
421 self.send_packets(src_if=self.pg7, dst_if=self.pg8)
423 cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
424 self.collector.get_capture(2)
426 if cflow[0].haslayer(IPFIX):
427 self.assertEqual(cflow[IPFIX].version, 10)
428 self.assertEqual(cflow[IPFIX].observationDomainID, 1)
429 self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
430 self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
431 if cflow.haslayer(Data):
432 record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
434 self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
436 self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
438 self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
440 self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
442 self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
443 flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
444 # flow start timestamp
445 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
446 flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
448 self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
450 self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
452 self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
454 self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
456 self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
458 self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
460 self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
462 self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
464 ipfix.remove_vpp_config()
465 self.logger.info("FFP_TEST_FINISH_0000")
468 @tag_fixme_vpp_workers
469 class Datapath(MethodHolder):
470 """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
474 super(Datapath, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; nothing extra is done."""
    super(Datapath, cls).tearDownClass()
def test_templatesL2(self):
    """verify template on L2 datapath"""
    self.logger.info("FFP_TEST_START_0000")
    self.pg_enable_capture(self.pg_interfaces)

    # configure the exporter and the flowprobe feature with L2 records
    ipfix = VppCFLOW(test=self, layer="l2")
    ipfix.add_vpp_config()

    try:
        # template packet should arrive immediately
        self.vapi.ipfix_flush()
        ipfix.verify_templates(timeout=3, count=1)
        # exactly one packet (the template) is expected on the collector
        self.collector.get_capture(1)
    finally:
        # undo the configuration even when a check above fails, so that a
        # failure here does not leave the exporter/flowprobe feature
        # configured for the tests that run afterwards
        ipfix.remove_vpp_config()

    self.logger.info("FFP_TEST_FINISH_0000")
496 def test_L2onL2(self):
497 """L2 data on L2 datapath"""
498 self.logger.info("FFP_TEST_START_0001")
499 self.pg_enable_capture(self.pg_interfaces)
502 ipfix = VppCFLOW(test=self, layer="l2")
503 ipfix.add_vpp_config()
505 ipfix_decoder = IPFIXDecoder()
506 # template packet should arrive immediately
507 templates = ipfix.verify_templates(ipfix_decoder, count=1)
509 self.create_stream(packets=1)
510 capture = self.send_packets()
512 # make sure the one packet we expect actually showed up
513 self.vapi.ipfix_flush()
514 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
515 self.verify_cflow_data_detail(
516 ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
518 self.collector.get_capture(2)
520 ipfix.remove_vpp_config()
521 self.logger.info("FFP_TEST_FINISH_0001")
523 def test_L3onL2(self):
524 """L3 data on L2 datapath"""
525 self.logger.info("FFP_TEST_START_0002")
526 self.pg_enable_capture(self.pg_interfaces)
529 ipfix = VppCFLOW(test=self, layer="l3")
530 ipfix.add_vpp_config()
532 ipfix_decoder = IPFIXDecoder()
533 # template packet should arrive immediately
534 templates = ipfix.verify_templates(ipfix_decoder, count=2)
536 self.create_stream(packets=1)
537 capture = self.send_packets()
539 # make sure the one packet we expect actually showed up
540 self.vapi.ipfix_flush()
541 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
542 self.verify_cflow_data_detail(
546 {2: "packets", 4: 17, 8: "src_ip", 12: "dst_ip"},
549 self.collector.get_capture(3)
551 ipfix.remove_vpp_config()
552 self.logger.info("FFP_TEST_FINISH_0002")
554 def test_L4onL2(self):
555 """L4 data on L2 datapath"""
556 self.logger.info("FFP_TEST_START_0003")
557 self.pg_enable_capture(self.pg_interfaces)
560 ipfix = VppCFLOW(test=self, layer="l4")
561 ipfix.add_vpp_config()
563 ipfix_decoder = IPFIXDecoder()
564 # template packet should arrive immediately
565 templates = ipfix.verify_templates(ipfix_decoder, count=2)
567 self.create_stream(packets=1)
568 capture = self.send_packets()
570 # make sure the one packet we expect actually showed up
571 self.vapi.ipfix_flush()
572 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
573 self.verify_cflow_data_detail(
574 ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
577 self.collector.get_capture(3)
579 ipfix.remove_vpp_config()
580 self.logger.info("FFP_TEST_FINISH_0003")
def test_templatesIp4(self):
    """verify templates on IP4 datapath"""
    self.logger.info("FFP_TEST_START_0000")

    self.pg_enable_capture(self.pg_interfaces)

    # configure the exporter and the flowprobe feature on the ip4 datapath
    ipfix = VppCFLOW(test=self, datapath="ip4")
    ipfix.add_vpp_config()

    try:
        # template packet should arrive immediately
        self.vapi.ipfix_flush()
        ipfix.verify_templates(timeout=3, count=1)
        # exactly one packet (the template) is expected on the collector
        self.collector.get_capture(1)
    finally:
        # undo the configuration even when a check above fails, so that a
        # failure here does not leave the exporter/flowprobe feature
        # configured for the tests that run afterwards
        ipfix.remove_vpp_config()

    self.logger.info("FFP_TEST_FINISH_0000")
600 def test_L2onIP4(self):
601 """L2 data on IP4 datapath"""
602 self.logger.info("FFP_TEST_START_0001")
603 self.pg_enable_capture(self.pg_interfaces)
606 ipfix = VppCFLOW(test=self, intf="pg4", layer="l2", datapath="ip4")
607 ipfix.add_vpp_config()
609 ipfix_decoder = IPFIXDecoder()
610 # template packet should arrive immediately
611 templates = ipfix.verify_templates(ipfix_decoder, count=1)
613 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
614 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
616 # make sure the one packet we expect actually showed up
617 self.vapi.ipfix_flush()
618 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
619 self.verify_cflow_data_detail(
620 ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
623 # expected one template and one cflow packet
624 self.collector.get_capture(2)
626 ipfix.remove_vpp_config()
627 self.logger.info("FFP_TEST_FINISH_0001")
629 def test_L3onIP4(self):
630 """L3 data on IP4 datapath"""
631 self.logger.info("FFP_TEST_START_0002")
632 self.pg_enable_capture(self.pg_interfaces)
635 ipfix = VppCFLOW(test=self, intf="pg4", layer="l3", datapath="ip4")
636 ipfix.add_vpp_config()
638 ipfix_decoder = IPFIXDecoder()
639 # template packet should arrive immediately
640 templates = ipfix.verify_templates(ipfix_decoder, count=1)
642 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
643 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
645 # make sure the one packet we expect actually showed up
646 self.vapi.ipfix_flush()
647 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
648 self.verify_cflow_data_detail(
652 {1: "octets", 2: "packets", 8: "src_ip", 12: "dst_ip"},
655 # expected one template and one cflow packet
656 self.collector.get_capture(2)
658 ipfix.remove_vpp_config()
659 self.logger.info("FFP_TEST_FINISH_0002")
661 def test_L4onIP4(self):
662 """L4 data on IP4 datapath"""
663 self.logger.info("FFP_TEST_START_0003")
664 self.pg_enable_capture(self.pg_interfaces)
667 ipfix = VppCFLOW(test=self, intf="pg4", layer="l4", datapath="ip4")
668 ipfix.add_vpp_config()
670 ipfix_decoder = IPFIXDecoder()
671 # template packet should arrive immediately
672 templates = ipfix.verify_templates(ipfix_decoder, count=1)
674 self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
675 capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
677 # make sure the one packet we expect actually showed up
678 self.vapi.ipfix_flush()
679 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
680 self.verify_cflow_data_detail(
681 ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
684 # expected one template and one cflow packet
685 self.collector.get_capture(2)
687 ipfix.remove_vpp_config()
688 self.logger.info("FFP_TEST_FINISH_0003")
def test_templatesIP6(self):
    """verify templates on IP6 datapath"""
    self.logger.info("FFP_TEST_START_0000")
    self.pg_enable_capture(self.pg_interfaces)

    # configure the exporter and the flowprobe feature on the ip6 datapath
    ipfix = VppCFLOW(test=self, datapath="ip6")
    ipfix.add_vpp_config()

    try:
        # template packet should arrive immediately
        ipfix.verify_templates(count=1)
        # exactly one packet (the template) is expected on the collector
        self.collector.get_capture(1)
    finally:
        # undo the configuration even when a check above fails, so that a
        # failure here does not leave the exporter/flowprobe feature
        # configured for the tests that run afterwards
        ipfix.remove_vpp_config()

    self.logger.info("FFP_TEST_FINISH_0000")
706 def test_L2onIP6(self):
707 """L2 data on IP6 datapath"""
708 self.logger.info("FFP_TEST_START_0001")
709 self.pg_enable_capture(self.pg_interfaces)
712 ipfix = VppCFLOW(test=self, intf="pg6", layer="l2", datapath="ip6")
713 ipfix.add_vpp_config()
715 ipfix_decoder = IPFIXDecoder()
716 # template packet should arrive immediately
717 templates = ipfix.verify_templates(ipfix_decoder, count=1)
719 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
720 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
722 # make sure the one packet we expect actually showed up
723 self.vapi.ipfix_flush()
724 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
725 self.verify_cflow_data_detail(
726 ipfix_decoder, capture, cflow, {2: "packets", 256: 56710}, ip_ver="v6"
729 # expected one template and one cflow packet
730 self.collector.get_capture(2)
732 ipfix.remove_vpp_config()
733 self.logger.info("FFP_TEST_FINISH_0001")
735 def test_L3onIP6(self):
736 """L3 data on IP6 datapath"""
737 self.logger.info("FFP_TEST_START_0002")
738 self.pg_enable_capture(self.pg_interfaces)
741 ipfix = VppCFLOW(test=self, intf="pg6", layer="l3", datapath="ip6")
742 ipfix.add_vpp_config()
744 ipfix_decoder = IPFIXDecoder()
745 # template packet should arrive immediately
746 templates = ipfix.verify_templates(ipfix_decoder, count=1)
748 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
749 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
751 # make sure the one packet we expect actually showed up
752 self.vapi.ipfix_flush()
753 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
754 self.verify_cflow_data_detail(
758 {2: "packets", 27: "src_ip", 28: "dst_ip"},
762 # expected one template and one cflow packet
763 self.collector.get_capture(2)
765 ipfix.remove_vpp_config()
766 self.logger.info("FFP_TEST_FINISH_0002")
768 def test_L4onIP6(self):
769 """L4 data on IP6 datapath"""
770 self.logger.info("FFP_TEST_START_0003")
771 self.pg_enable_capture(self.pg_interfaces)
774 ipfix = VppCFLOW(test=self, intf="pg6", layer="l4", datapath="ip6")
775 ipfix.add_vpp_config()
777 ipfix_decoder = IPFIXDecoder()
778 # template packet should arrive immediately
779 templates = ipfix.verify_templates(ipfix_decoder, count=1)
781 self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
782 capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
784 # make sure the one packet we expect actually showed up
785 self.vapi.ipfix_flush()
786 cflow = self.wait_for_cflow_packet(self.collector, templates[0])
787 self.verify_cflow_data_detail(
791 {2: "packets", 7: "sport", 11: "dport"},
795 # expected one template and one cflow packet
796 self.collector.get_capture(2)
798 ipfix.remove_vpp_config()
799 self.logger.info("FFP_TEST_FINISH_0003")
802 """no timers, one CFLOW packet, 9 Flows inside"""
803 self.logger.info("FFP_TEST_START_0001")
804 self.pg_enable_capture(self.pg_interfaces)
807 ipfix = VppCFLOW(test=self)
808 ipfix.add_vpp_config()
810 ipfix_decoder = IPFIXDecoder()
811 # template packet should arrive immediately
812 templates = ipfix.verify_templates(ipfix_decoder)
814 self.create_stream(packets=9)
815 capture = self.send_packets()
817 # make sure the one packet we expect actually showed up
818 self.vapi.ipfix_flush()
819 cflow = self.wait_for_cflow_packet(self.collector, templates[1])
820 self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
821 self.collector.get_capture(4)
823 ipfix.remove_vpp_config()
824 self.logger.info("FFP_TEST_FINISH_0001")
827 """no timers, two CFLOW packets (mtu=256), 3 Flows in each"""
828 self.logger.info("FFP_TEST_START_0002")
829 self.pg_enable_capture(self.pg_interfaces)
832 ipfix = VppCFLOW(test=self, mtu=256)
833 ipfix.add_vpp_config()
835 ipfix_decoder = IPFIXDecoder()
836 # template packet should arrive immediately
837 self.vapi.ipfix_flush()
838 templates = ipfix.verify_templates(ipfix_decoder)
840 self.create_stream(packets=6)
841 capture = self.send_packets()
843 # make sure the one packet we expect actually showed up
845 self.vapi.ipfix_flush()
846 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
847 cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
848 self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
849 self.collector.get_capture(5)
851 ipfix.remove_vpp_config()
852 self.logger.info("FFP_TEST_FINISH_0002")
855 @unittest.skipUnless(config.extended, "part of extended tests")
856 class DisableIPFIX(MethodHolder):
861 super(DisableIPFIX, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; nothing extra is done."""
    super(DisableIPFIX, cls).tearDownClass()
868 """disable IPFIX after first packets"""
869 self.logger.info("FFP_TEST_START_0001")
870 self.pg_enable_capture(self.pg_interfaces)
873 ipfix = VppCFLOW(test=self)
874 ipfix.add_vpp_config()
876 ipfix_decoder = IPFIXDecoder()
877 # template packet should arrive immediately
878 templates = ipfix.verify_templates(ipfix_decoder)
883 # make sure the one packet we expect actually showed up
884 self.vapi.ipfix_flush()
885 self.wait_for_cflow_packet(self.collector, templates[1])
886 self.collector.get_capture(4)
889 ipfix.disable_exporter()
890 self.pg_enable_capture([self.collector])
894 # make sure no packet arrives during the short wait below
895 self.vapi.ipfix_flush()
896 self.sleep(1, "wait before verifying no packets sent")
897 self.collector.assert_nothing_captured()
899 ipfix.remove_vpp_config()
900 self.logger.info("FFP_TEST_FINISH_0001")
903 @unittest.skipUnless(config.extended, "part of extended tests")
904 class ReenableIPFIX(MethodHolder):
905 """Re-enable IPFIX"""
909 super(ReenableIPFIX, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; nothing extra is done."""
    super(ReenableIPFIX, cls).tearDownClass()
916 """disable IPFIX after first packets and re-enable after few packets"""
917 self.logger.info("FFP_TEST_START_0001")
918 self.pg_enable_capture(self.pg_interfaces)
921 ipfix = VppCFLOW(test=self)
922 ipfix.add_vpp_config()
924 ipfix_decoder = IPFIXDecoder()
925 # template packet should arrive immediately
926 templates = ipfix.verify_templates(ipfix_decoder)
928 self.create_stream(packets=5)
931 # make sure the one packet we expect actually showed up
932 self.vapi.ipfix_flush()
933 self.wait_for_cflow_packet(self.collector, templates[1])
934 self.collector.get_capture(4)
937 ipfix.disable_exporter()
938 self.vapi.ipfix_flush()
939 self.pg_enable_capture([self.collector])
943 # make sure no one packet arrived in active timer span
944 self.vapi.ipfix_flush()
945 self.sleep(1, "wait before verifying no packets sent")
946 self.collector.assert_nothing_captured()
947 self.pg2.get_capture(5)
950 ipfix.enable_exporter()
952 capture = self.collector.get_capture(4)
956 self.assertTrue(p.haslayer(IPFIX))
957 if p.haslayer(Template):
959 self.assertTrue(nr_templates, 3)
961 self.assertTrue(p.haslayer(IPFIX))
964 self.assertTrue(nr_templates, 1)
966 ipfix.remove_vpp_config()
967 self.logger.info("FFP_TEST_FINISH_0001")
970 @unittest.skipUnless(config.extended, "part of extended tests")
971 class DisableFP(MethodHolder):
972 """Disable Flowprobe feature"""
976 super(DisableFP, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; nothing extra is done."""
    super(DisableFP, cls).tearDownClass()
983 """disable flowprobe feature after first packets"""
984 self.logger.info("FFP_TEST_START_0001")
985 self.pg_enable_capture(self.pg_interfaces)
987 ipfix = VppCFLOW(test=self)
988 ipfix.add_vpp_config()
990 ipfix_decoder = IPFIXDecoder()
991 # template packet should arrive immediately
992 templates = ipfix.verify_templates(ipfix_decoder)
997 # make sure the one packet we expect actually showed up
998 self.vapi.ipfix_flush()
999 self.wait_for_cflow_packet(self.collector, templates[1])
1000 self.collector.get_capture(4)
1003 ipfix.disable_flowprobe_feature()
1004 self.pg_enable_capture([self.collector])
1008 # make sure no one packet arrived in active timer span
1009 self.vapi.ipfix_flush()
1010 self.sleep(1, "wait before verifying no packets sent")
1011 self.collector.assert_nothing_captured()
1013 ipfix.remove_vpp_config()
1014 self.logger.info("FFP_TEST_FINISH_0001")
1017 @unittest.skipUnless(config.extended, "part of extended tests")
1018 class ReenableFP(MethodHolder):
1019 """Re-enable Flowprobe feature"""
def setUpClass(cls):
    """Run the parent class's class-level setup; nothing extra is done."""
    super(ReenableFP, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; nothing extra is done."""
    super(ReenableFP, cls).tearDownClass()
1029 def test_0001(self):
1030 """disable flowprobe feature after first packets and re-enable
1031 after few packets"""
1032 self.logger.info("FFP_TEST_START_0001")
1033 self.pg_enable_capture(self.pg_interfaces)
1036 ipfix = VppCFLOW(test=self)
1037 ipfix.add_vpp_config()
1039 ipfix_decoder = IPFIXDecoder()
1040 # template packet should arrive immediately
1041 self.vapi.ipfix_flush()
1042 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1044 self.create_stream()
1047 # make sure the one packet we expect actually showed up
1048 self.vapi.ipfix_flush()
1049 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1050 self.collector.get_capture(4)
1052 # disable FPP feature
1053 ipfix.disable_flowprobe_feature()
1054 self.pg_enable_capture([self.collector])
1058 # make sure no one packet arrived in active timer span
1059 self.vapi.ipfix_flush()
1060 self.sleep(5, "wait before verifying no packets sent")
1061 self.collector.assert_nothing_captured()
1063 # enable FPP feature
1064 ipfix.enable_flowprobe_feature()
1065 self.vapi.ipfix_flush()
1066 templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1070 # make sure the next packets (templates and data) we expect actually
1072 self.vapi.ipfix_flush()
1073 self.wait_for_cflow_packet(self.collector, templates[1], 5)
1074 self.collector.get_capture(4)
1076 ipfix.remove_vpp_config()
1077 self.logger.info("FFP_TEST_FINISH_0001")
# When executed directly (not imported), run the tests in this module with
# the VPP test runner instead of unittest's default runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)