tests: replace pycodestyle with black
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8 import re
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14
15 from config import config
16 from framework import tag_fixme_vpp_workers
17 from framework import VppTestCase, VppTestRunner
18 from framework import tag_run_solo
19 from vpp_object import VppObject
20 from vpp_pg_interface import CaptureTimeoutError
21 from util import ppp
22 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
23 from vpp_ip_route import VppIpRoute, VppRoutePath
24 from vpp_papi.macaddress import mac_ntop
25 from socket import inet_ntop
26 from vpp_papi import VppEnum
27
28
class VppCFLOW(VppObject):
    """CFLOW object for IPFIX exporter and Flowprobe feature

    Bundles the IPFIX exporter settings and the flowprobe feature enablement
    for one interface so a test can apply and remove both together.
    """

    def __init__(
        self,
        test,
        intf="pg2",
        active=0,
        passive=0,
        timeout=100,
        mtu=1024,
        datapath="l2",
        layer="l2 l3 l4",
    ):
        """Store the configuration; nothing is sent to VPP here.

        :param test: test case providing ``vapi``, ``pg0`` and ``collector``
        :param intf: name of the interface the flowprobe feature is put on
        :param active: active timer handed to ``flowprobe_params``
        :param passive: passive timer; replaced by ``active + 1`` when it is
            0 or smaller than ``active``
        :param timeout: exporter ``template_interval``
        :param mtu: exporter ``path_mtu``
        :param datapath: datapath keyword for the CLI (l2, ip4 or ip6)
        :param layer: space separated list of layers to record (l2 l3 l4)
        """
        self._test = test
        self._intf = intf
        self._active = active
        # the passive timer is never left at 0 nor below the active timer
        if passive == 0 or passive < active:
            self._passive = active + 1
        else:
            self._passive = passive
        self._datapath = datapath  # l2 ip4 ip6
        self._collect = layer  # l2 l3 l4
        self._timeout = timeout
        self._mtu = mtu
        self._configured = False

    def add_vpp_config(self):
        """Enable the exporter, set flowprobe params and enable the feature."""
        self.enable_exporter()

        flag_enum = VppEnum.vl_api_flowprobe_record_flags_t
        collect = self._collect.lower()
        # OR together one record flag per layer keyword found in `layer`
        record_flags = 0
        if "l2" in collect:
            record_flags |= flag_enum.FLOWPROBE_RECORD_FLAG_L2
        if "l3" in collect:
            record_flags |= flag_enum.FLOWPROBE_RECORD_FLAG_L3
        if "l4" in collect:
            record_flags |= flag_enum.FLOWPROBE_RECORD_FLAG_L4

        self._test.vapi.flowprobe_params(
            record_flags=record_flags,
            active_timer=self._active,
            passive_timer=self._passive,
        )
        self.enable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = True

    def remove_vpp_config(self):
        """Disable the exporter and the flowprobe feature, then flush."""
        self.disable_exporter()
        self.disable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = False

    def enable_exporter(self):
        """Point the IPFIX exporter at pg0's remote address."""
        self._test.vapi.set_ipfix_exporter(
            collector_address=self._test.pg0.remote_ip4,
            src_address=self._test.pg0.local_ip4,
            path_mtu=self._mtu,
            template_interval=self._timeout,
        )

    def enable_flowprobe_feature(self):
        """Enable the flowprobe feature on the interface via the CLI."""
        self._test.vapi.ppcli(
            "flowprobe feature add-del %s %s" % (self._intf, self._datapath)
        )

    def disable_exporter(self):
        """Reset the exporter's collector address to 0.0.0.0 via the CLI."""
        self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")

    def disable_flowprobe_feature(self):
        """Disable the flowprobe feature on the interface via the CLI."""
        self._test.vapi.cli(
            "flowprobe feature add-del %s %s disable" % (self._intf, self._datapath)
        )

    def object_id(self):
        """Return an identifier built from the exporter source/collector IPs.

        The addresses are the same ones enable_exporter() hands to VPP; this
        class keeps no separate source/destination attributes.
        """
        return "ipfix-collector-%s-%s" % (
            self._test.pg0.local_ip4,
            self._test.pg0.remote_ip4,
        )

    def query_vpp_config(self):
        """Return the locally tracked state (VPP itself is not queried)."""
        return self._configured

    def verify_templates(self, decoder=None, timeout=1, count=3):
        """Wait for ``count`` template packets on the collector interface.

        :param decoder: optional IPFIXDecoder; when given, every received
            template is registered with it
        :param timeout: per-packet wait timeout
        :param count: number of packets to wait for (1, 2 or 3)
        :returns: list of received template IDs; stays empty when no decoder
            is passed
        """
        templates = []
        self._test.assertIn(count, (1, 2, 3))
        for _ in range(count):
            # templates are carried in set ID 2
            p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
            self._test.assertTrue(p.haslayer(IPFIX))
            if decoder is not None and p.haslayer(Template):
                templates.append(p[Template].templateID)
                decoder.add_template(p.getlayer(Template))
        return templates
119
120
class MethodHolder(VppTestCase):
    """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""

    # Test variables
    debug_print = False
    max_number_of_packets = 10
    pkts = []

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.
        """
        super(MethodHolder, cls).setUpClass()
        try:
            # Create pg interfaces
            cls.create_pg_interfaces(range(9))

            # Packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            # Create BD with the learn and uu_flood flags set
            # and put interfaces to this BD
            cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
            )

            # Set up all interfaces
            for i in cls.pg_interfaces:
                i.admin_up()

            # pg0 receives the exported IPFIX packets
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.collector = cls.pg0

            cls.pg1.config_ip4()
            cls.pg1.resolve_arp()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()
            cls.pg3.config_ip4()
            cls.pg3.resolve_arp()
            cls.pg4.config_ip4()
            cls.pg4.resolve_arp()
            cls.pg7.config_ip4()
            cls.pg8.config_ip4()
            cls.pg8.configure_ipv4_neighbors()

            cls.pg5.config_ip6()
            cls.pg5.resolve_ndp()
            cls.pg5.disable_ipv6_ra()
            cls.pg6.config_ip6()
            cls.pg6.resolve_ndp()
            cls.pg6.disable_ipv6_ra()
        except Exception:
            # undo the partial setup before propagating the failure
            super(MethodHolder, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(MethodHolder, cls).tearDownClass()

    def create_stream(
        self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
    ):
        """Create a packet stream to tickle the plugin

        The generated packets replace ``self.pkts``.

        :param VppInterface src_if: Source interface for packet stream
            (defaults to pg1)
        :param VppInterface dst_if: Dst interface for packet stream
            (defaults to pg2)
        :param packets: number of packets; random between 1 and
            ``max_number_of_packets`` when None
        :param size: packet size; picked at random per packet from
            ``pg_if_packet_sizes`` when None
        :param ip_ver: "v4" builds IPv4 packets, any other value builds IPv6
        """
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pkts = []
        if packets is None:
            packets = random.randint(1, self.max_number_of_packets)
        pkt_size = size
        for _ in range(packets):
            if size is None:
                pkt_size = random.choice(self.pg_if_packet_sizes)
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            pkt = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
            if ip_ver == "v4":
                pkt /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
            else:
                pkt /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
            pkt /= UDP(sport=1234, dport=4321)
            pkt /= Raw(payload)
            # keep an unpadded copy before the packet is extended
            info.data = pkt.copy()
            self.extend_packet(pkt, pkt_size)
            self.pkts.append(pkt)

    def verify_cflow_data(self, decoder, capture, cflow):
        """Check every record in ``cflow`` against the capture totals.

        Field 1 of each record is compared with the summed IPv4 length of
        the captured packets and field 2 with their count. Nothing is
        checked when ``cflow`` has no Data layer.
        """
        octets = 0
        packets = 0
        for p in capture:
            octets += p[IP].len
            packets += 1
        if cflow.haslayer(Data):
            data = decoder.decode_data_set(cflow.getlayer(Set))
            for record in data:
                self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
                self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)

    def send_packets(self, src_if=None, dst_if=None):
        """Send ``self.pkts`` on ``src_if`` and return the ``dst_if`` capture.

        :param src_if: interface to inject on (defaults to pg1)
        :param dst_if: interface to capture on (defaults to pg2)
        :returns: capture holding ``len(self.pkts)`` packets
        """
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pg_enable_capture([dst_if])
        src_if.add_stream(self.pkts)
        self.pg_start()
        return dst_if.get_capture(len(self.pkts))

    def verify_cflow_data_detail(
        self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
    ):
        """Compare selected record fields with the first captured packet.

        :param decoder: IPFIXDecoder that already knows the templates
        :param capture: captured packets; only ``capture[0]`` is inspected
        :param cflow: IPFIX packet expected to carry the data set
        :param data_set: maps a record field id to the expected value. The
            keywords "octets", "packets", "src_ip", "dst_ip", "sport" and
            "dport" are resolved from the captured packet; any other value
            is compared as is. None disables the checks. The default dict
            is only read, never modified.
        :param ip_ver: "v4" reads the IPv4 layer, anything else the IPv6 one
        """
        if self.debug_print:
            print(capture[0].show())
        if cflow.haslayer(Data):
            data = decoder.decode_data_set(cflow.getlayer(Set))
            if self.debug_print:
                print(data)
            if ip_ver == "v4":
                ip_layer = capture[0][IP]
                address_family = socket.AF_INET
            else:
                ip_layer = capture[0][IPv6]
                address_family = socket.AF_INET6
            if data_set is not None:
                for record in data:
                    # skip flow if ingress/egress interface is 0
                    if int(binascii.hexlify(record[10]), 16) == 0:
                        continue
                    if int(binascii.hexlify(record[14]), 16) == 0:
                        continue

                    for field in data_set:
                        if field not in record:
                            continue
                        value = data_set[field]
                        if value == "octets":
                            if ip_ver == "v4":
                                value = ip_layer.len
                            else:
                                # The IPv6 header has no total-length field;
                                # use the payload length plus the fixed
                                # 40-byte header instead of relying on
                                # attribute lookup reaching the UDP 'len'.
                                value = ip_layer.plen + 40
                        elif value == "packets":
                            value = 1
                        elif value == "src_ip":
                            ip = socket.inet_pton(address_family, ip_layer.src)
                            value = int(binascii.hexlify(ip), 16)
                        elif value == "dst_ip":
                            ip = socket.inet_pton(address_family, ip_layer.dst)
                            value = int(binascii.hexlify(ip), 16)
                        elif value == "sport":
                            value = int(capture[0][UDP].sport)
                        elif value == "dport":
                            value = int(capture[0][UDP].dport)
                        self.assertEqual(
                            int(binascii.hexlify(record[field]), 16), value
                        )

    def verify_cflow_data_notimer(self, decoder, capture, cflows):
        """Match the records of all ``cflows`` one-to-one with ``capture``.

        Records are consumed in order: the n-th record overall must report
        the IPv4 length of the n-th captured packet (field 1) and a packet
        count of 1 (field 2). The number of records must equal the number
        of captured packets.

        :raises Exception: when one of the cflow packets has no Data layer
        """
        idx = 0
        for cflow in cflows:
            if cflow.haslayer(Data):
                data = decoder.decode_data_set(cflow.getlayer(Set))
            else:
                raise Exception("No CFLOW data")

            for rec in data:
                p = capture[idx]
                idx += 1
                self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
                self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
        self.assertEqual(len(capture), idx)

    def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
        """wait for CFLOW packet and verify its correctness

        :param collector_intf: interface the packet is expected on
        :param set_id: set ID the received packet must carry
        :param timeout: how long to wait
        :returns: the received packet

        """
        self.logger.info("IPFIX: Waiting for CFLOW packet")
        # self.logger.debug(self.vapi.ppcli("show flow table"))
        p = collector_intf.wait_for_packet(timeout=timeout)
        self.assertEqual(p[Set].setID, set_id)
        # self.logger.debug(self.vapi.ppcli("show flow table"))
        self.logger.debug(ppp("IPFIX: Got packet:", p))
        return p
321
322
@tag_run_solo
@tag_fixme_vpp_workers
class Flowprobe(MethodHolder):
    """Template verification, timer tests"""

    @classmethod
    def setUpClass(cls):
        super(Flowprobe, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(Flowprobe, cls).tearDownClass()

    def test_0001(self):
        """timer less than template timeout"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(test=self, active=2)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=1)
        # send_packets() already returns the pg2 capture
        capture = self.send_packets()

        # make sure the one packet we expect actually showed up
        cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
        self.verify_cflow_data(ipfix_decoder, capture, cflow)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_0002(self):
        """timer greater than template timeout"""
        self.logger.info("FFP_TEST_START_0002")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # template interval (3) is shorter than the active timer (4)
        ipfix = VppCFLOW(test=self, timeout=3, active=4)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        ipfix.verify_templates()

        self.create_stream(packets=2)
        # send_packets() already returns the pg2 capture
        capture = self.send_packets()

        # the next round of template packets is due once the template
        # interval configured above elapses; allow up to 5 s for each one
        templates = ipfix.verify_templates(ipfix_decoder, timeout=5)

        # make sure the data packet covering both sent packets showed up
        cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
        self.verify_cflow_data(ipfix_decoder, capture, cflow)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")

    def test_cflow_packet(self):
        """verify cflow packet fields"""
        self.logger.info("FFP_TEST_START_0000")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(
            test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
        )
        ipfix.add_vpp_config()

        # route the test destination out of pg8, where flowprobe is enabled
        route_9001 = VppIpRoute(
            self,
            "9.0.0.0",
            24,
            [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
        )
        route_9001.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        templates = ipfix.verify_templates(ipfix_decoder, count=1)

        self.pkts = [
            (
                Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
                / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
                / TCP(sport=1234, dport=4321, flags=80)
                / Raw(b"\xa5" * 100)
            )
        ]

        # time.time() counts from the Unix epoch (1970); the flow timestamps
        # checked below are compared against seconds since 1900, which is
        # 2208988800 seconds more.
        now_unix = int(time.time())
        now_ntp = now_unix + 2208988800
        self.send_packets(src_if=self.pg7, dst_if=self.pg8)

        cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
        self.collector.get_capture(2)

        # Fail loudly instead of silently skipping the checks when a layer
        # is missing from the exported packet.
        self.assertTrue(cflow.haslayer(IPFIX), "exported packet has no IPFIX layer")
        self.assertEqual(cflow[IPFIX].version, 10)
        self.assertEqual(cflow[IPFIX].observationDomainID, 1)
        self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
        self.assertAlmostEqual(cflow[IPFIX].exportTime, now_unix, delta=5)

        self.assertTrue(cflow.haslayer(Data), "exported packet has no Data layer")
        record = ipfix_decoder.decode_data_set(cflow.getlayer(Set))[0]
        # ingress interface
        self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
        # egress interface
        self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
        # packets
        self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
        # src mac
        self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
        # dst mac
        self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
        # flow start timestamp (upper 32 bits hold the seconds)
        flow_start = int(binascii.hexlify(record[156]), 16) >> 32
        self.assertAlmostEqual(flow_start, now_ntp, delta=1)
        # flow end timestamp (upper 32 bits hold the seconds)
        flow_end = int(binascii.hexlify(record[157]), 16) >> 32
        self.assertAlmostEqual(flow_end, now_ntp, delta=1)
        # ethernet type
        self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
        # src ip
        self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
        # dst ip
        self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
        # protocol (TCP)
        self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
        # src port
        self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
        # dst port
        self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
        # tcp flags
        self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0000")
466
467
@tag_fixme_vpp_workers
class Datapath(MethodHolder):
    """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""

    @classmethod
    def setUpClass(cls):
        super(Datapath, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(Datapath, cls).tearDownClass()

    def _verify_single_flow(
        self,
        tag,
        layer,
        data_set,
        intf="pg2",
        datapath="l2",
        src_if=None,
        dst_if=None,
        ip_ver="v4",
        template_count=1,
    ):
        """Send one packet through a flowprobe-enabled path and check the flow.

        Shared body of the "<layer> data on <datapath> datapath" tests.

        :param tag: suffix of the FFP_TEST_START_/FFP_TEST_FINISH_ log lines
        :param layer: layers flowprobe should record
        :param data_set: record fields to verify, in the form accepted by
            verify_cflow_data_detail()
        :param intf: name of the interface flowprobe is enabled on
        :param datapath: datapath keyword (l2, ip4 or ip6)
        :param src_if: interface the packet is injected on (None: pg1)
        :param dst_if: interface the packet is captured on (None: pg2)
        :param ip_ver: "v4" or "v6", used for both the stream and the checks
        :param template_count: number of template packets expected first
        """
        self.logger.info("FFP_TEST_START_%s" % tag)
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(test=self, intf=intf, layer=layer, datapath=datapath)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder, count=template_count)

        self.create_stream(src_if=src_if, dst_if=dst_if, packets=1, ip_ver=ip_ver)
        capture = self.send_packets(src_if=src_if, dst_if=dst_if)

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        cflow = self.wait_for_cflow_packet(self.collector, templates[0])
        self.verify_cflow_data_detail(
            ipfix_decoder, capture, cflow, data_set, ip_ver=ip_ver
        )

        # expected: the template packet(s) plus one cflow packet
        self.collector.get_capture(template_count + 1)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_%s" % tag)

    def test_templatesL2(self):
        """verify template on L2 datapath"""
        self.logger.info("FFP_TEST_START_0000")
        self.pg_enable_capture(self.pg_interfaces)

        ipfix = VppCFLOW(test=self, layer="l2")
        ipfix.add_vpp_config()

        # template packet should arrive immediately
        self.vapi.ipfix_flush()
        ipfix.verify_templates(timeout=3, count=1)
        self.collector.get_capture(1)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0000")

    def test_L2onL2(self):
        """L2 data on L2 datapath"""
        self._verify_single_flow("0001", "l2", {2: "packets", 256: 8})

    def test_L3onL2(self):
        """L3 data on L2 datapath"""
        self._verify_single_flow(
            "0002",
            "l3",
            {2: "packets", 4: 17, 8: "src_ip", 12: "dst_ip"},
            template_count=2,
        )

    def test_L4onL2(self):
        """L4 data on L2 datapath"""
        self._verify_single_flow(
            "0003",
            "l4",
            {2: "packets", 7: "sport", 11: "dport"},
            template_count=2,
        )

    def test_templatesIp4(self):
        """verify templates on IP4 datapath"""
        self.logger.info("FFP_TEST_START_0000")

        self.pg_enable_capture(self.pg_interfaces)

        ipfix = VppCFLOW(test=self, datapath="ip4")
        ipfix.add_vpp_config()

        # template packet should arrive immediately
        self.vapi.ipfix_flush()
        ipfix.verify_templates(timeout=3, count=1)
        self.collector.get_capture(1)

        ipfix.remove_vpp_config()

        self.logger.info("FFP_TEST_FINISH_0000")

    def test_L2onIP4(self):
        """L2 data on IP4 datapath"""
        self._verify_single_flow(
            "0001",
            "l2",
            {2: "packets", 256: 8},
            intf="pg4",
            datapath="ip4",
            src_if=self.pg3,
            dst_if=self.pg4,
        )

    def test_L3onIP4(self):
        """L3 data on IP4 datapath"""
        self._verify_single_flow(
            "0002",
            "l3",
            {1: "octets", 2: "packets", 8: "src_ip", 12: "dst_ip"},
            intf="pg4",
            datapath="ip4",
            src_if=self.pg3,
            dst_if=self.pg4,
        )

    def test_L4onIP4(self):
        """L4 data on IP4 datapath"""
        self._verify_single_flow(
            "0003",
            "l4",
            {2: "packets", 7: "sport", 11: "dport"},
            intf="pg4",
            datapath="ip4",
            src_if=self.pg3,
            dst_if=self.pg4,
        )

    def test_templatesIP6(self):
        """verify templates on IP6 datapath"""
        self.logger.info("FFP_TEST_START_0000")
        self.pg_enable_capture(self.pg_interfaces)

        ipfix = VppCFLOW(test=self, datapath="ip6")
        ipfix.add_vpp_config()

        # template packet should arrive immediately
        ipfix.verify_templates(count=1)
        self.collector.get_capture(1)

        ipfix.remove_vpp_config()

        self.logger.info("FFP_TEST_FINISH_0000")

    def test_L2onIP6(self):
        """L2 data on IP6 datapath"""
        self._verify_single_flow(
            "0001",
            "l2",
            {2: "packets", 256: 56710},
            intf="pg6",
            datapath="ip6",
            src_if=self.pg5,
            dst_if=self.pg6,
            ip_ver="v6",
        )

    def test_L3onIP6(self):
        """L3 data on IP6 datapath"""
        self._verify_single_flow(
            "0002",
            "l3",
            {2: "packets", 27: "src_ip", 28: "dst_ip"},
            intf="pg6",
            datapath="ip6",
            src_if=self.pg5,
            dst_if=self.pg6,
            ip_ver="v6",
        )

    def test_L4onIP6(self):
        """L4 data on IP6 datapath"""
        self._verify_single_flow(
            "0003",
            "l4",
            {2: "packets", 7: "sport", 11: "dport"},
            intf="pg6",
            datapath="ip6",
            src_if=self.pg5,
            dst_if=self.pg6,
            ip_ver="v6",
        )

    def test_0001(self):
        """no timers, one CFLOW packet, 9 Flows inside"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(test=self)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=9)
        capture = self.send_packets()

        # make sure the one cflow packet we expect actually showed up
        self.vapi.ipfix_flush()
        cflow = self.wait_for_cflow_packet(self.collector, templates[1])
        self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
        # three templates plus one cflow packet
        self.collector.get_capture(4)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_0002(self):
        """no timers, two CFLOW packets (mtu=256), 3 Flows in each"""
        self.logger.info("FFP_TEST_START_0002")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(test=self, mtu=256)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        self.vapi.ipfix_flush()
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=6)
        capture = self.send_packets()

        # make sure the two cflow packets we expect actually showed up
        cflows = []
        self.vapi.ipfix_flush()
        cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
        cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
        self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
        # three templates plus two cflow packets
        self.collector.get_capture(5)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")
853
854
# Only runs when the extended test set is enabled (config.extended).
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableIPFIX(MethodHolder):
    """Disable IPFIX"""

    @classmethod
    def setUpClass(cls):
        # No extra class-level setup; defer entirely to the parent.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No extra class-level teardown; defer entirely to the parent.
        super().tearDownClass()

    def test_0001(self):
        """disable IPFIX after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # VppCFLOW is created with its defaults (see VppCFLOW.__init__).
        exporter = VppCFLOW(test=self)
        exporter.add_vpp_config()

        # The template packets are expected as soon as the config is applied.
        decoder = IPFIXDecoder()
        tmpls = exporter.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # Phase 1: with the exporter on, a data packet must reach the
        # collector after the flush.
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1])
        self.collector.get_capture(4)

        # Phase 2: turn the exporter off and re-arm capture on the
        # collector only.
        exporter.disable_exporter()
        collector_only = [self.collector]
        self.pg_enable_capture(collector_only)

        self.send_packets()

        # Flush, give it one second, and confirm the collector saw nothing.
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        exporter.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
901
902
# Only runs when the extended test set is enabled (config.extended).
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableIPFIX(MethodHolder):
    """Re-enable IPFIX"""

    @classmethod
    def setUpClass(cls):
        # No extra class-level setup; defer entirely to the parent.
        super(ReenableIPFIX, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No extra class-level teardown; defer entirely to the parent.
        super(ReenableIPFIX, cls).tearDownClass()

    def test_0011(self):
        """disable IPFIX after first packets and re-enable after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # VppCFLOW is created with its defaults (see VppCFLOW.__init__).
        ipfix = VppCFLOW(test=self)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=5)
        self.send_packets()

        # make sure the one data packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1])
        self.collector.get_capture(4)

        # disable IPFIX
        ipfix.disable_exporter()
        self.vapi.ipfix_flush()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # with the exporter off nothing may reach the collector, while the
        # five stream packets are still expected on pg2
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()
        self.pg2.get_capture(5)

        # enable IPFIX
        ipfix.enable_exporter()

        # Classify the four packets captured after re-enabling. Every one
        # must be IPFIX; count the template and data packets in one pass.
        capture = self.collector.get_capture(4)
        nr_templates = 0
        nr_data = 0
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                nr_templates += 1
            if p.haslayer(Data):
                nr_data += 1
        # assertEqual, not assertTrue: assertTrue(x, 3) treats 3 as the
        # failure message and only checks that x is truthy. The data count
        # is checked against nr_data, which the original never verified.
        self.assertEqual(nr_templates, 3)
        self.assertEqual(nr_data, 1)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
968
969
# Only runs when the extended test set is enabled (config.extended).
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableFP(MethodHolder):
    """Disable Flowprobe feature"""

    @classmethod
    def setUpClass(cls):
        # No extra class-level setup; defer entirely to the parent.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No extra class-level teardown; defer entirely to the parent.
        super().tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # VppCFLOW is created with its defaults (see VppCFLOW.__init__).
        exporter = VppCFLOW(test=self)
        exporter.add_vpp_config()

        # The template packets are expected as soon as the config is applied.
        decoder = IPFIXDecoder()
        tmpls = exporter.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # Phase 1: with the feature on, a data packet must reach the
        # collector after the flush.
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1])
        self.collector.get_capture(4)

        # Phase 2: switch the flowprobe feature off and re-arm capture on
        # the collector only.
        exporter.disable_flowprobe_feature()
        collector_only = [self.collector]
        self.pg_enable_capture(collector_only)

        self.send_packets()

        # Flush, give it one second, and confirm the collector saw nothing.
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        exporter.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1015
1016
# Only runs when the extended test set is enabled (config.extended).
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableFP(MethodHolder):
    """Re-enable Flowprobe feature"""

    @classmethod
    def setUpClass(cls):
        # No extra class-level setup; defer entirely to the parent.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No extra class-level teardown; defer entirely to the parent.
        super().tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets and re-enable
        after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # VppCFLOW is created with its defaults (see VppCFLOW.__init__).
        exporter = VppCFLOW(test=self)
        exporter.add_vpp_config()

        # Flush, then check the template packets with timeout=3.
        decoder = IPFIXDecoder()
        self.vapi.ipfix_flush()
        tmpls = exporter.verify_templates(decoder, timeout=3)

        self.create_stream()
        self.send_packets()

        # Phase 1: with the feature on, a data packet must reach the
        # collector after the flush (the trailing 5 is presumably a
        # timeout - TODO confirm against wait_for_cflow_packet).
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1], 5)
        self.collector.get_capture(4)

        # Phase 2: switch the flowprobe feature off and re-arm capture on
        # the collector only.
        exporter.disable_flowprobe_feature()
        collector_only = [self.collector]
        self.pg_enable_capture(collector_only)

        self.send_packets()

        # Flush, give it five seconds, and confirm the collector saw nothing.
        self.vapi.ipfix_flush()
        self.sleep(5, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        # Phase 3: switch the feature back on and verify the templates again.
        exporter.enable_flowprobe_feature()
        self.vapi.ipfix_flush()
        tmpls_after = exporter.verify_templates(decoder, timeout=3)

        self.send_packets()

        # Templates and data are both expected on the collector again.
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls_after[1], 5)
        self.collector.get_capture(4)

        exporter.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1078
1079
# When executed as a script, run this module's tests via unittest using the
# VppTestRunner imported from the framework module.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)