flowprobe: fix accumulation of tcp flags in flow entries
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8 import re
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.lacp import SlowProtocol, LACP
15
16 from config import config
17 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
18 from framework import is_distro_ubuntu2204, is_distro_debian11
19 from framework import VppTestCase, VppTestRunner
20 from framework import tag_run_solo
21 from vpp_object import VppObject
22 from vpp_pg_interface import CaptureTimeoutError
23 from util import ppp
24 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
25 from vpp_ip_route import VppIpRoute, VppRoutePath
26 from vpp_papi.macaddress import mac_ntop
27 from socket import inet_ntop
28 from vpp_papi import VppEnum
29
30
31 TMPL_COMMON_FIELD_COUNT = 6
32 TMPL_L2_FIELD_COUNT = 3
33 TMPL_L3_FIELD_COUNT = 4
34 TMPL_L4_FIELD_COUNT = 3
35
36 IPFIX_TCP_FLAGS_ID = 6
37 IPFIX_SRC_TRANS_PORT_ID = 7
38 IPFIX_DST_TRANS_PORT_ID = 11
39
40 TCP_F_FIN = 0x01
41 TCP_F_SYN = 0x02
42 TCP_F_RST = 0x04
43 TCP_F_PSH = 0x08
44 TCP_F_ACK = 0x10
45 TCP_F_URG = 0x20
46 TCP_F_ECE = 0x40
47 TCP_F_CWR = 0x80
48
49
50 class VppCFLOW(VppObject):
51     """CFLOW object for IPFIX exporter and Flowprobe feature"""
52
53     def __init__(
54         self,
55         test,
56         intf="pg2",
57         active=0,
58         passive=0,
59         timeout=100,
60         mtu=1024,
61         datapath="l2",
62         layer="l2 l3 l4",
63         direction="tx",
64     ):
65         self._test = test
66         self._intf = intf
67         self._intf_obj = getattr(self._test, intf)
68         self._active = active
69         if passive == 0 or passive < active:
70             self._passive = active + 1
71         else:
72             self._passive = passive
73         self._datapath = datapath  # l2 ip4 ip6
74         self._collect = layer  # l2 l3 l4
75         self._direction = direction  # rx tx both
76         self._timeout = timeout
77         self._mtu = mtu
78         self._configured = False
79
80     def add_vpp_config(self):
81         self.enable_exporter()
82         l2_flag = 0
83         l3_flag = 0
84         l4_flag = 0
85         if "l2" in self._collect.lower():
86             l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
87         if "l3" in self._collect.lower():
88             l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
89         if "l4" in self._collect.lower():
90             l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
91         self._test.vapi.flowprobe_set_params(
92             record_flags=(l2_flag | l3_flag | l4_flag),
93             active_timer=self._active,
94             passive_timer=self._passive,
95         )
96         self.enable_flowprobe_feature()
97         self._test.vapi.cli("ipfix flush")
98         self._configured = True
99
100     def remove_vpp_config(self):
101         self.disable_exporter()
102         self.disable_flowprobe_feature()
103         self._test.vapi.cli("ipfix flush")
104         self._configured = False
105
106     def enable_exporter(self):
107         self._test.vapi.set_ipfix_exporter(
108             collector_address=self._test.pg0.remote_ip4,
109             src_address=self._test.pg0.local_ip4,
110             path_mtu=self._mtu,
111             template_interval=self._timeout,
112         )
113
114     def _enable_disable_flowprobe_feature(self, is_add):
115         which_map = {
116             "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
117             "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
118             "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
119         }
120         direction_map = {
121             "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
122             "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
123             "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
124         }
125         self._test.vapi.flowprobe_interface_add_del(
126             is_add=is_add,
127             which=which_map[self._datapath],
128             direction=direction_map[self._direction],
129             sw_if_index=self._intf_obj.sw_if_index,
130         )
131
132     def enable_flowprobe_feature(self):
133         self._enable_disable_flowprobe_feature(is_add=True)
134
135     def disable_exporter(self):
136         self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
137
138     def disable_flowprobe_feature(self):
139         self._enable_disable_flowprobe_feature(is_add=False)
140
141     def object_id(self):
142         return "ipfix-collector-%s-%s" % (self._src, self.dst)
143
144     def query_vpp_config(self):
145         return self._configured
146
147     def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
148         templates = []
149         self._test.assertIn(count, (1, 2, 3))
150         for _ in range(count):
151             p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
152             self._test.assertTrue(p.haslayer(IPFIX))
153             self._test.assertTrue(p.haslayer(Template))
154             if decoder is not None:
155                 templates.append(p[Template].templateID)
156                 decoder.add_template(p.getlayer(Template))
157             if field_count_in is not None:
158                 self._test.assertIn(p[Template].fieldCount, field_count_in)
159         return templates
160
161
162 class MethodHolder(VppTestCase):
163     """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
164
165     # Test variables
166     debug_print = False
167     max_number_of_packets = 10
168     pkts = []
169
170     @classmethod
171     def setUpClass(cls):
172         """
173         Perform standard class setup (defined by class method setUpClass in
174         class VppTestCase) before running the test case, set test case related
175         variables and configure VPP.
176         """
177         super(MethodHolder, cls).setUpClass()
178         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
179             cls, "vpp"
180         ):
181             return
182         try:
183             # Create pg interfaces
184             cls.create_pg_interfaces(range(9))
185
186             # Packet sizes
187             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
188
189             # Create BD with MAC learning and unknown unicast flooding disabled
190             # and put interfaces to this BD
191             cls.vapi.bridge_domain_add_del_v2(
192                 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
193             )
194             cls.vapi.sw_interface_set_l2_bridge(
195                 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
196             )
197             cls.vapi.sw_interface_set_l2_bridge(
198                 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
199             )
200
201             # Set up all interfaces
202             for i in cls.pg_interfaces:
203                 i.admin_up()
204
205             cls.pg0.config_ip4()
206             cls.pg0.configure_ipv4_neighbors()
207             cls.collector = cls.pg0
208
209             cls.pg1.config_ip4()
210             cls.pg1.resolve_arp()
211             cls.pg2.config_ip4()
212             cls.pg2.resolve_arp()
213             cls.pg3.config_ip4()
214             cls.pg3.resolve_arp()
215             cls.pg4.config_ip4()
216             cls.pg4.resolve_arp()
217             cls.pg7.config_ip4()
218             cls.pg8.config_ip4()
219             cls.pg8.configure_ipv4_neighbors()
220
221             cls.pg5.config_ip6()
222             cls.pg5.resolve_ndp()
223             cls.pg5.disable_ipv6_ra()
224             cls.pg6.config_ip6()
225             cls.pg6.resolve_ndp()
226             cls.pg6.disable_ipv6_ra()
227         except Exception:
228             super(MethodHolder, cls).tearDownClass()
229             raise
230
231     @classmethod
232     def tearDownClass(cls):
233         super(MethodHolder, cls).tearDownClass()
234
235     def create_stream(
236         self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
237     ):
238         """Create a packet stream to tickle the plugin
239
240         :param VppInterface src_if: Source interface for packet stream
241         :param VppInterface src_if: Dst interface for packet stream
242         """
243         if src_if is None:
244             src_if = self.pg1
245         if dst_if is None:
246             dst_if = self.pg2
247         self.pkts = []
248         if packets is None:
249             packets = random.randint(1, self.max_number_of_packets)
250         pkt_size = size
251         for p in range(0, packets):
252             if size is None:
253                 pkt_size = random.choice(self.pg_if_packet_sizes)
254             info = self.create_packet_info(src_if, dst_if)
255             payload = self.info_to_payload(info)
256             p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
257             if ip_ver == "v4":
258                 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
259             else:
260                 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
261             p /= UDP(sport=1234, dport=4321)
262             p /= Raw(payload)
263             info.data = p.copy()
264             self.extend_packet(p, pkt_size)
265             self.pkts.append(p)
266
267     def verify_cflow_data(self, decoder, capture, cflow):
268         octets = 0
269         packets = 0
270         for p in capture:
271             octets += p[IP].len
272             packets += 1
273         if cflow.haslayer(Data):
274             data = decoder.decode_data_set(cflow.getlayer(Set))
275             for record in data:
276                 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
277                 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
278
279     def send_packets(self, src_if=None, dst_if=None):
280         if src_if is None:
281             src_if = self.pg1
282         if dst_if is None:
283             dst_if = self.pg2
284         self.pg_enable_capture([dst_if])
285         src_if.add_stream(self.pkts)
286         self.pg_start()
287         return dst_if.get_capture(len(self.pkts))
288
289     def verify_cflow_data_detail(
290         self,
291         decoder,
292         capture,
293         cflow,
294         data_set={1: "octets", 2: "packets"},
295         ip_ver="v4",
296         field_count=None,
297     ):
298         if self.debug_print:
299             print(capture[0].show())
300         if cflow.haslayer(Data):
301             data = decoder.decode_data_set(cflow.getlayer(Set))
302             if self.debug_print:
303                 print(data)
304             if ip_ver == "v4":
305                 ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
306             else:
307                 ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
308             if data_set is not None:
309                 for record in data:
310                     # skip flow if ingress/egress interface is 0
311                     if int(binascii.hexlify(record[10]), 16) == 0:
312                         continue
313                     if int(binascii.hexlify(record[14]), 16) == 0:
314                         continue
315
316                     for field in data_set:
317                         value = data_set[field]
318                         if value == "octets":
319                             value = ip_layer.len
320                             if ip_ver == "v6":
321                                 value += 40  # ??? is this correct
322                         elif value == "packets":
323                             value = 1
324                         elif value == "src_ip":
325                             if ip_ver == "v4":
326                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
327                             else:
328                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
329                             value = int(binascii.hexlify(ip), 16)
330                         elif value == "dst_ip":
331                             if ip_ver == "v4":
332                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
333                             else:
334                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
335                             value = int(binascii.hexlify(ip), 16)
336                         elif value == "sport":
337                             value = int(capture[0][UDP].sport)
338                         elif value == "dport":
339                             value = int(capture[0][UDP].dport)
340                         self.assertEqual(
341                             int(binascii.hexlify(record[field]), 16), value
342                         )
343             if field_count is not None:
344                 for record in data:
345                     self.assertEqual(len(record), field_count)
346
347     def verify_cflow_data_notimer(self, decoder, capture, cflows):
348         idx = 0
349         for cflow in cflows:
350             if cflow.haslayer(Data):
351                 data = decoder.decode_data_set(cflow.getlayer(Set))
352             else:
353                 raise Exception("No CFLOW data")
354
355             for rec in data:
356                 p = capture[idx]
357                 idx += 1
358                 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
359                 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
360         self.assertEqual(len(capture), idx)
361
362     def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
363         """wait for CFLOW packet and verify its correctness
364
365         :param timeout: how long to wait
366
367         """
368         self.logger.info("IPFIX: Waiting for CFLOW packet")
369         # self.logger.debug(self.vapi.ppcli("show flow table"))
370         p = collector_intf.wait_for_packet(timeout=timeout)
371         self.assertEqual(p[Set].setID, set_id)
372         # self.logger.debug(self.vapi.ppcli("show flow table"))
373         self.logger.debug(ppp("IPFIX: Got packet:", p))
374         return p
375
376
377 @tag_run_solo
378 @tag_fixme_vpp_workers
379 @tag_fixme_ubuntu2204
380 @tag_fixme_debian11
381 class Flowprobe(MethodHolder):
382     """Template verification, timer tests"""
383
384     @classmethod
385     def setUpClass(cls):
386         super(Flowprobe, cls).setUpClass()
387
388     @classmethod
389     def tearDownClass(cls):
390         super(Flowprobe, cls).tearDownClass()
391
392     def test_0001(self):
393         """timer less than template timeout"""
394         self.logger.info("FFP_TEST_START_0001")
395         self.pg_enable_capture(self.pg_interfaces)
396         self.pkts = []
397
398         ipfix = VppCFLOW(test=self, active=2)
399         ipfix.add_vpp_config()
400
401         ipfix_decoder = IPFIXDecoder()
402         # template packet should arrive immediately
403         templates = ipfix.verify_templates(ipfix_decoder)
404
405         self.create_stream(packets=1)
406         self.send_packets()
407         capture = self.pg2.get_capture(1)
408
409         # make sure the one packet we expect actually showed up
410         cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
411         self.verify_cflow_data(ipfix_decoder, capture, cflow)
412
413         ipfix.remove_vpp_config()
414         self.logger.info("FFP_TEST_FINISH_0001")
415
416     def test_0002(self):
417         """timer greater than template timeout"""
418         self.logger.info("FFP_TEST_START_0002")
419         self.pg_enable_capture(self.pg_interfaces)
420         self.pkts = []
421
422         ipfix = VppCFLOW(test=self, timeout=3, active=4)
423         ipfix.add_vpp_config()
424
425         ipfix_decoder = IPFIXDecoder()
426         # template packet should arrive immediately
427         ipfix.verify_templates()
428
429         self.create_stream(packets=2)
430         self.send_packets()
431         capture = self.pg2.get_capture(2)
432
433         # next set of template packet should arrive after 20 seconds
434         # template packet should arrive within 20 s
435         templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
436
437         # make sure the one packet we expect actually showed up
438         cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
439         self.verify_cflow_data(ipfix_decoder, capture, cflow)
440
441         ipfix.remove_vpp_config()
442         self.logger.info("FFP_TEST_FINISH_0002")
443
444     def test_cflow_packet(self):
445         """verify cflow packet fields"""
446         self.logger.info("FFP_TEST_START_0000")
447         self.pg_enable_capture(self.pg_interfaces)
448         self.pkts = []
449
450         ipfix = VppCFLOW(
451             test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
452         )
453         ipfix.add_vpp_config()
454
455         route_9001 = VppIpRoute(
456             self,
457             "9.0.0.0",
458             24,
459             [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
460         )
461         route_9001.add_vpp_config()
462
463         ipfix_decoder = IPFIXDecoder()
464         templates = ipfix.verify_templates(ipfix_decoder, count=1)
465
466         self.pkts = [
467             (
468                 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
469                 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
470                 / TCP(sport=1234, dport=4321, flags=80)
471                 / Raw(b"\xa5" * 100)
472             )
473         ]
474
475         nowUTC = int(time.time())
476         nowUNIX = nowUTC + 2208988800
477         self.send_packets(src_if=self.pg7, dst_if=self.pg8)
478
479         cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
480         self.collector.get_capture(2)
481
482         if cflow[0].haslayer(IPFIX):
483             self.assertEqual(cflow[IPFIX].version, 10)
484             self.assertEqual(cflow[IPFIX].observationDomainID, 1)
485             self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
486             self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
487         if cflow.haslayer(Data):
488             record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
489             # ingress interface
490             self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
491             # egress interface
492             self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
493             # direction
494             self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
495             # packets
496             self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
497             # src mac
498             self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
499             # dst mac
500             self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
501             flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
502             # flow start timestamp
503             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
504             flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
505             # flow end timestamp
506             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
507             # ethernet type
508             self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
509             # src ip
510             self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
511             # dst ip
512             self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
513             # protocol (TCP)
514             self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
515             # src port
516             self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
517             # dst port
518             self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
519             # tcp flags
520             self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
521
522         ipfix.remove_vpp_config()
523         self.logger.info("FFP_TEST_FINISH_0000")
524
525     def test_flow_entry_reuse(self):
526         """Verify flow entry reuse doesn't accumulate meta info"""
527         self.pg_enable_capture(self.pg_interfaces)
528         self.pkts = []
529
530         # enable ip4 datapath for an interface
531         # set active and passive timers
532         ipfix = VppCFLOW(
533             test=self,
534             active=2,
535             passive=3,
536             intf="pg3",
537             layer="l3 l4",
538             datapath="ip4",
539             direction="rx",
540             mtu=100,
541         )
542         ipfix.add_vpp_config()
543
544         # template packet should arrive immediately
545         ipfix_decoder = IPFIXDecoder()
546         templates = ipfix.verify_templates(ipfix_decoder, count=1)
547
548         # make a tcp packet
549         self.pkts = [
550             (
551                 Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
552                 / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
553                 / TCP(sport=1234, dport=4321)
554                 / Raw(b"\xa5" * 50)
555             )
556         ]
557
558         # send the tcp packet two times, each time with new set of flags
559         tcp_flags = (
560             TCP_F_SYN | TCP_F_ACK,
561             TCP_F_RST | TCP_F_PSH,
562         )
563         for f in tcp_flags:
564             self.pkts[0][TCP].flags = f
565             capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
566
567             # verify meta info - packet/octet delta and tcp flags
568             cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
569             self.verify_cflow_data(ipfix_decoder, capture, cflow)
570             self.verify_cflow_data_detail(
571                 ipfix_decoder,
572                 capture,
573                 cflow,
574                 {
575                     IPFIX_TCP_FLAGS_ID: f,
576                     IPFIX_SRC_TRANS_PORT_ID: 1234,
577                     IPFIX_DST_TRANS_PORT_ID: 4321,
578                 },
579             )
580
581         self.collector.get_capture(3)
582
583         # cleanup
584         ipfix.remove_vpp_config()
585
586     def test_interface_dump(self):
587         """Dump interfaces with IPFIX flow record generation enabled"""
588         self.logger.info("FFP_TEST_START_0003")
589
590         # Enable feature for 3 interfaces
591         ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
592         ipfix1.add_vpp_config()
593
594         ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
595         ipfix2.enable_flowprobe_feature()
596
597         ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
598         ipfix3.enable_flowprobe_feature()
599
600         # When request "all", dump should contain all enabled interfaces
601         dump = self.vapi.flowprobe_interface_dump()
602         self.assertEqual(len(dump), 3)
603
604         # Verify 1st interface
605         self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
606         self.assertEqual(
607             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
608         )
609         self.assertEqual(
610             dump[0].direction,
611             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
612         )
613
614         # Verify 2nd interface
615         self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
616         self.assertEqual(
617             dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
618         )
619         self.assertEqual(
620             dump[1].direction,
621             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
622         )
623
624         # Verify 3rd interface
625         self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
626         self.assertEqual(
627             dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
628         )
629         self.assertEqual(
630             dump[2].direction,
631             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
632         )
633
634         # When request 2nd interface, dump should contain only the specified interface
635         dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
636         self.assertEqual(len(dump), 1)
637
638         # Verify 2nd interface
639         self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
640         self.assertEqual(
641             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
642         )
643         self.assertEqual(
644             dump[0].direction,
645             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
646         )
647
648         # When request 99th interface, dump should be empty
649         dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
650         self.assertEqual(len(dump), 0)
651
652         ipfix1.remove_vpp_config()
653         ipfix2.remove_vpp_config()
654         ipfix3.remove_vpp_config()
655         self.logger.info("FFP_TEST_FINISH_0003")
656
657     def test_get_params(self):
658         """Get IPFIX flow record generation parameters"""
659         self.logger.info("FFP_TEST_START_0004")
660
661         # Enable feature for an interface with custom parameters
662         ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
663         ipfix.add_vpp_config()
664
665         # Get and verify parameters
666         params = self.vapi.flowprobe_get_params()
667         self.assertEqual(params.active_timer, 20)
668         self.assertEqual(params.passive_timer, 40)
669         record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
670         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
671         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
672         self.assertEqual(params.record_flags, record_flags)
673
674         ipfix.remove_vpp_config()
675         self.logger.info("FFP_TEST_FINISH_0004")
676
677
678 class DatapathTestsHolder(object):
679     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
680
681     @classmethod
682     def setUpClass(cls):
683         super(DatapathTestsHolder, cls).setUpClass()
684
685     @classmethod
686     def tearDownClass(cls):
687         super(DatapathTestsHolder, cls).tearDownClass()
688
689     def test_templatesL2(self):
690         """verify template on L2 datapath"""
691         self.logger.info("FFP_TEST_START_0000")
692         self.pg_enable_capture(self.pg_interfaces)
693
694         ipfix = VppCFLOW(
695             test=self, intf=self.intf1, layer="l2", direction=self.direction
696         )
697         ipfix.add_vpp_config()
698
699         # template packet should arrive immediately
700         self.vapi.ipfix_flush()
701         ipfix.verify_templates(timeout=3, count=1)
702         self.collector.get_capture(1)
703
704         ipfix.remove_vpp_config()
705         self.logger.info("FFP_TEST_FINISH_0000")
706
707     def test_L2onL2(self):
708         """L2 data on L2 datapath"""
709         self.logger.info("FFP_TEST_START_0001")
710         self.pg_enable_capture(self.pg_interfaces)
711         self.pkts = []
712
713         ipfix = VppCFLOW(
714             test=self, intf=self.intf1, layer="l2", direction=self.direction
715         )
716         ipfix.add_vpp_config()
717
718         ipfix_decoder = IPFIXDecoder()
719         # template packet should arrive immediately
720         templates = ipfix.verify_templates(ipfix_decoder, count=1)
721
722         self.create_stream(packets=1)
723         capture = self.send_packets()
724
725         # make sure the one packet we expect actually showed up
726         self.vapi.ipfix_flush()
727         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
728         self.verify_cflow_data_detail(
729             ipfix_decoder,
730             capture,
731             cflow,
732             {2: "packets", 256: 8, 61: (self.direction == "tx")},
733         )
734         self.collector.get_capture(2)
735
736         ipfix.remove_vpp_config()
737         self.logger.info("FFP_TEST_FINISH_0001")
738
739     def test_L3onL2(self):
740         """L3 data on L2 datapath"""
741         self.logger.info("FFP_TEST_START_0002")
742         self.pg_enable_capture(self.pg_interfaces)
743         self.pkts = []
744
745         ipfix = VppCFLOW(
746             test=self, intf=self.intf1, layer="l3", direction=self.direction
747         )
748         ipfix.add_vpp_config()
749
750         ipfix_decoder = IPFIXDecoder()
751         # template packet should arrive immediately
752         templates = ipfix.verify_templates(ipfix_decoder, count=2)
753
754         self.create_stream(packets=1)
755         capture = self.send_packets()
756
757         # make sure the one packet we expect actually showed up
758         self.vapi.ipfix_flush()
759         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
760         self.verify_cflow_data_detail(
761             ipfix_decoder,
762             capture,
763             cflow,
764             {
765                 2: "packets",
766                 4: 17,
767                 8: "src_ip",
768                 12: "dst_ip",
769                 61: (self.direction == "tx"),
770             },
771         )
772
773         self.collector.get_capture(3)
774
775         ipfix.remove_vpp_config()
776         self.logger.info("FFP_TEST_FINISH_0002")
777
778     def test_L234onL2(self):
779         """L2/3/4 data on L2 datapath"""
780         self.pg_enable_capture(self.pg_interfaces)
781         self.pkts = []
782
783         ipfix = VppCFLOW(
784             test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
785         )
786         ipfix.add_vpp_config()
787
788         ipfix_decoder = IPFIXDecoder()
789         # template packet should arrive immediately
790         tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
791         tmpl_ip_field_count = (
792             TMPL_COMMON_FIELD_COUNT
793             + TMPL_L2_FIELD_COUNT
794             + TMPL_L3_FIELD_COUNT
795             + TMPL_L4_FIELD_COUNT
796         )
797         templates = ipfix.verify_templates(
798             ipfix_decoder,
799             count=3,
800             field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
801         )
802
803         # verify IPv4 and IPv6 flows
804         for ip_ver in ("v4", "v6"):
805             self.create_stream(packets=1, ip_ver=ip_ver)
806             capture = self.send_packets()
807
808             # make sure the one packet we expect actually showed up
809             self.vapi.ipfix_flush()
810             cflow = self.wait_for_cflow_packet(
811                 self.collector, templates[1 if ip_ver == "v4" else 2]
812             )
813             src_ip_id = 8 if ip_ver == "v4" else 27
814             dst_ip_id = 12 if ip_ver == "v4" else 28
815             self.verify_cflow_data_detail(
816                 ipfix_decoder,
817                 capture,
818                 cflow,
819                 {
820                     2: "packets",
821                     256: 8 if ip_ver == "v4" else 56710,
822                     4: 17,
823                     7: "sport",
824                     11: "dport",
825                     src_ip_id: "src_ip",
826                     dst_ip_id: "dst_ip",
827                     61: (self.direction == "tx"),
828                 },
829                 ip_ver=ip_ver,
830                 field_count=tmpl_ip_field_count,
831             )
832
833         # verify non-IP flow
834         self.pkts = [
835             (
836                 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
837                 / SlowProtocol()
838                 / LACP()
839             )
840         ]
841         capture = self.send_packets()
842
843         # make sure the one packet we expect actually showed up
844         self.vapi.ipfix_flush()
845         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
846         self.verify_cflow_data_detail(
847             ipfix_decoder,
848             capture,
849             cflow,
850             {2: "packets", 256: 2440, 61: (self.direction == "tx")},
851             field_count=tmpl_l2_field_count,
852         )
853
854         self.collector.get_capture(6)
855
856         ipfix.remove_vpp_config()
857
858     def test_L4onL2(self):
859         """L4 data on L2 datapath"""
860         self.logger.info("FFP_TEST_START_0003")
861         self.pg_enable_capture(self.pg_interfaces)
862         self.pkts = []
863
864         ipfix = VppCFLOW(
865             test=self, intf=self.intf1, layer="l4", direction=self.direction
866         )
867         ipfix.add_vpp_config()
868
869         ipfix_decoder = IPFIXDecoder()
870         # template packet should arrive immediately
871         templates = ipfix.verify_templates(ipfix_decoder, count=2)
872
873         self.create_stream(packets=1)
874         capture = self.send_packets()
875
876         # make sure the one packet we expect actually showed up
877         self.vapi.ipfix_flush()
878         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
879         self.verify_cflow_data_detail(
880             ipfix_decoder,
881             capture,
882             cflow,
883             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
884         )
885
886         self.collector.get_capture(3)
887
888         ipfix.remove_vpp_config()
889         self.logger.info("FFP_TEST_FINISH_0003")
890
891     def test_templatesIp4(self):
892         """verify templates on IP4 datapath"""
893         self.logger.info("FFP_TEST_START_0000")
894
895         self.pg_enable_capture(self.pg_interfaces)
896
897         ipfix = VppCFLOW(
898             test=self, intf=self.intf1, datapath="ip4", direction=self.direction
899         )
900         ipfix.add_vpp_config()
901
902         # template packet should arrive immediately
903         self.vapi.ipfix_flush()
904         ipfix.verify_templates(timeout=3, count=1)
905         self.collector.get_capture(1)
906
907         ipfix.remove_vpp_config()
908
909         self.logger.info("FFP_TEST_FINISH_0000")
910
911     def test_L2onIP4(self):
912         """L2 data on IP4 datapath"""
913         self.logger.info("FFP_TEST_START_0001")
914         self.pg_enable_capture(self.pg_interfaces)
915         self.pkts = []
916
917         ipfix = VppCFLOW(
918             test=self,
919             intf=self.intf2,
920             layer="l2",
921             datapath="ip4",
922             direction=self.direction,
923         )
924         ipfix.add_vpp_config()
925
926         ipfix_decoder = IPFIXDecoder()
927         # template packet should arrive immediately
928         templates = ipfix.verify_templates(ipfix_decoder, count=1)
929
930         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
931         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
932
933         # make sure the one packet we expect actually showed up
934         self.vapi.ipfix_flush()
935         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
936         self.verify_cflow_data_detail(
937             ipfix_decoder,
938             capture,
939             cflow,
940             {2: "packets", 256: 8, 61: (self.direction == "tx")},
941         )
942
943         # expected two templates and one cflow packet
944         self.collector.get_capture(2)
945
946         ipfix.remove_vpp_config()
947         self.logger.info("FFP_TEST_FINISH_0001")
948
949     def test_L3onIP4(self):
950         """L3 data on IP4 datapath"""
951         self.logger.info("FFP_TEST_START_0002")
952         self.pg_enable_capture(self.pg_interfaces)
953         self.pkts = []
954
955         ipfix = VppCFLOW(
956             test=self,
957             intf=self.intf2,
958             layer="l3",
959             datapath="ip4",
960             direction=self.direction,
961         )
962         ipfix.add_vpp_config()
963
964         ipfix_decoder = IPFIXDecoder()
965         # template packet should arrive immediately
966         templates = ipfix.verify_templates(ipfix_decoder, count=1)
967
968         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
969         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
970
971         # make sure the one packet we expect actually showed up
972         self.vapi.ipfix_flush()
973         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
974         self.verify_cflow_data_detail(
975             ipfix_decoder,
976             capture,
977             cflow,
978             {
979                 1: "octets",
980                 2: "packets",
981                 8: "src_ip",
982                 12: "dst_ip",
983                 61: (self.direction == "tx"),
984             },
985         )
986
987         # expected two templates and one cflow packet
988         self.collector.get_capture(2)
989
990         ipfix.remove_vpp_config()
991         self.logger.info("FFP_TEST_FINISH_0002")
992
993     def test_L4onIP4(self):
994         """L4 data on IP4 datapath"""
995         self.logger.info("FFP_TEST_START_0003")
996         self.pg_enable_capture(self.pg_interfaces)
997         self.pkts = []
998
999         ipfix = VppCFLOW(
1000             test=self,
1001             intf=self.intf2,
1002             layer="l4",
1003             datapath="ip4",
1004             direction=self.direction,
1005         )
1006         ipfix.add_vpp_config()
1007
1008         ipfix_decoder = IPFIXDecoder()
1009         # template packet should arrive immediately
1010         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1011
1012         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
1013         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1014
1015         # make sure the one packet we expect actually showed up
1016         self.vapi.ipfix_flush()
1017         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1018         self.verify_cflow_data_detail(
1019             ipfix_decoder,
1020             capture,
1021             cflow,
1022             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1023         )
1024
1025         # expected two templates and one cflow packet
1026         self.collector.get_capture(2)
1027
1028         ipfix.remove_vpp_config()
1029         self.logger.info("FFP_TEST_FINISH_0003")
1030
1031     def test_templatesIP6(self):
1032         """verify templates on IP6 datapath"""
1033         self.logger.info("FFP_TEST_START_0000")
1034         self.pg_enable_capture(self.pg_interfaces)
1035
1036         ipfix = VppCFLOW(
1037             test=self, intf=self.intf1, datapath="ip6", direction=self.direction
1038         )
1039         ipfix.add_vpp_config()
1040
1041         # template packet should arrive immediately
1042         ipfix.verify_templates(count=1)
1043         self.collector.get_capture(1)
1044
1045         ipfix.remove_vpp_config()
1046
1047         self.logger.info("FFP_TEST_FINISH_0000")
1048
1049     def test_L2onIP6(self):
1050         """L2 data on IP6 datapath"""
1051         self.logger.info("FFP_TEST_START_0001")
1052         self.pg_enable_capture(self.pg_interfaces)
1053         self.pkts = []
1054
1055         ipfix = VppCFLOW(
1056             test=self,
1057             intf=self.intf3,
1058             layer="l2",
1059             datapath="ip6",
1060             direction=self.direction,
1061         )
1062         ipfix.add_vpp_config()
1063
1064         ipfix_decoder = IPFIXDecoder()
1065         # template packet should arrive immediately
1066         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1067
1068         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1069         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1070
1071         # make sure the one packet we expect actually showed up
1072         self.vapi.ipfix_flush()
1073         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1074         self.verify_cflow_data_detail(
1075             ipfix_decoder,
1076             capture,
1077             cflow,
1078             {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1079             ip_ver="v6",
1080         )
1081
1082         # expected two templates and one cflow packet
1083         self.collector.get_capture(2)
1084
1085         ipfix.remove_vpp_config()
1086         self.logger.info("FFP_TEST_FINISH_0001")
1087
1088     def test_L3onIP6(self):
1089         """L3 data on IP6 datapath"""
1090         self.logger.info("FFP_TEST_START_0002")
1091         self.pg_enable_capture(self.pg_interfaces)
1092         self.pkts = []
1093
1094         ipfix = VppCFLOW(
1095             test=self,
1096             intf=self.intf3,
1097             layer="l3",
1098             datapath="ip6",
1099             direction=self.direction,
1100         )
1101         ipfix.add_vpp_config()
1102
1103         ipfix_decoder = IPFIXDecoder()
1104         # template packet should arrive immediately
1105         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1106
1107         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1108         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1109
1110         # make sure the one packet we expect actually showed up
1111         self.vapi.ipfix_flush()
1112         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1113         self.verify_cflow_data_detail(
1114             ipfix_decoder,
1115             capture,
1116             cflow,
1117             {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1118             ip_ver="v6",
1119         )
1120
1121         # expected two templates and one cflow packet
1122         self.collector.get_capture(2)
1123
1124         ipfix.remove_vpp_config()
1125         self.logger.info("FFP_TEST_FINISH_0002")
1126
1127     def test_L4onIP6(self):
1128         """L4 data on IP6 datapath"""
1129         self.logger.info("FFP_TEST_START_0003")
1130         self.pg_enable_capture(self.pg_interfaces)
1131         self.pkts = []
1132
1133         ipfix = VppCFLOW(
1134             test=self,
1135             intf=self.intf3,
1136             layer="l4",
1137             datapath="ip6",
1138             direction=self.direction,
1139         )
1140         ipfix.add_vpp_config()
1141
1142         ipfix_decoder = IPFIXDecoder()
1143         # template packet should arrive immediately
1144         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1145
1146         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1147         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1148
1149         # make sure the one packet we expect actually showed up
1150         self.vapi.ipfix_flush()
1151         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1152         self.verify_cflow_data_detail(
1153             ipfix_decoder,
1154             capture,
1155             cflow,
1156             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1157             ip_ver="v6",
1158         )
1159
1160         # expected two templates and one cflow packet
1161         self.collector.get_capture(2)
1162
1163         ipfix.remove_vpp_config()
1164         self.logger.info("FFP_TEST_FINISH_0003")
1165
1166     def test_0001(self):
1167         """no timers, one CFLOW packet, 9 Flows inside"""
1168         self.logger.info("FFP_TEST_START_0001")
1169         self.pg_enable_capture(self.pg_interfaces)
1170         self.pkts = []
1171
1172         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1173         ipfix.add_vpp_config()
1174
1175         ipfix_decoder = IPFIXDecoder()
1176         # template packet should arrive immediately
1177         templates = ipfix.verify_templates(ipfix_decoder)
1178
1179         self.create_stream(packets=9)
1180         capture = self.send_packets()
1181
1182         # make sure the one packet we expect actually showed up
1183         self.vapi.ipfix_flush()
1184         cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1185         self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1186         self.collector.get_capture(4)
1187
1188         ipfix.remove_vpp_config()
1189         self.logger.info("FFP_TEST_FINISH_0001")
1190
1191     def test_0002(self):
1192         """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1193         self.logger.info("FFP_TEST_START_0002")
1194         self.pg_enable_capture(self.pg_interfaces)
1195         self.pkts = []
1196
1197         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1198         ipfix.add_vpp_config()
1199
1200         ipfix_decoder = IPFIXDecoder()
1201         # template packet should arrive immediately
1202         self.vapi.ipfix_flush()
1203         templates = ipfix.verify_templates(ipfix_decoder)
1204
1205         self.create_stream(packets=6)
1206         capture = self.send_packets()
1207
1208         # make sure the one packet we expect actually showed up
1209         cflows = []
1210         self.vapi.ipfix_flush()
1211         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1212         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1213         self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1214         self.collector.get_capture(5)
1215
1216         ipfix.remove_vpp_config()
1217         self.logger.info("FFP_TEST_FINISH_0002")
1218
1219
1220 @tag_fixme_vpp_workers
1221 class DatapathTx(MethodHolder, DatapathTestsHolder):
1222     """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1223
1224     intf1 = "pg2"
1225     intf2 = "pg4"
1226     intf3 = "pg6"
1227     direction = "tx"
1228
1229
1230 @tag_fixme_vpp_workers
1231 class DatapathRx(MethodHolder, DatapathTestsHolder):
1232     """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1233
1234     intf1 = "pg1"
1235     intf2 = "pg3"
1236     intf3 = "pg5"
1237     direction = "rx"
1238
1239
1240 @unittest.skipUnless(config.extended, "part of extended tests")
1241 class DisableIPFIX(MethodHolder):
1242     """Disable IPFIX"""
1243
1244     @classmethod
1245     def setUpClass(cls):
1246         super(DisableIPFIX, cls).setUpClass()
1247
1248     @classmethod
1249     def tearDownClass(cls):
1250         super(DisableIPFIX, cls).tearDownClass()
1251
1252     def test_0001(self):
1253         """disable IPFIX after first packets"""
1254         self.logger.info("FFP_TEST_START_0001")
1255         self.pg_enable_capture(self.pg_interfaces)
1256         self.pkts = []
1257
1258         ipfix = VppCFLOW(test=self)
1259         ipfix.add_vpp_config()
1260
1261         ipfix_decoder = IPFIXDecoder()
1262         # template packet should arrive immediately
1263         templates = ipfix.verify_templates(ipfix_decoder)
1264
1265         self.create_stream()
1266         self.send_packets()
1267
1268         # make sure the one packet we expect actually showed up
1269         self.vapi.ipfix_flush()
1270         self.wait_for_cflow_packet(self.collector, templates[1])
1271         self.collector.get_capture(4)
1272
1273         # disable IPFIX
1274         ipfix.disable_exporter()
1275         self.pg_enable_capture([self.collector])
1276
1277         self.send_packets()
1278
1279         # make sure no one packet arrived in 1 minute
1280         self.vapi.ipfix_flush()
1281         self.sleep(1, "wait before verifying no packets sent")
1282         self.collector.assert_nothing_captured()
1283
1284         ipfix.remove_vpp_config()
1285         self.logger.info("FFP_TEST_FINISH_0001")
1286
1287
1288 @unittest.skipUnless(config.extended, "part of extended tests")
1289 class ReenableIPFIX(MethodHolder):
1290     """Re-enable IPFIX"""
1291
1292     @classmethod
1293     def setUpClass(cls):
1294         super(ReenableIPFIX, cls).setUpClass()
1295
1296     @classmethod
1297     def tearDownClass(cls):
1298         super(ReenableIPFIX, cls).tearDownClass()
1299
1300     def test_0011(self):
1301         """disable IPFIX after first packets and re-enable after few packets"""
1302         self.logger.info("FFP_TEST_START_0001")
1303         self.pg_enable_capture(self.pg_interfaces)
1304         self.pkts = []
1305
1306         ipfix = VppCFLOW(test=self)
1307         ipfix.add_vpp_config()
1308
1309         ipfix_decoder = IPFIXDecoder()
1310         # template packet should arrive immediately
1311         templates = ipfix.verify_templates(ipfix_decoder)
1312
1313         self.create_stream(packets=5)
1314         self.send_packets()
1315
1316         # make sure the one packet we expect actually showed up
1317         self.vapi.ipfix_flush()
1318         self.wait_for_cflow_packet(self.collector, templates[1])
1319         self.collector.get_capture(4)
1320
1321         # disable IPFIX
1322         ipfix.disable_exporter()
1323         self.vapi.ipfix_flush()
1324         self.pg_enable_capture([self.collector])
1325
1326         self.send_packets()
1327
1328         # make sure no one packet arrived in active timer span
1329         self.vapi.ipfix_flush()
1330         self.sleep(1, "wait before verifying no packets sent")
1331         self.collector.assert_nothing_captured()
1332         self.pg2.get_capture(5)
1333
1334         # enable IPFIX
1335         ipfix.enable_exporter()
1336
1337         capture = self.collector.get_capture(4)
1338         nr_templates = 0
1339         nr_data = 0
1340         for p in capture:
1341             self.assertTrue(p.haslayer(IPFIX))
1342             if p.haslayer(Template):
1343                 nr_templates += 1
1344         self.assertTrue(nr_templates, 3)
1345         for p in capture:
1346             self.assertTrue(p.haslayer(IPFIX))
1347             if p.haslayer(Data):
1348                 nr_data += 1
1349         self.assertTrue(nr_templates, 1)
1350
1351         ipfix.remove_vpp_config()
1352         self.logger.info("FFP_TEST_FINISH_0001")
1353
1354
1355 @unittest.skipUnless(config.extended, "part of extended tests")
1356 class DisableFP(MethodHolder):
1357     """Disable Flowprobe feature"""
1358
1359     @classmethod
1360     def setUpClass(cls):
1361         super(DisableFP, cls).setUpClass()
1362
1363     @classmethod
1364     def tearDownClass(cls):
1365         super(DisableFP, cls).tearDownClass()
1366
1367     def test_0001(self):
1368         """disable flowprobe feature after first packets"""
1369         self.logger.info("FFP_TEST_START_0001")
1370         self.pg_enable_capture(self.pg_interfaces)
1371         self.pkts = []
1372         ipfix = VppCFLOW(test=self)
1373         ipfix.add_vpp_config()
1374
1375         ipfix_decoder = IPFIXDecoder()
1376         # template packet should arrive immediately
1377         templates = ipfix.verify_templates(ipfix_decoder)
1378
1379         self.create_stream()
1380         self.send_packets()
1381
1382         # make sure the one packet we expect actually showed up
1383         self.vapi.ipfix_flush()
1384         self.wait_for_cflow_packet(self.collector, templates[1])
1385         self.collector.get_capture(4)
1386
1387         # disable IPFIX
1388         ipfix.disable_flowprobe_feature()
1389         self.pg_enable_capture([self.collector])
1390
1391         self.send_packets()
1392
1393         # make sure no one packet arrived in active timer span
1394         self.vapi.ipfix_flush()
1395         self.sleep(1, "wait before verifying no packets sent")
1396         self.collector.assert_nothing_captured()
1397
1398         ipfix.remove_vpp_config()
1399         self.logger.info("FFP_TEST_FINISH_0001")
1400
1401     def test_no_leftover_flows_after_disabling(self):
1402         """disable flowprobe feature and expect no leftover flows"""
1403         self.pg_enable_capture(self.pg_interfaces)
1404         self.pkts = []
1405
1406         # enable ip4 datapath for an interface
1407         # set active and passive timers
1408         ipfix = VppCFLOW(
1409             test=self,
1410             active=3,
1411             passive=4,
1412             intf="pg3",
1413             layer="l3",
1414             datapath="ip4",
1415             direction="rx",
1416             mtu=100,
1417         )
1418         ipfix.add_vpp_config()
1419
1420         # template packet should arrive immediately
1421         ipfix.verify_templates(count=1)
1422
1423         # send some ip4 packets
1424         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
1425         self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1426
1427         # disable feature for the interface
1428         # currently stored ip4 flows should be removed
1429         ipfix.disable_flowprobe_feature()
1430
1431         # no leftover ip4 flows are expected
1432         self.pg_enable_capture([self.collector])
1433         self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
1434         self.collector.assert_nothing_captured()
1435
1436         # cleanup
1437         ipfix.disable_exporter()
1438
1439
1440 @unittest.skipUnless(config.extended, "part of extended tests")
1441 class ReenableFP(MethodHolder):
1442     """Re-enable Flowprobe feature"""
1443
1444     @classmethod
1445     def setUpClass(cls):
1446         super(ReenableFP, cls).setUpClass()
1447
1448     @classmethod
1449     def tearDownClass(cls):
1450         super(ReenableFP, cls).tearDownClass()
1451
1452     def test_0001(self):
1453         """disable flowprobe feature after first packets and re-enable
1454         after few packets"""
1455         self.logger.info("FFP_TEST_START_0001")
1456         self.pg_enable_capture(self.pg_interfaces)
1457         self.pkts = []
1458
1459         ipfix = VppCFLOW(test=self)
1460         ipfix.add_vpp_config()
1461
1462         ipfix_decoder = IPFIXDecoder()
1463         # template packet should arrive immediately
1464         self.vapi.ipfix_flush()
1465         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1466
1467         self.create_stream()
1468         self.send_packets()
1469
1470         # make sure the one packet we expect actually showed up
1471         self.vapi.ipfix_flush()
1472         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1473         self.collector.get_capture(4)
1474
1475         # disable FPP feature
1476         ipfix.disable_flowprobe_feature()
1477         self.pg_enable_capture([self.collector])
1478
1479         self.send_packets()
1480
1481         # make sure no one packet arrived in active timer span
1482         self.vapi.ipfix_flush()
1483         self.sleep(5, "wait before verifying no packets sent")
1484         self.collector.assert_nothing_captured()
1485
1486         # enable FPP feature
1487         ipfix.enable_flowprobe_feature()
1488         self.vapi.ipfix_flush()
1489         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1490
1491         self.send_packets()
1492
1493         # make sure the next packets (templates and data) we expect actually
1494         # showed up
1495         self.vapi.ipfix_flush()
1496         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1497         self.collector.get_capture(4)
1498
1499         ipfix.remove_vpp_config()
1500         self.logger.info("FFP_TEST_FINISH_0001")
1501
1502
1503 if __name__ == "__main__":
1504     unittest.main(testRunner=VppTestRunner)