vppinfra: fix AddressSanitizer
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8 import re
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14
15 from config import config
16 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
17 from framework import is_distro_ubuntu2204, is_distro_debian11
18 from framework import VppTestCase, VppTestRunner
19 from framework import tag_run_solo
20 from vpp_object import VppObject
21 from vpp_pg_interface import CaptureTimeoutError
22 from util import ppp
23 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
24 from vpp_ip_route import VppIpRoute, VppRoutePath
25 from vpp_papi.macaddress import mac_ntop
26 from socket import inet_ntop
27 from vpp_papi import VppEnum
28
29
30 class VppCFLOW(VppObject):
31     """CFLOW object for IPFIX exporter and Flowprobe feature"""
32
33     def __init__(
34         self,
35         test,
36         intf="pg2",
37         active=0,
38         passive=0,
39         timeout=100,
40         mtu=1024,
41         datapath="l2",
42         layer="l2 l3 l4",
43         direction="tx",
44     ):
45         self._test = test
46         self._intf = intf
47         self._intf_obj = getattr(self._test, intf)
48         self._active = active
49         if passive == 0 or passive < active:
50             self._passive = active + 1
51         else:
52             self._passive = passive
53         self._datapath = datapath  # l2 ip4 ip6
54         self._collect = layer  # l2 l3 l4
55         self._direction = direction  # rx tx both
56         self._timeout = timeout
57         self._mtu = mtu
58         self._configured = False
59
60     def add_vpp_config(self):
61         self.enable_exporter()
62         l2_flag = 0
63         l3_flag = 0
64         l4_flag = 0
65         if "l2" in self._collect.lower():
66             l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
67         if "l3" in self._collect.lower():
68             l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
69         if "l4" in self._collect.lower():
70             l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
71         self._test.vapi.flowprobe_set_params(
72             record_flags=(l2_flag | l3_flag | l4_flag),
73             active_timer=self._active,
74             passive_timer=self._passive,
75         )
76         self.enable_flowprobe_feature()
77         self._test.vapi.cli("ipfix flush")
78         self._configured = True
79
80     def remove_vpp_config(self):
81         self.disable_exporter()
82         self.disable_flowprobe_feature()
83         self._test.vapi.cli("ipfix flush")
84         self._configured = False
85
86     def enable_exporter(self):
87         self._test.vapi.set_ipfix_exporter(
88             collector_address=self._test.pg0.remote_ip4,
89             src_address=self._test.pg0.local_ip4,
90             path_mtu=self._mtu,
91             template_interval=self._timeout,
92         )
93
94     def _enable_disable_flowprobe_feature(self, is_add):
95         which_map = {
96             "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
97             "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
98             "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
99         }
100         direction_map = {
101             "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
102             "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
103             "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
104         }
105         self._test.vapi.flowprobe_interface_add_del(
106             is_add=is_add,
107             which=which_map[self._datapath],
108             direction=direction_map[self._direction],
109             sw_if_index=self._intf_obj.sw_if_index,
110         )
111
112     def enable_flowprobe_feature(self):
113         self._enable_disable_flowprobe_feature(is_add=True)
114
115     def disable_exporter(self):
116         self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
117
118     def disable_flowprobe_feature(self):
119         self._enable_disable_flowprobe_feature(is_add=False)
120
121     def object_id(self):
122         return "ipfix-collector-%s-%s" % (self._src, self.dst)
123
124     def query_vpp_config(self):
125         return self._configured
126
127     def verify_templates(self, decoder=None, timeout=1, count=3):
128         templates = []
129         self._test.assertIn(count, (1, 2, 3))
130         for _ in range(count):
131             p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
132             self._test.assertTrue(p.haslayer(IPFIX))
133             if decoder is not None and p.haslayer(Template):
134                 templates.append(p[Template].templateID)
135                 decoder.add_template(p.getlayer(Template))
136         return templates
137
138
139 class MethodHolder(VppTestCase):
140     """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
141
142     # Test variables
143     debug_print = False
144     max_number_of_packets = 10
145     pkts = []
146
147     @classmethod
148     def setUpClass(cls):
149         """
150         Perform standard class setup (defined by class method setUpClass in
151         class VppTestCase) before running the test case, set test case related
152         variables and configure VPP.
153         """
154         super(MethodHolder, cls).setUpClass()
155         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
156             cls, "vpp"
157         ):
158             return
159         try:
160             # Create pg interfaces
161             cls.create_pg_interfaces(range(9))
162
163             # Packet sizes
164             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
165
166             # Create BD with MAC learning and unknown unicast flooding disabled
167             # and put interfaces to this BD
168             cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
169             cls.vapi.sw_interface_set_l2_bridge(
170                 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
171             )
172             cls.vapi.sw_interface_set_l2_bridge(
173                 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
174             )
175
176             # Set up all interfaces
177             for i in cls.pg_interfaces:
178                 i.admin_up()
179
180             cls.pg0.config_ip4()
181             cls.pg0.configure_ipv4_neighbors()
182             cls.collector = cls.pg0
183
184             cls.pg1.config_ip4()
185             cls.pg1.resolve_arp()
186             cls.pg2.config_ip4()
187             cls.pg2.resolve_arp()
188             cls.pg3.config_ip4()
189             cls.pg3.resolve_arp()
190             cls.pg4.config_ip4()
191             cls.pg4.resolve_arp()
192             cls.pg7.config_ip4()
193             cls.pg8.config_ip4()
194             cls.pg8.configure_ipv4_neighbors()
195
196             cls.pg5.config_ip6()
197             cls.pg5.resolve_ndp()
198             cls.pg5.disable_ipv6_ra()
199             cls.pg6.config_ip6()
200             cls.pg6.resolve_ndp()
201             cls.pg6.disable_ipv6_ra()
202         except Exception:
203             super(MethodHolder, cls).tearDownClass()
204             raise
205
206     @classmethod
207     def tearDownClass(cls):
208         super(MethodHolder, cls).tearDownClass()
209
210     def create_stream(
211         self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
212     ):
213         """Create a packet stream to tickle the plugin
214
215         :param VppInterface src_if: Source interface for packet stream
216         :param VppInterface src_if: Dst interface for packet stream
217         """
218         if src_if is None:
219             src_if = self.pg1
220         if dst_if is None:
221             dst_if = self.pg2
222         self.pkts = []
223         if packets is None:
224             packets = random.randint(1, self.max_number_of_packets)
225         pkt_size = size
226         for p in range(0, packets):
227             if size is None:
228                 pkt_size = random.choice(self.pg_if_packet_sizes)
229             info = self.create_packet_info(src_if, dst_if)
230             payload = self.info_to_payload(info)
231             p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
232             if ip_ver == "v4":
233                 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
234             else:
235                 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
236             p /= UDP(sport=1234, dport=4321)
237             p /= Raw(payload)
238             info.data = p.copy()
239             self.extend_packet(p, pkt_size)
240             self.pkts.append(p)
241
242     def verify_cflow_data(self, decoder, capture, cflow):
243         octets = 0
244         packets = 0
245         for p in capture:
246             octets += p[IP].len
247             packets += 1
248         if cflow.haslayer(Data):
249             data = decoder.decode_data_set(cflow.getlayer(Set))
250             for record in data:
251                 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
252                 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
253
254     def send_packets(self, src_if=None, dst_if=None):
255         if src_if is None:
256             src_if = self.pg1
257         if dst_if is None:
258             dst_if = self.pg2
259         self.pg_enable_capture([dst_if])
260         src_if.add_stream(self.pkts)
261         self.pg_start()
262         return dst_if.get_capture(len(self.pkts))
263
264     def verify_cflow_data_detail(
265         self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
266     ):
267         if self.debug_print:
268             print(capture[0].show())
269         if cflow.haslayer(Data):
270             data = decoder.decode_data_set(cflow.getlayer(Set))
271             if self.debug_print:
272                 print(data)
273             if ip_ver == "v4":
274                 ip_layer = capture[0][IP]
275             else:
276                 ip_layer = capture[0][IPv6]
277             if data_set is not None:
278                 for record in data:
279                     # skip flow if ingress/egress interface is 0
280                     if int(binascii.hexlify(record[10]), 16) == 0:
281                         continue
282                     if int(binascii.hexlify(record[14]), 16) == 0:
283                         continue
284
285                     for field in data_set:
286                         value = data_set[field]
287                         if value == "octets":
288                             value = ip_layer.len
289                             if ip_ver == "v6":
290                                 value += 40  # ??? is this correct
291                         elif value == "packets":
292                             value = 1
293                         elif value == "src_ip":
294                             if ip_ver == "v4":
295                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
296                             else:
297                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
298                             value = int(binascii.hexlify(ip), 16)
299                         elif value == "dst_ip":
300                             if ip_ver == "v4":
301                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
302                             else:
303                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
304                             value = int(binascii.hexlify(ip), 16)
305                         elif value == "sport":
306                             value = int(capture[0][UDP].sport)
307                         elif value == "dport":
308                             value = int(capture[0][UDP].dport)
309                         self.assertEqual(
310                             int(binascii.hexlify(record[field]), 16), value
311                         )
312
313     def verify_cflow_data_notimer(self, decoder, capture, cflows):
314         idx = 0
315         for cflow in cflows:
316             if cflow.haslayer(Data):
317                 data = decoder.decode_data_set(cflow.getlayer(Set))
318             else:
319                 raise Exception("No CFLOW data")
320
321             for rec in data:
322                 p = capture[idx]
323                 idx += 1
324                 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
325                 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
326         self.assertEqual(len(capture), idx)
327
328     def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
329         """wait for CFLOW packet and verify its correctness
330
331         :param timeout: how long to wait
332
333         """
334         self.logger.info("IPFIX: Waiting for CFLOW packet")
335         # self.logger.debug(self.vapi.ppcli("show flow table"))
336         p = collector_intf.wait_for_packet(timeout=timeout)
337         self.assertEqual(p[Set].setID, set_id)
338         # self.logger.debug(self.vapi.ppcli("show flow table"))
339         self.logger.debug(ppp("IPFIX: Got packet:", p))
340         return p
341
342
343 @tag_run_solo
344 @tag_fixme_vpp_workers
345 @tag_fixme_ubuntu2204
346 @tag_fixme_debian11
347 class Flowprobe(MethodHolder):
348     """Template verification, timer tests"""
349
350     @classmethod
351     def setUpClass(cls):
352         super(Flowprobe, cls).setUpClass()
353
354     @classmethod
355     def tearDownClass(cls):
356         super(Flowprobe, cls).tearDownClass()
357
358     def test_0001(self):
359         """timer less than template timeout"""
360         self.logger.info("FFP_TEST_START_0001")
361         self.pg_enable_capture(self.pg_interfaces)
362         self.pkts = []
363
364         ipfix = VppCFLOW(test=self, active=2)
365         ipfix.add_vpp_config()
366
367         ipfix_decoder = IPFIXDecoder()
368         # template packet should arrive immediately
369         templates = ipfix.verify_templates(ipfix_decoder)
370
371         self.create_stream(packets=1)
372         self.send_packets()
373         capture = self.pg2.get_capture(1)
374
375         # make sure the one packet we expect actually showed up
376         cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
377         self.verify_cflow_data(ipfix_decoder, capture, cflow)
378
379         ipfix.remove_vpp_config()
380         self.logger.info("FFP_TEST_FINISH_0001")
381
382     def test_0002(self):
383         """timer greater than template timeout"""
384         self.logger.info("FFP_TEST_START_0002")
385         self.pg_enable_capture(self.pg_interfaces)
386         self.pkts = []
387
388         ipfix = VppCFLOW(test=self, timeout=3, active=4)
389         ipfix.add_vpp_config()
390
391         ipfix_decoder = IPFIXDecoder()
392         # template packet should arrive immediately
393         ipfix.verify_templates()
394
395         self.create_stream(packets=2)
396         self.send_packets()
397         capture = self.pg2.get_capture(2)
398
399         # next set of template packet should arrive after 20 seconds
400         # template packet should arrive within 20 s
401         templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
402
403         # make sure the one packet we expect actually showed up
404         cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
405         self.verify_cflow_data(ipfix_decoder, capture, cflow)
406
407         ipfix.remove_vpp_config()
408         self.logger.info("FFP_TEST_FINISH_0002")
409
410     def test_cflow_packet(self):
411         """verify cflow packet fields"""
412         self.logger.info("FFP_TEST_START_0000")
413         self.pg_enable_capture(self.pg_interfaces)
414         self.pkts = []
415
416         ipfix = VppCFLOW(
417             test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
418         )
419         ipfix.add_vpp_config()
420
421         route_9001 = VppIpRoute(
422             self,
423             "9.0.0.0",
424             24,
425             [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
426         )
427         route_9001.add_vpp_config()
428
429         ipfix_decoder = IPFIXDecoder()
430         templates = ipfix.verify_templates(ipfix_decoder, count=1)
431
432         self.pkts = [
433             (
434                 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
435                 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
436                 / TCP(sport=1234, dport=4321, flags=80)
437                 / Raw(b"\xa5" * 100)
438             )
439         ]
440
441         nowUTC = int(time.time())
442         nowUNIX = nowUTC + 2208988800
443         self.send_packets(src_if=self.pg7, dst_if=self.pg8)
444
445         cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
446         self.collector.get_capture(2)
447
448         if cflow[0].haslayer(IPFIX):
449             self.assertEqual(cflow[IPFIX].version, 10)
450             self.assertEqual(cflow[IPFIX].observationDomainID, 1)
451             self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
452             self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
453         if cflow.haslayer(Data):
454             record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
455             # ingress interface
456             self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
457             # egress interface
458             self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
459             # direction
460             self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
461             # packets
462             self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
463             # src mac
464             self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
465             # dst mac
466             self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
467             flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
468             # flow start timestamp
469             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
470             flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
471             # flow end timestamp
472             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
473             # ethernet type
474             self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
475             # src ip
476             self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
477             # dst ip
478             self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
479             # protocol (TCP)
480             self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
481             # src port
482             self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
483             # dst port
484             self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
485             # tcp flags
486             self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
487
488         ipfix.remove_vpp_config()
489         self.logger.info("FFP_TEST_FINISH_0000")
490
491     def test_interface_dump(self):
492         """Dump interfaces with IPFIX flow record generation enabled"""
493         self.logger.info("FFP_TEST_START_0003")
494
495         # Enable feature for 3 interfaces
496         ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
497         ipfix1.add_vpp_config()
498
499         ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
500         ipfix2.enable_flowprobe_feature()
501
502         ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
503         ipfix3.enable_flowprobe_feature()
504
505         # When request "all", dump should contain all enabled interfaces
506         dump = self.vapi.flowprobe_interface_dump()
507         self.assertEqual(len(dump), 3)
508
509         # Verify 1st interface
510         self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
511         self.assertEqual(
512             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
513         )
514         self.assertEqual(
515             dump[0].direction,
516             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
517         )
518
519         # Verify 2nd interface
520         self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
521         self.assertEqual(
522             dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
523         )
524         self.assertEqual(
525             dump[1].direction,
526             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
527         )
528
529         # Verify 3rd interface
530         self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
531         self.assertEqual(
532             dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
533         )
534         self.assertEqual(
535             dump[2].direction,
536             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
537         )
538
539         # When request 2nd interface, dump should contain only the specified interface
540         dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
541         self.assertEqual(len(dump), 1)
542
543         # Verify 2nd interface
544         self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
545         self.assertEqual(
546             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
547         )
548         self.assertEqual(
549             dump[0].direction,
550             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
551         )
552
553         # When request 99th interface, dump should be empty
554         dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
555         self.assertEqual(len(dump), 0)
556
557         ipfix1.remove_vpp_config()
558         ipfix2.remove_vpp_config()
559         ipfix3.remove_vpp_config()
560         self.logger.info("FFP_TEST_FINISH_0003")
561
562     def test_get_params(self):
563         """Get IPFIX flow record generation parameters"""
564         self.logger.info("FFP_TEST_START_0004")
565
566         # Enable feature for an interface with custom parameters
567         ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
568         ipfix.add_vpp_config()
569
570         # Get and verify parameters
571         params = self.vapi.flowprobe_get_params()
572         self.assertEqual(params.active_timer, 20)
573         self.assertEqual(params.passive_timer, 40)
574         record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
575         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
576         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
577         self.assertEqual(params.record_flags, record_flags)
578
579         ipfix.remove_vpp_config()
580         self.logger.info("FFP_TEST_FINISH_0004")
581
582
583 class DatapathTestsHolder(object):
584     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
585
586     @classmethod
587     def setUpClass(cls):
588         super(DatapathTestsHolder, cls).setUpClass()
589
590     @classmethod
591     def tearDownClass(cls):
592         super(DatapathTestsHolder, cls).tearDownClass()
593
594     def test_templatesL2(self):
595         """verify template on L2 datapath"""
596         self.logger.info("FFP_TEST_START_0000")
597         self.pg_enable_capture(self.pg_interfaces)
598
599         ipfix = VppCFLOW(
600             test=self, intf=self.intf1, layer="l2", direction=self.direction
601         )
602         ipfix.add_vpp_config()
603
604         # template packet should arrive immediately
605         self.vapi.ipfix_flush()
606         ipfix.verify_templates(timeout=3, count=1)
607         self.collector.get_capture(1)
608
609         ipfix.remove_vpp_config()
610         self.logger.info("FFP_TEST_FINISH_0000")
611
612     def test_L2onL2(self):
613         """L2 data on L2 datapath"""
614         self.logger.info("FFP_TEST_START_0001")
615         self.pg_enable_capture(self.pg_interfaces)
616         self.pkts = []
617
618         ipfix = VppCFLOW(
619             test=self, intf=self.intf1, layer="l2", direction=self.direction
620         )
621         ipfix.add_vpp_config()
622
623         ipfix_decoder = IPFIXDecoder()
624         # template packet should arrive immediately
625         templates = ipfix.verify_templates(ipfix_decoder, count=1)
626
627         self.create_stream(packets=1)
628         capture = self.send_packets()
629
630         # make sure the one packet we expect actually showed up
631         self.vapi.ipfix_flush()
632         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
633         self.verify_cflow_data_detail(
634             ipfix_decoder,
635             capture,
636             cflow,
637             {2: "packets", 256: 8, 61: (self.direction == "tx")},
638         )
639         self.collector.get_capture(2)
640
641         ipfix.remove_vpp_config()
642         self.logger.info("FFP_TEST_FINISH_0001")
643
644     def test_L3onL2(self):
645         """L3 data on L2 datapath"""
646         self.logger.info("FFP_TEST_START_0002")
647         self.pg_enable_capture(self.pg_interfaces)
648         self.pkts = []
649
650         ipfix = VppCFLOW(
651             test=self, intf=self.intf1, layer="l3", direction=self.direction
652         )
653         ipfix.add_vpp_config()
654
655         ipfix_decoder = IPFIXDecoder()
656         # template packet should arrive immediately
657         templates = ipfix.verify_templates(ipfix_decoder, count=2)
658
659         self.create_stream(packets=1)
660         capture = self.send_packets()
661
662         # make sure the one packet we expect actually showed up
663         self.vapi.ipfix_flush()
664         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
665         self.verify_cflow_data_detail(
666             ipfix_decoder,
667             capture,
668             cflow,
669             {
670                 2: "packets",
671                 4: 17,
672                 8: "src_ip",
673                 12: "dst_ip",
674                 61: (self.direction == "tx"),
675             },
676         )
677
678         self.collector.get_capture(3)
679
680         ipfix.remove_vpp_config()
681         self.logger.info("FFP_TEST_FINISH_0002")
682
683     def test_L4onL2(self):
684         """L4 data on L2 datapath"""
685         self.logger.info("FFP_TEST_START_0003")
686         self.pg_enable_capture(self.pg_interfaces)
687         self.pkts = []
688
689         ipfix = VppCFLOW(
690             test=self, intf=self.intf1, layer="l4", direction=self.direction
691         )
692         ipfix.add_vpp_config()
693
694         ipfix_decoder = IPFIXDecoder()
695         # template packet should arrive immediately
696         templates = ipfix.verify_templates(ipfix_decoder, count=2)
697
698         self.create_stream(packets=1)
699         capture = self.send_packets()
700
701         # make sure the one packet we expect actually showed up
702         self.vapi.ipfix_flush()
703         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
704         self.verify_cflow_data_detail(
705             ipfix_decoder,
706             capture,
707             cflow,
708             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
709         )
710
711         self.collector.get_capture(3)
712
713         ipfix.remove_vpp_config()
714         self.logger.info("FFP_TEST_FINISH_0003")
715
716     def test_templatesIp4(self):
717         """verify templates on IP4 datapath"""
718         self.logger.info("FFP_TEST_START_0000")
719
720         self.pg_enable_capture(self.pg_interfaces)
721
722         ipfix = VppCFLOW(
723             test=self, intf=self.intf1, datapath="ip4", direction=self.direction
724         )
725         ipfix.add_vpp_config()
726
727         # template packet should arrive immediately
728         self.vapi.ipfix_flush()
729         ipfix.verify_templates(timeout=3, count=1)
730         self.collector.get_capture(1)
731
732         ipfix.remove_vpp_config()
733
734         self.logger.info("FFP_TEST_FINISH_0000")
735
736     def test_L2onIP4(self):
737         """L2 data on IP4 datapath"""
738         self.logger.info("FFP_TEST_START_0001")
739         self.pg_enable_capture(self.pg_interfaces)
740         self.pkts = []
741
742         ipfix = VppCFLOW(
743             test=self,
744             intf=self.intf2,
745             layer="l2",
746             datapath="ip4",
747             direction=self.direction,
748         )
749         ipfix.add_vpp_config()
750
751         ipfix_decoder = IPFIXDecoder()
752         # template packet should arrive immediately
753         templates = ipfix.verify_templates(ipfix_decoder, count=1)
754
755         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
756         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
757
758         # make sure the one packet we expect actually showed up
759         self.vapi.ipfix_flush()
760         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
761         self.verify_cflow_data_detail(
762             ipfix_decoder,
763             capture,
764             cflow,
765             {2: "packets", 256: 8, 61: (self.direction == "tx")},
766         )
767
768         # expected two templates and one cflow packet
769         self.collector.get_capture(2)
770
771         ipfix.remove_vpp_config()
772         self.logger.info("FFP_TEST_FINISH_0001")
773
774     def test_L3onIP4(self):
775         """L3 data on IP4 datapath"""
776         self.logger.info("FFP_TEST_START_0002")
777         self.pg_enable_capture(self.pg_interfaces)
778         self.pkts = []
779
780         ipfix = VppCFLOW(
781             test=self,
782             intf=self.intf2,
783             layer="l3",
784             datapath="ip4",
785             direction=self.direction,
786         )
787         ipfix.add_vpp_config()
788
789         ipfix_decoder = IPFIXDecoder()
790         # template packet should arrive immediately
791         templates = ipfix.verify_templates(ipfix_decoder, count=1)
792
793         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
794         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
795
796         # make sure the one packet we expect actually showed up
797         self.vapi.ipfix_flush()
798         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
799         self.verify_cflow_data_detail(
800             ipfix_decoder,
801             capture,
802             cflow,
803             {
804                 1: "octets",
805                 2: "packets",
806                 8: "src_ip",
807                 12: "dst_ip",
808                 61: (self.direction == "tx"),
809             },
810         )
811
812         # expected two templates and one cflow packet
813         self.collector.get_capture(2)
814
815         ipfix.remove_vpp_config()
816         self.logger.info("FFP_TEST_FINISH_0002")
817
818     def test_L4onIP4(self):
819         """L4 data on IP4 datapath"""
820         self.logger.info("FFP_TEST_START_0003")
821         self.pg_enable_capture(self.pg_interfaces)
822         self.pkts = []
823
824         ipfix = VppCFLOW(
825             test=self,
826             intf=self.intf2,
827             layer="l4",
828             datapath="ip4",
829             direction=self.direction,
830         )
831         ipfix.add_vpp_config()
832
833         ipfix_decoder = IPFIXDecoder()
834         # template packet should arrive immediately
835         templates = ipfix.verify_templates(ipfix_decoder, count=1)
836
837         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
838         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
839
840         # make sure the one packet we expect actually showed up
841         self.vapi.ipfix_flush()
842         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
843         self.verify_cflow_data_detail(
844             ipfix_decoder,
845             capture,
846             cflow,
847             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
848         )
849
850         # expected two templates and one cflow packet
851         self.collector.get_capture(2)
852
853         ipfix.remove_vpp_config()
854         self.logger.info("FFP_TEST_FINISH_0003")
855
856     def test_templatesIP6(self):
857         """verify templates on IP6 datapath"""
858         self.logger.info("FFP_TEST_START_0000")
859         self.pg_enable_capture(self.pg_interfaces)
860
861         ipfix = VppCFLOW(
862             test=self, intf=self.intf1, datapath="ip6", direction=self.direction
863         )
864         ipfix.add_vpp_config()
865
866         # template packet should arrive immediately
867         ipfix.verify_templates(count=1)
868         self.collector.get_capture(1)
869
870         ipfix.remove_vpp_config()
871
872         self.logger.info("FFP_TEST_FINISH_0000")
873
874     def test_L2onIP6(self):
875         """L2 data on IP6 datapath"""
876         self.logger.info("FFP_TEST_START_0001")
877         self.pg_enable_capture(self.pg_interfaces)
878         self.pkts = []
879
880         ipfix = VppCFLOW(
881             test=self,
882             intf=self.intf3,
883             layer="l2",
884             datapath="ip6",
885             direction=self.direction,
886         )
887         ipfix.add_vpp_config()
888
889         ipfix_decoder = IPFIXDecoder()
890         # template packet should arrive immediately
891         templates = ipfix.verify_templates(ipfix_decoder, count=1)
892
893         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
894         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
895
896         # make sure the one packet we expect actually showed up
897         self.vapi.ipfix_flush()
898         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
899         self.verify_cflow_data_detail(
900             ipfix_decoder,
901             capture,
902             cflow,
903             {2: "packets", 256: 56710, 61: (self.direction == "tx")},
904             ip_ver="v6",
905         )
906
907         # expected two templates and one cflow packet
908         self.collector.get_capture(2)
909
910         ipfix.remove_vpp_config()
911         self.logger.info("FFP_TEST_FINISH_0001")
912
913     def test_L3onIP6(self):
914         """L3 data on IP6 datapath"""
915         self.logger.info("FFP_TEST_START_0002")
916         self.pg_enable_capture(self.pg_interfaces)
917         self.pkts = []
918
919         ipfix = VppCFLOW(
920             test=self,
921             intf=self.intf3,
922             layer="l3",
923             datapath="ip6",
924             direction=self.direction,
925         )
926         ipfix.add_vpp_config()
927
928         ipfix_decoder = IPFIXDecoder()
929         # template packet should arrive immediately
930         templates = ipfix.verify_templates(ipfix_decoder, count=1)
931
932         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
933         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
934
935         # make sure the one packet we expect actually showed up
936         self.vapi.ipfix_flush()
937         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
938         self.verify_cflow_data_detail(
939             ipfix_decoder,
940             capture,
941             cflow,
942             {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
943             ip_ver="v6",
944         )
945
946         # expected two templates and one cflow packet
947         self.collector.get_capture(2)
948
949         ipfix.remove_vpp_config()
950         self.logger.info("FFP_TEST_FINISH_0002")
951
952     def test_L4onIP6(self):
953         """L4 data on IP6 datapath"""
954         self.logger.info("FFP_TEST_START_0003")
955         self.pg_enable_capture(self.pg_interfaces)
956         self.pkts = []
957
958         ipfix = VppCFLOW(
959             test=self,
960             intf=self.intf3,
961             layer="l4",
962             datapath="ip6",
963             direction=self.direction,
964         )
965         ipfix.add_vpp_config()
966
967         ipfix_decoder = IPFIXDecoder()
968         # template packet should arrive immediately
969         templates = ipfix.verify_templates(ipfix_decoder, count=1)
970
971         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
972         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
973
974         # make sure the one packet we expect actually showed up
975         self.vapi.ipfix_flush()
976         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
977         self.verify_cflow_data_detail(
978             ipfix_decoder,
979             capture,
980             cflow,
981             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
982             ip_ver="v6",
983         )
984
985         # expected two templates and one cflow packet
986         self.collector.get_capture(2)
987
988         ipfix.remove_vpp_config()
989         self.logger.info("FFP_TEST_FINISH_0003")
990
991     def test_0001(self):
992         """no timers, one CFLOW packet, 9 Flows inside"""
993         self.logger.info("FFP_TEST_START_0001")
994         self.pg_enable_capture(self.pg_interfaces)
995         self.pkts = []
996
997         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
998         ipfix.add_vpp_config()
999
1000         ipfix_decoder = IPFIXDecoder()
1001         # template packet should arrive immediately
1002         templates = ipfix.verify_templates(ipfix_decoder)
1003
1004         self.create_stream(packets=9)
1005         capture = self.send_packets()
1006
1007         # make sure the one packet we expect actually showed up
1008         self.vapi.ipfix_flush()
1009         cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1010         self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1011         self.collector.get_capture(4)
1012
1013         ipfix.remove_vpp_config()
1014         self.logger.info("FFP_TEST_FINISH_0001")
1015
1016     def test_0002(self):
1017         """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1018         self.logger.info("FFP_TEST_START_0002")
1019         self.pg_enable_capture(self.pg_interfaces)
1020         self.pkts = []
1021
1022         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1023         ipfix.add_vpp_config()
1024
1025         ipfix_decoder = IPFIXDecoder()
1026         # template packet should arrive immediately
1027         self.vapi.ipfix_flush()
1028         templates = ipfix.verify_templates(ipfix_decoder)
1029
1030         self.create_stream(packets=6)
1031         capture = self.send_packets()
1032
1033         # make sure the one packet we expect actually showed up
1034         cflows = []
1035         self.vapi.ipfix_flush()
1036         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1037         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1038         self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1039         self.collector.get_capture(5)
1040
1041         ipfix.remove_vpp_config()
1042         self.logger.info("FFP_TEST_FINISH_0002")
1043
1044
1045 @tag_fixme_vpp_workers
1046 class DatapathTx(MethodHolder, DatapathTestsHolder):
1047     """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1048
1049     intf1 = "pg2"
1050     intf2 = "pg4"
1051     intf3 = "pg6"
1052     direction = "tx"
1053
1054
1055 @tag_fixme_vpp_workers
1056 class DatapathRx(MethodHolder, DatapathTestsHolder):
1057     """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1058
1059     intf1 = "pg1"
1060     intf2 = "pg3"
1061     intf3 = "pg5"
1062     direction = "rx"
1063
1064
1065 @unittest.skipUnless(config.extended, "part of extended tests")
1066 class DisableIPFIX(MethodHolder):
1067     """Disable IPFIX"""
1068
1069     @classmethod
1070     def setUpClass(cls):
1071         super(DisableIPFIX, cls).setUpClass()
1072
1073     @classmethod
1074     def tearDownClass(cls):
1075         super(DisableIPFIX, cls).tearDownClass()
1076
1077     def test_0001(self):
1078         """disable IPFIX after first packets"""
1079         self.logger.info("FFP_TEST_START_0001")
1080         self.pg_enable_capture(self.pg_interfaces)
1081         self.pkts = []
1082
1083         ipfix = VppCFLOW(test=self)
1084         ipfix.add_vpp_config()
1085
1086         ipfix_decoder = IPFIXDecoder()
1087         # template packet should arrive immediately
1088         templates = ipfix.verify_templates(ipfix_decoder)
1089
1090         self.create_stream()
1091         self.send_packets()
1092
1093         # make sure the one packet we expect actually showed up
1094         self.vapi.ipfix_flush()
1095         self.wait_for_cflow_packet(self.collector, templates[1])
1096         self.collector.get_capture(4)
1097
1098         # disable IPFIX
1099         ipfix.disable_exporter()
1100         self.pg_enable_capture([self.collector])
1101
1102         self.send_packets()
1103
1104         # make sure no one packet arrived in 1 minute
1105         self.vapi.ipfix_flush()
1106         self.sleep(1, "wait before verifying no packets sent")
1107         self.collector.assert_nothing_captured()
1108
1109         ipfix.remove_vpp_config()
1110         self.logger.info("FFP_TEST_FINISH_0001")
1111
1112
1113 @unittest.skipUnless(config.extended, "part of extended tests")
1114 class ReenableIPFIX(MethodHolder):
1115     """Re-enable IPFIX"""
1116
1117     @classmethod
1118     def setUpClass(cls):
1119         super(ReenableIPFIX, cls).setUpClass()
1120
1121     @classmethod
1122     def tearDownClass(cls):
1123         super(ReenableIPFIX, cls).tearDownClass()
1124
1125     def test_0011(self):
1126         """disable IPFIX after first packets and re-enable after few packets"""
1127         self.logger.info("FFP_TEST_START_0001")
1128         self.pg_enable_capture(self.pg_interfaces)
1129         self.pkts = []
1130
1131         ipfix = VppCFLOW(test=self)
1132         ipfix.add_vpp_config()
1133
1134         ipfix_decoder = IPFIXDecoder()
1135         # template packet should arrive immediately
1136         templates = ipfix.verify_templates(ipfix_decoder)
1137
1138         self.create_stream(packets=5)
1139         self.send_packets()
1140
1141         # make sure the one packet we expect actually showed up
1142         self.vapi.ipfix_flush()
1143         self.wait_for_cflow_packet(self.collector, templates[1])
1144         self.collector.get_capture(4)
1145
1146         # disable IPFIX
1147         ipfix.disable_exporter()
1148         self.vapi.ipfix_flush()
1149         self.pg_enable_capture([self.collector])
1150
1151         self.send_packets()
1152
1153         # make sure no one packet arrived in active timer span
1154         self.vapi.ipfix_flush()
1155         self.sleep(1, "wait before verifying no packets sent")
1156         self.collector.assert_nothing_captured()
1157         self.pg2.get_capture(5)
1158
1159         # enable IPFIX
1160         ipfix.enable_exporter()
1161
1162         capture = self.collector.get_capture(4)
1163         nr_templates = 0
1164         nr_data = 0
1165         for p in capture:
1166             self.assertTrue(p.haslayer(IPFIX))
1167             if p.haslayer(Template):
1168                 nr_templates += 1
1169         self.assertTrue(nr_templates, 3)
1170         for p in capture:
1171             self.assertTrue(p.haslayer(IPFIX))
1172             if p.haslayer(Data):
1173                 nr_data += 1
1174         self.assertTrue(nr_templates, 1)
1175
1176         ipfix.remove_vpp_config()
1177         self.logger.info("FFP_TEST_FINISH_0001")
1178
1179
1180 @unittest.skipUnless(config.extended, "part of extended tests")
1181 class DisableFP(MethodHolder):
1182     """Disable Flowprobe feature"""
1183
1184     @classmethod
1185     def setUpClass(cls):
1186         super(DisableFP, cls).setUpClass()
1187
1188     @classmethod
1189     def tearDownClass(cls):
1190         super(DisableFP, cls).tearDownClass()
1191
1192     def test_0001(self):
1193         """disable flowprobe feature after first packets"""
1194         self.logger.info("FFP_TEST_START_0001")
1195         self.pg_enable_capture(self.pg_interfaces)
1196         self.pkts = []
1197         ipfix = VppCFLOW(test=self)
1198         ipfix.add_vpp_config()
1199
1200         ipfix_decoder = IPFIXDecoder()
1201         # template packet should arrive immediately
1202         templates = ipfix.verify_templates(ipfix_decoder)
1203
1204         self.create_stream()
1205         self.send_packets()
1206
1207         # make sure the one packet we expect actually showed up
1208         self.vapi.ipfix_flush()
1209         self.wait_for_cflow_packet(self.collector, templates[1])
1210         self.collector.get_capture(4)
1211
1212         # disable IPFIX
1213         ipfix.disable_flowprobe_feature()
1214         self.pg_enable_capture([self.collector])
1215
1216         self.send_packets()
1217
1218         # make sure no one packet arrived in active timer span
1219         self.vapi.ipfix_flush()
1220         self.sleep(1, "wait before verifying no packets sent")
1221         self.collector.assert_nothing_captured()
1222
1223         ipfix.remove_vpp_config()
1224         self.logger.info("FFP_TEST_FINISH_0001")
1225
1226
1227 @unittest.skipUnless(config.extended, "part of extended tests")
1228 class ReenableFP(MethodHolder):
1229     """Re-enable Flowprobe feature"""
1230
1231     @classmethod
1232     def setUpClass(cls):
1233         super(ReenableFP, cls).setUpClass()
1234
1235     @classmethod
1236     def tearDownClass(cls):
1237         super(ReenableFP, cls).tearDownClass()
1238
1239     def test_0001(self):
1240         """disable flowprobe feature after first packets and re-enable
1241         after few packets"""
1242         self.logger.info("FFP_TEST_START_0001")
1243         self.pg_enable_capture(self.pg_interfaces)
1244         self.pkts = []
1245
1246         ipfix = VppCFLOW(test=self)
1247         ipfix.add_vpp_config()
1248
1249         ipfix_decoder = IPFIXDecoder()
1250         # template packet should arrive immediately
1251         self.vapi.ipfix_flush()
1252         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1253
1254         self.create_stream()
1255         self.send_packets()
1256
1257         # make sure the one packet we expect actually showed up
1258         self.vapi.ipfix_flush()
1259         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1260         self.collector.get_capture(4)
1261
1262         # disable FPP feature
1263         ipfix.disable_flowprobe_feature()
1264         self.pg_enable_capture([self.collector])
1265
1266         self.send_packets()
1267
1268         # make sure no one packet arrived in active timer span
1269         self.vapi.ipfix_flush()
1270         self.sleep(5, "wait before verifying no packets sent")
1271         self.collector.assert_nothing_captured()
1272
1273         # enable FPP feature
1274         ipfix.enable_flowprobe_feature()
1275         self.vapi.ipfix_flush()
1276         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1277
1278         self.send_packets()
1279
1280         # make sure the next packets (templates and data) we expect actually
1281         # showed up
1282         self.vapi.ipfix_flush()
1283         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1284         self.collector.get_capture(4)
1285
1286         ipfix.remove_vpp_config()
1287         self.logger.info("FFP_TEST_FINISH_0001")
1288
1289
1290 if __name__ == "__main__":
1291     unittest.main(testRunner=VppTestRunner)