hsa: fix coverity warning
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8 import re
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.lacp import SlowProtocol, LACP
15
16 from config import config
17 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
18 from framework import is_distro_ubuntu2204, is_distro_debian11
19 from framework import VppTestCase, VppTestRunner
20 from framework import tag_run_solo
21 from vpp_object import VppObject
22 from vpp_pg_interface import CaptureTimeoutError
23 from util import ppp
24 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
25 from vpp_ip_route import VppIpRoute, VppRoutePath
26 from vpp_papi.macaddress import mac_ntop
27 from socket import inet_ntop
28 from vpp_papi import VppEnum
29
30
31 TMPL_COMMON_FIELD_COUNT = 6
32 TMPL_L2_FIELD_COUNT = 3
33 TMPL_L3_FIELD_COUNT = 4
34 TMPL_L4_FIELD_COUNT = 3
35
36
class VppCFLOW(VppObject):
    """CFLOW object for IPFIX exporter and Flowprobe feature

    Keeps the exporter and flowprobe settings for one interface of the test
    case and applies or removes them through the test's ``vapi``.
    """

    def __init__(
        self,
        test,
        intf="pg2",
        active=0,
        passive=0,
        timeout=100,
        mtu=1024,
        datapath="l2",
        layer="l2 l3 l4",
        direction="tx",
    ):
        """Store the settings; nothing is sent to VPP here.

        :param test: test case providing ``vapi``, ``pg0`` and the attribute
            named by ``intf``
        :param intf: name of the test attribute holding the monitored interface
        :param active: active timer passed to ``flowprobe_set_params``
        :param passive: passive timer; replaced by ``active + 1`` when it is 0
            or smaller than ``active``
        :param timeout: template interval passed to ``set_ipfix_exporter``
        :param mtu: path MTU passed to ``set_ipfix_exporter``
        :param datapath: one of "l2", "ip4", "ip6"
        :param layer: string naming the layers to record ("l2", "l3", "l4")
        :param direction: one of "rx", "tx", "both"
        """
        self._test = test
        self._intf = intf
        self._intf_obj = getattr(self._test, intf)
        self._active = active
        if passive == 0 or passive < active:
            self._passive = active + 1
        else:
            self._passive = passive
        self._datapath = datapath  # l2 ip4 ip6
        self._collect = layer  # l2 l3 l4
        self._direction = direction  # rx tx both
        self._timeout = timeout
        self._mtu = mtu
        self._configured = False

    def add_vpp_config(self):
        """Enable the exporter, set flowprobe parameters and enable the feature."""
        self.enable_exporter()
        collect = self._collect.lower()
        l2_flag = 0
        l3_flag = 0
        l4_flag = 0
        if "l2" in collect:
            l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
        if "l3" in collect:
            l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
        if "l4" in collect:
            l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
        self._test.vapi.flowprobe_set_params(
            record_flags=(l2_flag | l3_flag | l4_flag),
            active_timer=self._active,
            passive_timer=self._passive,
        )
        self.enable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = True

    def remove_vpp_config(self):
        """Disable the exporter and the flowprobe feature, then flush IPFIX."""
        self.disable_exporter()
        self.disable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = False

    def enable_exporter(self):
        """Point the IPFIX exporter at pg0's remote address, sourced from pg0."""
        self._test.vapi.set_ipfix_exporter(
            collector_address=self._test.pg0.remote_ip4,
            src_address=self._test.pg0.local_ip4,
            path_mtu=self._mtu,
            template_interval=self._timeout,
        )

    def _enable_disable_flowprobe_feature(self, is_add):
        """Add or delete the flowprobe feature on the configured interface.

        :param is_add: True to enable, False to disable
        :raises KeyError: if the stored datapath or direction is not one of
            the names in the maps below
        """
        which_map = {
            "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
            "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
            "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
        }
        direction_map = {
            "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
            "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
            "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
        }
        self._test.vapi.flowprobe_interface_add_del(
            is_add=is_add,
            which=which_map[self._datapath],
            direction=direction_map[self._direction],
            sw_if_index=self._intf_obj.sw_if_index,
        )

    def enable_flowprobe_feature(self):
        """Enable the flowprobe feature on the configured interface."""
        self._enable_disable_flowprobe_feature(is_add=True)

    def disable_exporter(self):
        """Reset the exporter by setting the collector to 0.0.0.0 via the CLI."""
        self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")

    def disable_flowprobe_feature(self):
        """Disable the flowprobe feature on the configured interface."""
        self._enable_disable_flowprobe_feature(is_add=False)

    def object_id(self):
        """Return an identifier built from the exporter source and collector.

        The addresses are the ones enable_exporter() programs; the previous
        implementation read attributes that are never assigned on this object.
        """
        return "ipfix-collector-%s-%s" % (
            self._test.pg0.local_ip4,
            self._test.pg0.remote_ip4,
        )

    def query_vpp_config(self):
        """Return the locally tracked configured flag (VPP is not queried)."""
        return self._configured

    def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
        """Wait for ``count`` template packets on the collector and check them.

        :param decoder: optional IPFIXDecoder; when given, every template is
            registered with it and its template ID is collected
        :param timeout: per-packet wait timeout
        :param count: number of template packets expected (1, 2 or 3)
        :param field_count_in: optional collection of accepted field counts
        :returns: list of template IDs in arrival order (empty when no
            decoder is given)
        """
        templates = []
        self._test.assertIn(count, (1, 2, 3))
        for _ in range(count):
            # set ID 2 is what the template packets are expected to carry
            p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
            self._test.assertTrue(p.haslayer(IPFIX))
            self._test.assertTrue(p.haslayer(Template))
            if decoder is not None:
                templates.append(p[Template].templateID)
                decoder.add_template(p.getlayer(Template))
            if field_count_in is not None:
                self._test.assertIn(p[Template].fieldCount, field_count_in)
        return templates
147
148
class MethodHolder(VppTestCase):
    """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""

    # Test variables
    debug_print = False
    max_number_of_packets = 10
    pkts = []

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.
        """
        super(MethodHolder, cls).setUpClass()
        # NOTE(review): the distro markers are compared with == True exactly as
        # before; presumably they are module-level booleans - confirm.
        if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
            cls, "vpp"
        ):
            return
        try:
            # Create pg interfaces
            cls.create_pg_interfaces(range(9))

            # Packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            # Create BD with learning, flooding, unknown unicast flooding and
            # forwarding enabled and put pg1 and pg2 into this BD
            cls.vapi.bridge_domain_add_del_v2(
                bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
            )

            # Set up all interfaces
            for i in cls.pg_interfaces:
                i.admin_up()

            # pg0 receives the exported IPFIX packets
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.collector = cls.pg0

            cls.pg1.config_ip4()
            cls.pg1.resolve_arp()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()
            cls.pg3.config_ip4()
            cls.pg3.resolve_arp()
            cls.pg4.config_ip4()
            cls.pg4.resolve_arp()
            cls.pg7.config_ip4()
            cls.pg8.config_ip4()
            cls.pg8.configure_ipv4_neighbors()

            cls.pg5.config_ip6()
            cls.pg5.resolve_ndp()
            cls.pg5.disable_ipv6_ra()
            cls.pg6.config_ip6()
            cls.pg6.resolve_ndp()
            cls.pg6.disable_ipv6_ra()
        except Exception:
            # undo the parent class setup before propagating the failure
            super(MethodHolder, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the parent class."""
        super(MethodHolder, cls).tearDownClass()

    def create_stream(
        self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
    ):
        """Create a packet stream to tickle the plugin

        The generated packets replace ``self.pkts``.

        :param VppInterface src_if: Source interface for packet stream
            (defaults to pg1)
        :param VppInterface dst_if: Dst interface for packet stream
            (defaults to pg2)
        :param packets: number of packets; random in
            1..max_number_of_packets when None
        :param size: packet size; chosen randomly per packet from
            pg_if_packet_sizes when None
        :param ip_ver: "v4" for IPv4, any other value builds IPv6 packets
        """
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pkts = []
        if packets is None:
            packets = random.randint(1, self.max_number_of_packets)
        pkt_size = size
        for _ in range(packets):
            if size is None:
                pkt_size = random.choice(self.pg_if_packet_sizes)
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            pkt = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
            if ip_ver == "v4":
                pkt /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
            else:
                pkt /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
            pkt /= UDP(sport=1234, dport=4321)
            pkt /= Raw(payload)
            # keep the un-padded packet with the packet info
            info.data = pkt.copy()
            self.extend_packet(pkt, pkt_size)
            self.pkts.append(pkt)

    def verify_cflow_data(self, decoder, capture, cflow):
        """Check that every record reports the capture's total octets/packets.

        :param decoder: IPFIXDecoder that already knows the template
        :param capture: captured IPv4 packets the flow should account for
        :param cflow: exported packet; ignored when it has no Data layer
        """
        octets = 0
        packets = 0
        for p in capture:
            octets += p[IP].len
            packets += 1
        if cflow.haslayer(Data):
            data = decoder.decode_data_set(cflow.getlayer(Set))
            for record in data:
                self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
                self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)

    def send_packets(self, src_if=None, dst_if=None):
        """Send ``self.pkts`` from src_if and return the capture on dst_if.

        :param src_if: sending interface (defaults to pg1)
        :param dst_if: capturing interface (defaults to pg2)
        :returns: capture of ``len(self.pkts)`` packets from dst_if
        """
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pg_enable_capture([dst_if])
        src_if.add_stream(self.pkts)
        self.pg_start()
        return dst_if.get_capture(len(self.pkts))

    def verify_cflow_data_detail(
        self,
        decoder,
        capture,
        cflow,
        data_set={1: "octets", 2: "packets"},
        ip_ver="v4",
        field_count=None,
    ):
        """Compare selected fields of every exported record with expectations.

        :param decoder: IPFIXDecoder that already knows the template
        :param capture: captured packets; only the first one is inspected
        :param cflow: exported packet; nothing is checked without a Data layer
        :param data_set: maps a record field id to the expected integer, or to
            one of the names "octets", "packets", "src_ip", "dst_ip", "sport",
            "dport" which are resolved from the first captured packet; None
            skips the per-field checks. The default dict is only read here.
        :param ip_ver: "v4" or "v6"; selects the IP layer used for resolving
        :param field_count: when given, every record must have this many fields
        """
        if self.debug_print:
            # show() prints by itself; wrapping it in print() emitted "None"
            capture[0].show()
        if cflow.haslayer(Data):
            data = decoder.decode_data_set(cflow.getlayer(Set))
            if self.debug_print:
                print(data)
            if ip_ver == "v4":
                ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
            else:
                ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
            if data_set is not None:
                for record in data:
                    # skip flow if ingress/egress interface is 0
                    if int(binascii.hexlify(record[10]), 16) == 0:
                        continue
                    if int(binascii.hexlify(record[14]), 16) == 0:
                        continue

                    for field in data_set:
                        value = data_set[field]
                        if value == "octets":
                            if ip_ver == "v6":
                                # IPv6 has no "len" field: its "plen" counts
                                # the payload only, so add the 40-byte fixed
                                # header.
                                # NOTE(review): that the exporter counts the
                                # header for IPv6 is carried over from the
                                # previous expectation - confirm.
                                value = ip_layer.plen + 40
                            else:
                                value = ip_layer.len
                        elif value == "packets":
                            value = 1
                        elif value == "src_ip":
                            if ip_ver == "v4":
                                ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
                            else:
                                ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
                            value = int(binascii.hexlify(ip), 16)
                        elif value == "dst_ip":
                            if ip_ver == "v4":
                                ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
                            else:
                                ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
                            value = int(binascii.hexlify(ip), 16)
                        elif value == "sport":
                            value = int(capture[0][UDP].sport)
                        elif value == "dport":
                            value = int(capture[0][UDP].dport)
                        self.assertEqual(
                            int(binascii.hexlify(record[field]), 16), value
                        )
            if field_count is not None:
                for record in data:
                    self.assertEqual(len(record), field_count)

    def verify_cflow_data_notimer(self, decoder, capture, cflows):
        """Match exported records one-to-one, in order, with captured packets.

        :param decoder: IPFIXDecoder that already knows the template
        :param capture: captured IPv4 packets
        :param cflows: exported packets; each must carry a Data layer
        :raises Exception: when an exported packet has no Data layer
        """
        idx = 0
        for cflow in cflows:
            if cflow.haslayer(Data):
                data = decoder.decode_data_set(cflow.getlayer(Set))
            else:
                raise Exception("No CFLOW data")

            for rec in data:
                p = capture[idx]
                idx += 1
                self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
                self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
        # every captured packet must have been accounted for
        self.assertEqual(len(capture), idx)

    def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
        """wait for CFLOW packet and verify its correctness

        :param collector_intf: interface the exported packet is expected on
        :param set_id: set ID the packet's Set layer must carry
        :param timeout: how long to wait
        :returns: the received packet

        """
        self.logger.info("IPFIX: Waiting for CFLOW packet")
        # self.logger.debug(self.vapi.ppcli("show flow table"))
        p = collector_intf.wait_for_packet(timeout=timeout)
        self.assertEqual(p[Set].setID, set_id)
        # self.logger.debug(self.vapi.ppcli("show flow table"))
        self.logger.debug(ppp("IPFIX: Got packet:", p))
        return p
362
363
@tag_run_solo
@tag_fixme_vpp_workers
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class Flowprobe(MethodHolder):
    """Template verification, timer tests"""

    @classmethod
    def setUpClass(cls):
        """Run the class setup inherited from MethodHolder."""
        parent = super(Flowprobe, cls)
        parent.setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Run the class teardown inherited from MethodHolder."""
        parent = super(Flowprobe, cls)
        parent.tearDownClass()

    def test_0001(self):
        """timer less than template timeout"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        flow_cfg = VppCFLOW(test=self, active=2)
        flow_cfg.add_vpp_config()

        decoder = IPFIXDecoder()
        # the templates are expected right after configuration
        template_ids = flow_cfg.verify_templates(decoder)

        self.create_stream(packets=1)
        self.send_packets()
        captured = self.pg2.get_capture(1)

        # the data packet for the single flow must show up within 15 s
        data_pkt = self.wait_for_cflow_packet(self.collector, template_ids[1], 15)
        self.verify_cflow_data(decoder, captured, data_pkt)

        flow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_0002(self):
        """timer greater than template timeout"""
        self.logger.info("FFP_TEST_START_0002")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        flow_cfg = VppCFLOW(test=self, timeout=3, active=4)
        flow_cfg.add_vpp_config()

        decoder = IPFIXDecoder()
        # the first set of templates is expected right after configuration;
        # it is only checked here, not fed to the decoder
        flow_cfg.verify_templates()

        self.create_stream(packets=2)
        self.send_packets()
        captured = self.pg2.get_capture(2)

        # the template interval is 3 here, so the next set of templates is
        # awaited with a 5 s timeout per packet
        template_ids = flow_cfg.verify_templates(decoder, timeout=5)

        # the data packet for the flow must show up within 15 s
        data_pkt = self.wait_for_cflow_packet(self.collector, template_ids[1], 15)
        self.verify_cflow_data(decoder, captured, data_pkt)

        flow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")

    def test_cflow_packet(self):
        """verify cflow packet fields"""
        self.logger.info("FFP_TEST_START_0000")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        def as_int(raw):
            # decode a raw record field as an unsigned big-endian integer
            return int(binascii.hexlify(raw), 16)

        flow_cfg = VppCFLOW(
            test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
        )
        flow_cfg.add_vpp_config()

        # route 9.0.0.0/24 out of pg8 so the test packet leaves through it
        next_hop = VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)
        route_9001 = VppIpRoute(self, "9.0.0.0", 24, [next_hop])
        route_9001.add_vpp_config()

        decoder = IPFIXDecoder()
        template_ids = flow_cfg.verify_templates(decoder, count=1)

        test_pkt = (
            Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
            / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
            / TCP(sport=1234, dport=4321, flags=80)
            / Raw(b"\xa5" * 100)
        )
        self.pkts = [test_pkt]

        # reference times are taken before the packet is sent
        now_utc = int(time.time())
        # same instant shifted by 2208988800 s, the distance between the
        # 1900-01-01 and 1970-01-01 epochs
        now_shifted = now_utc + 2208988800
        self.send_packets(src_if=self.pg7, dst_if=self.pg8)

        cflow = self.wait_for_cflow_packet(self.collector, template_ids[0], 10)
        self.collector.get_capture(2)

        if cflow[0].haslayer(IPFIX):
            header = cflow[IPFIX]
            self.assertEqual(header.version, 10)
            header = cflow[IPFIX]
            self.assertEqual(header.observationDomainID, 1)
            header = cflow[IPFIX]
            self.assertEqual(header.sequenceNumber, 0)
            header = cflow[IPFIX]
            self.assertAlmostEqual(header.exportTime, now_utc, delta=5)
        if cflow.haslayer(Data):
            record = decoder.decode_data_set(cflow[0].getlayer(Set))[0]
            # ingress interface
            self.assertEqual(as_int(record[10]), 8)
            # egress interface
            self.assertEqual(as_int(record[14]), 9)
            # direction
            self.assertEqual(as_int(record[61]), 1)
            # packets
            self.assertEqual(as_int(record[2]), 1)
            # src mac
            self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
            # dst mac
            self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
            # flow start timestamp (upper 32 bits of the field)
            flow_start = as_int(record[156]) >> 32
            self.assertAlmostEqual(flow_start, now_shifted, delta=1)
            # flow end timestamp (upper 32 bits of the field)
            flow_end = as_int(record[157]) >> 32
            self.assertAlmostEqual(flow_end, now_shifted, delta=1)
            # ethernet type
            self.assertEqual(as_int(record[256]), 8)
            # src ip
            self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
            # dst ip
            self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
            # protocol (TCP)
            self.assertEqual(as_int(record[4]), 6)
            # src port
            self.assertEqual(as_int(record[7]), 1234)
            # dst port
            self.assertEqual(as_int(record[11]), 4321)
            # tcp flags
            self.assertEqual(as_int(record[6]), 80)

        flow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0000")

    def test_interface_dump(self):
        """Dump interfaces with IPFIX flow record generation enabled"""
        self.logger.info("FFP_TEST_START_0003")

        def check_entry(entry, intf, which_name, direction_name):
            # one dump entry must match the interface, datapath and direction
            self.assertEqual(entry.sw_if_index, intf.sw_if_index)
            self.assertEqual(
                entry.which, getattr(VppEnum.vl_api_flowprobe_which_t, which_name)
            )
            self.assertEqual(
                entry.direction,
                getattr(VppEnum.vl_api_flowprobe_direction_t, direction_name),
            )

        # Enable the feature on three interfaces; only the first object also
        # configures the exporter and the flowprobe parameters
        on_pg1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
        on_pg1.add_vpp_config()

        on_pg2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
        on_pg2.enable_flowprobe_feature()

        on_pg3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
        on_pg3.enable_flowprobe_feature()

        # Without an interface filter the dump lists every enabled interface
        dump = self.vapi.flowprobe_interface_dump()
        self.assertEqual(len(dump), 3)

        check_entry(dump[0], self.pg1, "FLOWPROBE_WHICH_L2", "FLOWPROBE_DIRECTION_RX")
        check_entry(dump[1], self.pg2, "FLOWPROBE_WHICH_IP4", "FLOWPROBE_DIRECTION_TX")
        check_entry(
            dump[2], self.pg3, "FLOWPROBE_WHICH_IP6", "FLOWPROBE_DIRECTION_BOTH"
        )

        # Filtering on pg2 must return exactly that interface
        dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
        self.assertEqual(len(dump), 1)
        check_entry(dump[0], self.pg2, "FLOWPROBE_WHICH_IP4", "FLOWPROBE_DIRECTION_TX")

        # Filtering on interface index 99 must return nothing
        dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
        self.assertEqual(len(dump), 0)

        for cfg in (on_pg1, on_pg2, on_pg3):
            cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0003")

    def test_get_params(self):
        """Get IPFIX flow record generation parameters"""
        self.logger.info("FFP_TEST_START_0004")

        # Enable feature for an interface with custom parameters
        flow_cfg = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
        flow_cfg.add_vpp_config()

        # Read the parameters back and compare them with what was configured
        params = self.vapi.flowprobe_get_params()
        self.assertEqual(params.active_timer, 20)
        self.assertEqual(params.passive_timer, 40)
        flags_t = VppEnum.vl_api_flowprobe_record_flags_t
        expected_flags = (
            flags_t.FLOWPROBE_RECORD_FLAG_L2
            | flags_t.FLOWPROBE_RECORD_FLAG_L3
            | flags_t.FLOWPROBE_RECORD_FLAG_L4
        )
        self.assertEqual(params.record_flags, expected_flags)

        flow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0004")
602
603
604 class DatapathTestsHolder(object):
605     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
606
607     @classmethod
608     def setUpClass(cls):
609         super(DatapathTestsHolder, cls).setUpClass()
610
611     @classmethod
612     def tearDownClass(cls):
613         super(DatapathTestsHolder, cls).tearDownClass()
614
615     def test_templatesL2(self):
616         """verify template on L2 datapath"""
617         self.logger.info("FFP_TEST_START_0000")
618         self.pg_enable_capture(self.pg_interfaces)
619
620         ipfix = VppCFLOW(
621             test=self, intf=self.intf1, layer="l2", direction=self.direction
622         )
623         ipfix.add_vpp_config()
624
625         # template packet should arrive immediately
626         self.vapi.ipfix_flush()
627         ipfix.verify_templates(timeout=3, count=1)
628         self.collector.get_capture(1)
629
630         ipfix.remove_vpp_config()
631         self.logger.info("FFP_TEST_FINISH_0000")
632
633     def test_L2onL2(self):
634         """L2 data on L2 datapath"""
635         self.logger.info("FFP_TEST_START_0001")
636         self.pg_enable_capture(self.pg_interfaces)
637         self.pkts = []
638
639         ipfix = VppCFLOW(
640             test=self, intf=self.intf1, layer="l2", direction=self.direction
641         )
642         ipfix.add_vpp_config()
643
644         ipfix_decoder = IPFIXDecoder()
645         # template packet should arrive immediately
646         templates = ipfix.verify_templates(ipfix_decoder, count=1)
647
648         self.create_stream(packets=1)
649         capture = self.send_packets()
650
651         # make sure the one packet we expect actually showed up
652         self.vapi.ipfix_flush()
653         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
654         self.verify_cflow_data_detail(
655             ipfix_decoder,
656             capture,
657             cflow,
658             {2: "packets", 256: 8, 61: (self.direction == "tx")},
659         )
660         self.collector.get_capture(2)
661
662         ipfix.remove_vpp_config()
663         self.logger.info("FFP_TEST_FINISH_0001")
664
665     def test_L3onL2(self):
666         """L3 data on L2 datapath"""
667         self.logger.info("FFP_TEST_START_0002")
668         self.pg_enable_capture(self.pg_interfaces)
669         self.pkts = []
670
671         ipfix = VppCFLOW(
672             test=self, intf=self.intf1, layer="l3", direction=self.direction
673         )
674         ipfix.add_vpp_config()
675
676         ipfix_decoder = IPFIXDecoder()
677         # template packet should arrive immediately
678         templates = ipfix.verify_templates(ipfix_decoder, count=2)
679
680         self.create_stream(packets=1)
681         capture = self.send_packets()
682
683         # make sure the one packet we expect actually showed up
684         self.vapi.ipfix_flush()
685         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
686         self.verify_cflow_data_detail(
687             ipfix_decoder,
688             capture,
689             cflow,
690             {
691                 2: "packets",
692                 4: 17,
693                 8: "src_ip",
694                 12: "dst_ip",
695                 61: (self.direction == "tx"),
696             },
697         )
698
699         self.collector.get_capture(3)
700
701         ipfix.remove_vpp_config()
702         self.logger.info("FFP_TEST_FINISH_0002")
703
704     def test_L234onL2(self):
705         """L2/3/4 data on L2 datapath"""
706         self.pg_enable_capture(self.pg_interfaces)
707         self.pkts = []
708
709         ipfix = VppCFLOW(
710             test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
711         )
712         ipfix.add_vpp_config()
713
714         ipfix_decoder = IPFIXDecoder()
715         # template packet should arrive immediately
716         tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
717         tmpl_ip_field_count = (
718             TMPL_COMMON_FIELD_COUNT
719             + TMPL_L2_FIELD_COUNT
720             + TMPL_L3_FIELD_COUNT
721             + TMPL_L4_FIELD_COUNT
722         )
723         templates = ipfix.verify_templates(
724             ipfix_decoder,
725             count=3,
726             field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
727         )
728
729         # verify IPv4 and IPv6 flows
730         for ip_ver in ("v4", "v6"):
731             self.create_stream(packets=1, ip_ver=ip_ver)
732             capture = self.send_packets()
733
734             # make sure the one packet we expect actually showed up
735             self.vapi.ipfix_flush()
736             cflow = self.wait_for_cflow_packet(
737                 self.collector, templates[1 if ip_ver == "v4" else 2]
738             )
739             src_ip_id = 8 if ip_ver == "v4" else 27
740             dst_ip_id = 12 if ip_ver == "v4" else 28
741             self.verify_cflow_data_detail(
742                 ipfix_decoder,
743                 capture,
744                 cflow,
745                 {
746                     2: "packets",
747                     256: 8 if ip_ver == "v4" else 56710,
748                     4: 17,
749                     7: "sport",
750                     11: "dport",
751                     src_ip_id: "src_ip",
752                     dst_ip_id: "dst_ip",
753                     61: (self.direction == "tx"),
754                 },
755                 ip_ver=ip_ver,
756                 field_count=tmpl_ip_field_count,
757             )
758
759         # verify non-IP flow
760         self.pkts = [
761             (
762                 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
763                 / SlowProtocol()
764                 / LACP()
765             )
766         ]
767         capture = self.send_packets()
768
769         # make sure the one packet we expect actually showed up
770         self.vapi.ipfix_flush()
771         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
772         self.verify_cflow_data_detail(
773             ipfix_decoder,
774             capture,
775             cflow,
776             {2: "packets", 256: 2440, 61: (self.direction == "tx")},
777             field_count=tmpl_l2_field_count,
778         )
779
780         self.collector.get_capture(6)
781
782         ipfix.remove_vpp_config()
783
784     def test_L4onL2(self):
785         """L4 data on L2 datapath"""
786         self.logger.info("FFP_TEST_START_0003")
787         self.pg_enable_capture(self.pg_interfaces)
788         self.pkts = []
789
790         ipfix = VppCFLOW(
791             test=self, intf=self.intf1, layer="l4", direction=self.direction
792         )
793         ipfix.add_vpp_config()
794
795         ipfix_decoder = IPFIXDecoder()
796         # template packet should arrive immediately
797         templates = ipfix.verify_templates(ipfix_decoder, count=2)
798
799         self.create_stream(packets=1)
800         capture = self.send_packets()
801
802         # make sure the one packet we expect actually showed up
803         self.vapi.ipfix_flush()
804         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
805         self.verify_cflow_data_detail(
806             ipfix_decoder,
807             capture,
808             cflow,
809             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
810         )
811
812         self.collector.get_capture(3)
813
814         ipfix.remove_vpp_config()
815         self.logger.info("FFP_TEST_FINISH_0003")
816
817     def test_templatesIp4(self):
818         """verify templates on IP4 datapath"""
819         self.logger.info("FFP_TEST_START_0000")
820
821         self.pg_enable_capture(self.pg_interfaces)
822
823         ipfix = VppCFLOW(
824             test=self, intf=self.intf1, datapath="ip4", direction=self.direction
825         )
826         ipfix.add_vpp_config()
827
828         # template packet should arrive immediately
829         self.vapi.ipfix_flush()
830         ipfix.verify_templates(timeout=3, count=1)
831         self.collector.get_capture(1)
832
833         ipfix.remove_vpp_config()
834
835         self.logger.info("FFP_TEST_FINISH_0000")
836
837     def test_L2onIP4(self):
838         """L2 data on IP4 datapath"""
839         self.logger.info("FFP_TEST_START_0001")
840         self.pg_enable_capture(self.pg_interfaces)
841         self.pkts = []
842
843         ipfix = VppCFLOW(
844             test=self,
845             intf=self.intf2,
846             layer="l2",
847             datapath="ip4",
848             direction=self.direction,
849         )
850         ipfix.add_vpp_config()
851
852         ipfix_decoder = IPFIXDecoder()
853         # template packet should arrive immediately
854         templates = ipfix.verify_templates(ipfix_decoder, count=1)
855
856         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
857         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
858
859         # make sure the one packet we expect actually showed up
860         self.vapi.ipfix_flush()
861         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
862         self.verify_cflow_data_detail(
863             ipfix_decoder,
864             capture,
865             cflow,
866             {2: "packets", 256: 8, 61: (self.direction == "tx")},
867         )
868
869         # expected two templates and one cflow packet
870         self.collector.get_capture(2)
871
872         ipfix.remove_vpp_config()
873         self.logger.info("FFP_TEST_FINISH_0001")
874
875     def test_L3onIP4(self):
876         """L3 data on IP4 datapath"""
877         self.logger.info("FFP_TEST_START_0002")
878         self.pg_enable_capture(self.pg_interfaces)
879         self.pkts = []
880
881         ipfix = VppCFLOW(
882             test=self,
883             intf=self.intf2,
884             layer="l3",
885             datapath="ip4",
886             direction=self.direction,
887         )
888         ipfix.add_vpp_config()
889
890         ipfix_decoder = IPFIXDecoder()
891         # template packet should arrive immediately
892         templates = ipfix.verify_templates(ipfix_decoder, count=1)
893
894         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
895         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
896
897         # make sure the one packet we expect actually showed up
898         self.vapi.ipfix_flush()
899         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
900         self.verify_cflow_data_detail(
901             ipfix_decoder,
902             capture,
903             cflow,
904             {
905                 1: "octets",
906                 2: "packets",
907                 8: "src_ip",
908                 12: "dst_ip",
909                 61: (self.direction == "tx"),
910             },
911         )
912
913         # expected two templates and one cflow packet
914         self.collector.get_capture(2)
915
916         ipfix.remove_vpp_config()
917         self.logger.info("FFP_TEST_FINISH_0002")
918
919     def test_L4onIP4(self):
920         """L4 data on IP4 datapath"""
921         self.logger.info("FFP_TEST_START_0003")
922         self.pg_enable_capture(self.pg_interfaces)
923         self.pkts = []
924
925         ipfix = VppCFLOW(
926             test=self,
927             intf=self.intf2,
928             layer="l4",
929             datapath="ip4",
930             direction=self.direction,
931         )
932         ipfix.add_vpp_config()
933
934         ipfix_decoder = IPFIXDecoder()
935         # template packet should arrive immediately
936         templates = ipfix.verify_templates(ipfix_decoder, count=1)
937
938         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
939         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
940
941         # make sure the one packet we expect actually showed up
942         self.vapi.ipfix_flush()
943         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
944         self.verify_cflow_data_detail(
945             ipfix_decoder,
946             capture,
947             cflow,
948             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
949         )
950
951         # expected two templates and one cflow packet
952         self.collector.get_capture(2)
953
954         ipfix.remove_vpp_config()
955         self.logger.info("FFP_TEST_FINISH_0003")
956
957     def test_templatesIP6(self):
958         """verify templates on IP6 datapath"""
959         self.logger.info("FFP_TEST_START_0000")
960         self.pg_enable_capture(self.pg_interfaces)
961
962         ipfix = VppCFLOW(
963             test=self, intf=self.intf1, datapath="ip6", direction=self.direction
964         )
965         ipfix.add_vpp_config()
966
967         # template packet should arrive immediately
968         ipfix.verify_templates(count=1)
969         self.collector.get_capture(1)
970
971         ipfix.remove_vpp_config()
972
973         self.logger.info("FFP_TEST_FINISH_0000")
974
975     def test_L2onIP6(self):
976         """L2 data on IP6 datapath"""
977         self.logger.info("FFP_TEST_START_0001")
978         self.pg_enable_capture(self.pg_interfaces)
979         self.pkts = []
980
981         ipfix = VppCFLOW(
982             test=self,
983             intf=self.intf3,
984             layer="l2",
985             datapath="ip6",
986             direction=self.direction,
987         )
988         ipfix.add_vpp_config()
989
990         ipfix_decoder = IPFIXDecoder()
991         # template packet should arrive immediately
992         templates = ipfix.verify_templates(ipfix_decoder, count=1)
993
994         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
995         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
996
997         # make sure the one packet we expect actually showed up
998         self.vapi.ipfix_flush()
999         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1000         self.verify_cflow_data_detail(
1001             ipfix_decoder,
1002             capture,
1003             cflow,
1004             {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1005             ip_ver="v6",
1006         )
1007
1008         # expected two templates and one cflow packet
1009         self.collector.get_capture(2)
1010
1011         ipfix.remove_vpp_config()
1012         self.logger.info("FFP_TEST_FINISH_0001")
1013
1014     def test_L3onIP6(self):
1015         """L3 data on IP6 datapath"""
1016         self.logger.info("FFP_TEST_START_0002")
1017         self.pg_enable_capture(self.pg_interfaces)
1018         self.pkts = []
1019
1020         ipfix = VppCFLOW(
1021             test=self,
1022             intf=self.intf3,
1023             layer="l3",
1024             datapath="ip6",
1025             direction=self.direction,
1026         )
1027         ipfix.add_vpp_config()
1028
1029         ipfix_decoder = IPFIXDecoder()
1030         # template packet should arrive immediately
1031         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1032
1033         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1034         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1035
1036         # make sure the one packet we expect actually showed up
1037         self.vapi.ipfix_flush()
1038         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1039         self.verify_cflow_data_detail(
1040             ipfix_decoder,
1041             capture,
1042             cflow,
1043             {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1044             ip_ver="v6",
1045         )
1046
1047         # expected two templates and one cflow packet
1048         self.collector.get_capture(2)
1049
1050         ipfix.remove_vpp_config()
1051         self.logger.info("FFP_TEST_FINISH_0002")
1052
1053     def test_L4onIP6(self):
1054         """L4 data on IP6 datapath"""
1055         self.logger.info("FFP_TEST_START_0003")
1056         self.pg_enable_capture(self.pg_interfaces)
1057         self.pkts = []
1058
1059         ipfix = VppCFLOW(
1060             test=self,
1061             intf=self.intf3,
1062             layer="l4",
1063             datapath="ip6",
1064             direction=self.direction,
1065         )
1066         ipfix.add_vpp_config()
1067
1068         ipfix_decoder = IPFIXDecoder()
1069         # template packet should arrive immediately
1070         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1071
1072         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1073         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1074
1075         # make sure the one packet we expect actually showed up
1076         self.vapi.ipfix_flush()
1077         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1078         self.verify_cflow_data_detail(
1079             ipfix_decoder,
1080             capture,
1081             cflow,
1082             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1083             ip_ver="v6",
1084         )
1085
1086         # expected two templates and one cflow packet
1087         self.collector.get_capture(2)
1088
1089         ipfix.remove_vpp_config()
1090         self.logger.info("FFP_TEST_FINISH_0003")
1091
1092     def test_0001(self):
1093         """no timers, one CFLOW packet, 9 Flows inside"""
1094         self.logger.info("FFP_TEST_START_0001")
1095         self.pg_enable_capture(self.pg_interfaces)
1096         self.pkts = []
1097
1098         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1099         ipfix.add_vpp_config()
1100
1101         ipfix_decoder = IPFIXDecoder()
1102         # template packet should arrive immediately
1103         templates = ipfix.verify_templates(ipfix_decoder)
1104
1105         self.create_stream(packets=9)
1106         capture = self.send_packets()
1107
1108         # make sure the one packet we expect actually showed up
1109         self.vapi.ipfix_flush()
1110         cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1111         self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1112         self.collector.get_capture(4)
1113
1114         ipfix.remove_vpp_config()
1115         self.logger.info("FFP_TEST_FINISH_0001")
1116
1117     def test_0002(self):
1118         """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1119         self.logger.info("FFP_TEST_START_0002")
1120         self.pg_enable_capture(self.pg_interfaces)
1121         self.pkts = []
1122
1123         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1124         ipfix.add_vpp_config()
1125
1126         ipfix_decoder = IPFIXDecoder()
1127         # template packet should arrive immediately
1128         self.vapi.ipfix_flush()
1129         templates = ipfix.verify_templates(ipfix_decoder)
1130
1131         self.create_stream(packets=6)
1132         capture = self.send_packets()
1133
1134         # make sure the one packet we expect actually showed up
1135         cflows = []
1136         self.vapi.ipfix_flush()
1137         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1138         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1139         self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1140         self.collector.get_capture(5)
1141
1142         ipfix.remove_vpp_config()
1143         self.logger.info("FFP_TEST_FINISH_0002")
1144
1145
@tag_fixme_vpp_workers
class DatapathTx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""

    # Probe direction handed to VppCFLOW by the inherited datapath tests.
    direction = "tx"

    # Interface names the inherited tests enable the probe on:
    #   intf1 - L2 datapath tests and the template-only checks
    #   intf2 - IP4 datapath data tests
    #   intf3 - IP6 datapath data tests
    intf1, intf2, intf3 = "pg2", "pg4", "pg6"
1154
1155
@tag_fixme_vpp_workers
class DatapathRx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""

    # Probe direction handed to VppCFLOW by the inherited datapath tests.
    direction = "rx"

    # Interface names the inherited tests enable the probe on:
    #   intf1 - L2 datapath tests and the template-only checks
    #   intf2 - IP4 datapath data tests
    #   intf3 - IP6 datapath data tests
    intf1, intf2, intf3 = "pg1", "pg3", "pg5"
1164
1165
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableIPFIX(MethodHolder):
    """Disable IPFIX"""

    @classmethod
    def setUpClass(cls):
        # plain pass-through to the parent class fixture
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # plain pass-through to the parent class fixture
        super().tearDownClass()

    def test_0001(self):
        """disable IPFIX after first packets"""
        # Scenario: confirm export works, disable the exporter, resend the
        # same stream and check that the collector stays silent.
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # templates are expected right after the configuration is applied
        tmpls = probe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # flush the exporter and wait for a record tied to the second template
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1])
        self.collector.get_capture(4)

        # switch the exporter off and re-arm capture on the collector only
        probe.disable_exporter()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # after a flush and a short wait nothing may have reached the collector
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1212
1213
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableIPFIX(MethodHolder):
    """Re-enable IPFIX"""

    @classmethod
    def setUpClass(cls):
        # plain pass-through to the parent class fixture
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # plain pass-through to the parent class fixture
        super().tearDownClass()

    def test_0011(self):
        """disable IPFIX after first packets and re-enable after few packets"""
        # Scenario: confirm export works, disable the exporter and check the
        # collector stays silent, then re-enable it and inspect what arrives.
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(test=self)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=5)
        self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1])
        self.collector.get_capture(4)

        # disable IPFIX
        ipfix.disable_exporter()
        self.vapi.ipfix_flush()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # make sure no packet reached the collector while the exporter is off
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()
        # the five stream packets are still collected from pg2
        self.pg2.get_capture(5)

        # enable IPFIX
        ipfix.enable_exporter()

        capture = self.collector.get_capture(4)
        nr_templates = 0
        nr_data = 0
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                nr_templates += 1
            if p.haslayer(Data):
                nr_data += 1
        # The original used assertTrue(count, expected), which treats the
        # expected value as the failure message and only checks count != 0
        # (and never looked at nr_data at all). Compare the counts for real:
        # of the four captured packets, three templates and one data packet.
        self.assertEqual(nr_templates, 3)
        self.assertEqual(nr_data, 1)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1279
1280
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableFP(MethodHolder):
    """Disable Flowprobe feature"""

    @classmethod
    def setUpClass(cls):
        # plain pass-through to the parent class fixture
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # plain pass-through to the parent class fixture
        super().tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets"""
        # Scenario: confirm export works, disable the flowprobe feature,
        # resend the same stream and check that the collector stays silent.
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []
        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # templates are expected right after the configuration is applied
        tmpls = probe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # flush the exporter and wait for a record tied to the second template
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1])
        self.collector.get_capture(4)

        # switch the flowprobe feature off and re-arm capture on the collector
        probe.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # after a flush and a short wait nothing may have reached the collector
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_no_leftover_flows_after_disabling(self):
        """disable flowprobe feature and expect no leftover flows"""
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # rx probe on the ip4 datapath of pg3 with both timers set
        passive_timer = 4
        probe = VppCFLOW(
            test=self,
            active=3,
            passive=passive_timer,
            intf="pg3",
            layer="l3",
            datapath="ip4",
            direction="rx",
            mtu=100,
        )
        probe.add_vpp_config()

        # templates are expected right after the configuration is applied
        probe.verify_templates(count=1)

        # push some ip4 traffic through pg3 -> pg4
        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
        self.send_packets(src_if=self.pg3, dst_if=self.pg4)

        # turning the feature off should also drop the flows stored so far
        probe.disable_flowprobe_feature()

        # wait three passive-timer periods; nothing may show up meanwhile
        self.pg_enable_capture([self.collector])
        self.sleep(
            3 * passive_timer,
            "wait for leftover ip4 flows during three passive intervals",
        )
        self.collector.assert_nothing_captured()

        # cleanup
        probe.disable_exporter()
1364
1365
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableFP(MethodHolder):
    """Re-enable Flowprobe feature"""

    @classmethod
    def setUpClass(cls):
        # plain pass-through to the parent class fixture
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # plain pass-through to the parent class fixture
        super().tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets and re-enable
        after few packets"""
        # Scenario: confirm export works, disable the flowprobe feature and
        # check silence, then re-enable it and confirm export resumes.
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # flush first, then verify the templates with a timeout of 3
        self.vapi.ipfix_flush()
        tmpls = probe.verify_templates(decoder, timeout=3)

        self.create_stream()
        self.send_packets()

        # flush the exporter and wait for a record tied to the second template
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1], 5)
        self.collector.get_capture(4)

        # switch the flowprobe feature off and re-arm capture on the collector
        probe.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # after a flush and a wait nothing may have reached the collector
        self.vapi.ipfix_flush()
        self.sleep(5, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        # switch the flowprobe feature back on; templates must be verified again
        probe.enable_flowprobe_feature()
        self.vapi.ipfix_flush()
        tmpls = probe.verify_templates(decoder, timeout=3)

        self.send_packets()

        # templates and data are expected on the collector once more
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1], 5)
        self.collector.get_capture(4)

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1427
1428
if __name__ == "__main__":
    # Executed as a script: run this module's tests through unittest using
    # the VppTestRunner imported from the framework module.
    unittest.main(testRunner=VppTestRunner)