nat: improve nat44-ed outside address distribution
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8 import re
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14
15 from config import config
16 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
17 from framework import is_distro_ubuntu2204, is_distro_debian11
18 from framework import VppTestCase, VppTestRunner
19 from framework import tag_run_solo
20 from vpp_object import VppObject
21 from vpp_pg_interface import CaptureTimeoutError
22 from util import ppp
23 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
24 from vpp_ip_route import VppIpRoute, VppRoutePath
25 from vpp_papi.macaddress import mac_ntop
26 from socket import inet_ntop
27 from vpp_papi import VppEnum
28
29
30 class VppCFLOW(VppObject):
31     """CFLOW object for IPFIX exporter and Flowprobe feature"""
32
33     def __init__(
34         self,
35         test,
36         intf="pg2",
37         active=0,
38         passive=0,
39         timeout=100,
40         mtu=1024,
41         datapath="l2",
42         layer="l2 l3 l4",
43         direction="tx",
44     ):
45         self._test = test
46         self._intf = intf
47         self._intf_obj = getattr(self._test, intf)
48         self._active = active
49         if passive == 0 or passive < active:
50             self._passive = active + 1
51         else:
52             self._passive = passive
53         self._datapath = datapath  # l2 ip4 ip6
54         self._collect = layer  # l2 l3 l4
55         self._direction = direction  # rx tx both
56         self._timeout = timeout
57         self._mtu = mtu
58         self._configured = False
59
60     def add_vpp_config(self):
61         self.enable_exporter()
62         l2_flag = 0
63         l3_flag = 0
64         l4_flag = 0
65         if "l2" in self._collect.lower():
66             l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
67         if "l3" in self._collect.lower():
68             l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
69         if "l4" in self._collect.lower():
70             l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
71         self._test.vapi.flowprobe_set_params(
72             record_flags=(l2_flag | l3_flag | l4_flag),
73             active_timer=self._active,
74             passive_timer=self._passive,
75         )
76         self.enable_flowprobe_feature()
77         self._test.vapi.cli("ipfix flush")
78         self._configured = True
79
80     def remove_vpp_config(self):
81         self.disable_exporter()
82         self.disable_flowprobe_feature()
83         self._test.vapi.cli("ipfix flush")
84         self._configured = False
85
86     def enable_exporter(self):
87         self._test.vapi.set_ipfix_exporter(
88             collector_address=self._test.pg0.remote_ip4,
89             src_address=self._test.pg0.local_ip4,
90             path_mtu=self._mtu,
91             template_interval=self._timeout,
92         )
93
94     def _enable_disable_flowprobe_feature(self, is_add):
95         which_map = {
96             "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
97             "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
98             "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
99         }
100         direction_map = {
101             "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
102             "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
103             "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
104         }
105         self._test.vapi.flowprobe_interface_add_del(
106             is_add=is_add,
107             which=which_map[self._datapath],
108             direction=direction_map[self._direction],
109             sw_if_index=self._intf_obj.sw_if_index,
110         )
111
112     def enable_flowprobe_feature(self):
113         self._enable_disable_flowprobe_feature(is_add=True)
114
115     def disable_exporter(self):
116         self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
117
118     def disable_flowprobe_feature(self):
119         self._enable_disable_flowprobe_feature(is_add=False)
120
121     def object_id(self):
122         return "ipfix-collector-%s-%s" % (self._src, self.dst)
123
124     def query_vpp_config(self):
125         return self._configured
126
127     def verify_templates(self, decoder=None, timeout=1, count=3):
128         templates = []
129         self._test.assertIn(count, (1, 2, 3))
130         for _ in range(count):
131             p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
132             self._test.assertTrue(p.haslayer(IPFIX))
133             if decoder is not None and p.haslayer(Template):
134                 templates.append(p[Template].templateID)
135                 decoder.add_template(p.getlayer(Template))
136         return templates
137
138
139 class MethodHolder(VppTestCase):
140     """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
141
142     # Test variables
143     debug_print = False
144     max_number_of_packets = 10
145     pkts = []
146
147     @classmethod
148     def setUpClass(cls):
149         """
150         Perform standard class setup (defined by class method setUpClass in
151         class VppTestCase) before running the test case, set test case related
152         variables and configure VPP.
153         """
154         super(MethodHolder, cls).setUpClass()
155         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
156             cls, "vpp"
157         ):
158             return
159         try:
160             # Create pg interfaces
161             cls.create_pg_interfaces(range(9))
162
163             # Packet sizes
164             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
165
166             # Create BD with MAC learning and unknown unicast flooding disabled
167             # and put interfaces to this BD
168             cls.vapi.bridge_domain_add_del_v2(
169                 bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
170             )
171             cls.vapi.sw_interface_set_l2_bridge(
172                 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
173             )
174             cls.vapi.sw_interface_set_l2_bridge(
175                 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
176             )
177
178             # Set up all interfaces
179             for i in cls.pg_interfaces:
180                 i.admin_up()
181
182             cls.pg0.config_ip4()
183             cls.pg0.configure_ipv4_neighbors()
184             cls.collector = cls.pg0
185
186             cls.pg1.config_ip4()
187             cls.pg1.resolve_arp()
188             cls.pg2.config_ip4()
189             cls.pg2.resolve_arp()
190             cls.pg3.config_ip4()
191             cls.pg3.resolve_arp()
192             cls.pg4.config_ip4()
193             cls.pg4.resolve_arp()
194             cls.pg7.config_ip4()
195             cls.pg8.config_ip4()
196             cls.pg8.configure_ipv4_neighbors()
197
198             cls.pg5.config_ip6()
199             cls.pg5.resolve_ndp()
200             cls.pg5.disable_ipv6_ra()
201             cls.pg6.config_ip6()
202             cls.pg6.resolve_ndp()
203             cls.pg6.disable_ipv6_ra()
204         except Exception:
205             super(MethodHolder, cls).tearDownClass()
206             raise
207
208     @classmethod
209     def tearDownClass(cls):
210         super(MethodHolder, cls).tearDownClass()
211
212     def create_stream(
213         self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
214     ):
215         """Create a packet stream to tickle the plugin
216
217         :param VppInterface src_if: Source interface for packet stream
218         :param VppInterface src_if: Dst interface for packet stream
219         """
220         if src_if is None:
221             src_if = self.pg1
222         if dst_if is None:
223             dst_if = self.pg2
224         self.pkts = []
225         if packets is None:
226             packets = random.randint(1, self.max_number_of_packets)
227         pkt_size = size
228         for p in range(0, packets):
229             if size is None:
230                 pkt_size = random.choice(self.pg_if_packet_sizes)
231             info = self.create_packet_info(src_if, dst_if)
232             payload = self.info_to_payload(info)
233             p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
234             if ip_ver == "v4":
235                 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
236             else:
237                 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
238             p /= UDP(sport=1234, dport=4321)
239             p /= Raw(payload)
240             info.data = p.copy()
241             self.extend_packet(p, pkt_size)
242             self.pkts.append(p)
243
244     def verify_cflow_data(self, decoder, capture, cflow):
245         octets = 0
246         packets = 0
247         for p in capture:
248             octets += p[IP].len
249             packets += 1
250         if cflow.haslayer(Data):
251             data = decoder.decode_data_set(cflow.getlayer(Set))
252             for record in data:
253                 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
254                 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
255
256     def send_packets(self, src_if=None, dst_if=None):
257         if src_if is None:
258             src_if = self.pg1
259         if dst_if is None:
260             dst_if = self.pg2
261         self.pg_enable_capture([dst_if])
262         src_if.add_stream(self.pkts)
263         self.pg_start()
264         return dst_if.get_capture(len(self.pkts))
265
266     def verify_cflow_data_detail(
267         self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
268     ):
269         if self.debug_print:
270             print(capture[0].show())
271         if cflow.haslayer(Data):
272             data = decoder.decode_data_set(cflow.getlayer(Set))
273             if self.debug_print:
274                 print(data)
275             if ip_ver == "v4":
276                 ip_layer = capture[0][IP]
277             else:
278                 ip_layer = capture[0][IPv6]
279             if data_set is not None:
280                 for record in data:
281                     # skip flow if ingress/egress interface is 0
282                     if int(binascii.hexlify(record[10]), 16) == 0:
283                         continue
284                     if int(binascii.hexlify(record[14]), 16) == 0:
285                         continue
286
287                     for field in data_set:
288                         value = data_set[field]
289                         if value == "octets":
290                             value = ip_layer.len
291                             if ip_ver == "v6":
292                                 value += 40  # ??? is this correct
293                         elif value == "packets":
294                             value = 1
295                         elif value == "src_ip":
296                             if ip_ver == "v4":
297                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
298                             else:
299                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
300                             value = int(binascii.hexlify(ip), 16)
301                         elif value == "dst_ip":
302                             if ip_ver == "v4":
303                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
304                             else:
305                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
306                             value = int(binascii.hexlify(ip), 16)
307                         elif value == "sport":
308                             value = int(capture[0][UDP].sport)
309                         elif value == "dport":
310                             value = int(capture[0][UDP].dport)
311                         self.assertEqual(
312                             int(binascii.hexlify(record[field]), 16), value
313                         )
314
315     def verify_cflow_data_notimer(self, decoder, capture, cflows):
316         idx = 0
317         for cflow in cflows:
318             if cflow.haslayer(Data):
319                 data = decoder.decode_data_set(cflow.getlayer(Set))
320             else:
321                 raise Exception("No CFLOW data")
322
323             for rec in data:
324                 p = capture[idx]
325                 idx += 1
326                 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
327                 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
328         self.assertEqual(len(capture), idx)
329
330     def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
331         """wait for CFLOW packet and verify its correctness
332
333         :param timeout: how long to wait
334
335         """
336         self.logger.info("IPFIX: Waiting for CFLOW packet")
337         # self.logger.debug(self.vapi.ppcli("show flow table"))
338         p = collector_intf.wait_for_packet(timeout=timeout)
339         self.assertEqual(p[Set].setID, set_id)
340         # self.logger.debug(self.vapi.ppcli("show flow table"))
341         self.logger.debug(ppp("IPFIX: Got packet:", p))
342         return p
343
344
345 @tag_run_solo
346 @tag_fixme_vpp_workers
347 @tag_fixme_ubuntu2204
348 @tag_fixme_debian11
349 class Flowprobe(MethodHolder):
350     """Template verification, timer tests"""
351
352     @classmethod
353     def setUpClass(cls):
354         super(Flowprobe, cls).setUpClass()
355
356     @classmethod
357     def tearDownClass(cls):
358         super(Flowprobe, cls).tearDownClass()
359
360     def test_0001(self):
361         """timer less than template timeout"""
362         self.logger.info("FFP_TEST_START_0001")
363         self.pg_enable_capture(self.pg_interfaces)
364         self.pkts = []
365
366         ipfix = VppCFLOW(test=self, active=2)
367         ipfix.add_vpp_config()
368
369         ipfix_decoder = IPFIXDecoder()
370         # template packet should arrive immediately
371         templates = ipfix.verify_templates(ipfix_decoder)
372
373         self.create_stream(packets=1)
374         self.send_packets()
375         capture = self.pg2.get_capture(1)
376
377         # make sure the one packet we expect actually showed up
378         cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
379         self.verify_cflow_data(ipfix_decoder, capture, cflow)
380
381         ipfix.remove_vpp_config()
382         self.logger.info("FFP_TEST_FINISH_0001")
383
384     def test_0002(self):
385         """timer greater than template timeout"""
386         self.logger.info("FFP_TEST_START_0002")
387         self.pg_enable_capture(self.pg_interfaces)
388         self.pkts = []
389
390         ipfix = VppCFLOW(test=self, timeout=3, active=4)
391         ipfix.add_vpp_config()
392
393         ipfix_decoder = IPFIXDecoder()
394         # template packet should arrive immediately
395         ipfix.verify_templates()
396
397         self.create_stream(packets=2)
398         self.send_packets()
399         capture = self.pg2.get_capture(2)
400
401         # next set of template packet should arrive after 20 seconds
402         # template packet should arrive within 20 s
403         templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
404
405         # make sure the one packet we expect actually showed up
406         cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
407         self.verify_cflow_data(ipfix_decoder, capture, cflow)
408
409         ipfix.remove_vpp_config()
410         self.logger.info("FFP_TEST_FINISH_0002")
411
412     def test_cflow_packet(self):
413         """verify cflow packet fields"""
414         self.logger.info("FFP_TEST_START_0000")
415         self.pg_enable_capture(self.pg_interfaces)
416         self.pkts = []
417
418         ipfix = VppCFLOW(
419             test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
420         )
421         ipfix.add_vpp_config()
422
423         route_9001 = VppIpRoute(
424             self,
425             "9.0.0.0",
426             24,
427             [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
428         )
429         route_9001.add_vpp_config()
430
431         ipfix_decoder = IPFIXDecoder()
432         templates = ipfix.verify_templates(ipfix_decoder, count=1)
433
434         self.pkts = [
435             (
436                 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
437                 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
438                 / TCP(sport=1234, dport=4321, flags=80)
439                 / Raw(b"\xa5" * 100)
440             )
441         ]
442
443         nowUTC = int(time.time())
444         nowUNIX = nowUTC + 2208988800
445         self.send_packets(src_if=self.pg7, dst_if=self.pg8)
446
447         cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
448         self.collector.get_capture(2)
449
450         if cflow[0].haslayer(IPFIX):
451             self.assertEqual(cflow[IPFIX].version, 10)
452             self.assertEqual(cflow[IPFIX].observationDomainID, 1)
453             self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
454             self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
455         if cflow.haslayer(Data):
456             record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
457             # ingress interface
458             self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
459             # egress interface
460             self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
461             # direction
462             self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
463             # packets
464             self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
465             # src mac
466             self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
467             # dst mac
468             self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
469             flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
470             # flow start timestamp
471             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
472             flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
473             # flow end timestamp
474             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
475             # ethernet type
476             self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
477             # src ip
478             self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
479             # dst ip
480             self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
481             # protocol (TCP)
482             self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
483             # src port
484             self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
485             # dst port
486             self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
487             # tcp flags
488             self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
489
490         ipfix.remove_vpp_config()
491         self.logger.info("FFP_TEST_FINISH_0000")
492
493     def test_interface_dump(self):
494         """Dump interfaces with IPFIX flow record generation enabled"""
495         self.logger.info("FFP_TEST_START_0003")
496
497         # Enable feature for 3 interfaces
498         ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
499         ipfix1.add_vpp_config()
500
501         ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
502         ipfix2.enable_flowprobe_feature()
503
504         ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
505         ipfix3.enable_flowprobe_feature()
506
507         # When request "all", dump should contain all enabled interfaces
508         dump = self.vapi.flowprobe_interface_dump()
509         self.assertEqual(len(dump), 3)
510
511         # Verify 1st interface
512         self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
513         self.assertEqual(
514             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
515         )
516         self.assertEqual(
517             dump[0].direction,
518             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
519         )
520
521         # Verify 2nd interface
522         self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
523         self.assertEqual(
524             dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
525         )
526         self.assertEqual(
527             dump[1].direction,
528             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
529         )
530
531         # Verify 3rd interface
532         self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
533         self.assertEqual(
534             dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
535         )
536         self.assertEqual(
537             dump[2].direction,
538             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
539         )
540
541         # When request 2nd interface, dump should contain only the specified interface
542         dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
543         self.assertEqual(len(dump), 1)
544
545         # Verify 2nd interface
546         self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
547         self.assertEqual(
548             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
549         )
550         self.assertEqual(
551             dump[0].direction,
552             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
553         )
554
555         # When request 99th interface, dump should be empty
556         dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
557         self.assertEqual(len(dump), 0)
558
559         ipfix1.remove_vpp_config()
560         ipfix2.remove_vpp_config()
561         ipfix3.remove_vpp_config()
562         self.logger.info("FFP_TEST_FINISH_0003")
563
564     def test_get_params(self):
565         """Get IPFIX flow record generation parameters"""
566         self.logger.info("FFP_TEST_START_0004")
567
568         # Enable feature for an interface with custom parameters
569         ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
570         ipfix.add_vpp_config()
571
572         # Get and verify parameters
573         params = self.vapi.flowprobe_get_params()
574         self.assertEqual(params.active_timer, 20)
575         self.assertEqual(params.passive_timer, 40)
576         record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
577         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
578         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
579         self.assertEqual(params.record_flags, record_flags)
580
581         ipfix.remove_vpp_config()
582         self.logger.info("FFP_TEST_FINISH_0004")
583
584
585 class DatapathTestsHolder(object):
586     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
587
588     @classmethod
589     def setUpClass(cls):
590         super(DatapathTestsHolder, cls).setUpClass()
591
592     @classmethod
593     def tearDownClass(cls):
594         super(DatapathTestsHolder, cls).tearDownClass()
595
596     def test_templatesL2(self):
597         """verify template on L2 datapath"""
598         self.logger.info("FFP_TEST_START_0000")
599         self.pg_enable_capture(self.pg_interfaces)
600
601         ipfix = VppCFLOW(
602             test=self, intf=self.intf1, layer="l2", direction=self.direction
603         )
604         ipfix.add_vpp_config()
605
606         # template packet should arrive immediately
607         self.vapi.ipfix_flush()
608         ipfix.verify_templates(timeout=3, count=1)
609         self.collector.get_capture(1)
610
611         ipfix.remove_vpp_config()
612         self.logger.info("FFP_TEST_FINISH_0000")
613
614     def test_L2onL2(self):
615         """L2 data on L2 datapath"""
616         self.logger.info("FFP_TEST_START_0001")
617         self.pg_enable_capture(self.pg_interfaces)
618         self.pkts = []
619
620         ipfix = VppCFLOW(
621             test=self, intf=self.intf1, layer="l2", direction=self.direction
622         )
623         ipfix.add_vpp_config()
624
625         ipfix_decoder = IPFIXDecoder()
626         # template packet should arrive immediately
627         templates = ipfix.verify_templates(ipfix_decoder, count=1)
628
629         self.create_stream(packets=1)
630         capture = self.send_packets()
631
632         # make sure the one packet we expect actually showed up
633         self.vapi.ipfix_flush()
634         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
635         self.verify_cflow_data_detail(
636             ipfix_decoder,
637             capture,
638             cflow,
639             {2: "packets", 256: 8, 61: (self.direction == "tx")},
640         )
641         self.collector.get_capture(2)
642
643         ipfix.remove_vpp_config()
644         self.logger.info("FFP_TEST_FINISH_0001")
645
646     def test_L3onL2(self):
647         """L3 data on L2 datapath"""
648         self.logger.info("FFP_TEST_START_0002")
649         self.pg_enable_capture(self.pg_interfaces)
650         self.pkts = []
651
652         ipfix = VppCFLOW(
653             test=self, intf=self.intf1, layer="l3", direction=self.direction
654         )
655         ipfix.add_vpp_config()
656
657         ipfix_decoder = IPFIXDecoder()
658         # template packet should arrive immediately
659         templates = ipfix.verify_templates(ipfix_decoder, count=2)
660
661         self.create_stream(packets=1)
662         capture = self.send_packets()
663
664         # make sure the one packet we expect actually showed up
665         self.vapi.ipfix_flush()
666         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
667         self.verify_cflow_data_detail(
668             ipfix_decoder,
669             capture,
670             cflow,
671             {
672                 2: "packets",
673                 4: 17,
674                 8: "src_ip",
675                 12: "dst_ip",
676                 61: (self.direction == "tx"),
677             },
678         )
679
680         self.collector.get_capture(3)
681
682         ipfix.remove_vpp_config()
683         self.logger.info("FFP_TEST_FINISH_0002")
684
685     def test_L4onL2(self):
686         """L4 data on L2 datapath"""
687         self.logger.info("FFP_TEST_START_0003")
688         self.pg_enable_capture(self.pg_interfaces)
689         self.pkts = []
690
691         ipfix = VppCFLOW(
692             test=self, intf=self.intf1, layer="l4", direction=self.direction
693         )
694         ipfix.add_vpp_config()
695
696         ipfix_decoder = IPFIXDecoder()
697         # template packet should arrive immediately
698         templates = ipfix.verify_templates(ipfix_decoder, count=2)
699
700         self.create_stream(packets=1)
701         capture = self.send_packets()
702
703         # make sure the one packet we expect actually showed up
704         self.vapi.ipfix_flush()
705         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
706         self.verify_cflow_data_detail(
707             ipfix_decoder,
708             capture,
709             cflow,
710             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
711         )
712
713         self.collector.get_capture(3)
714
715         ipfix.remove_vpp_config()
716         self.logger.info("FFP_TEST_FINISH_0003")
717
718     def test_templatesIp4(self):
719         """verify templates on IP4 datapath"""
720         self.logger.info("FFP_TEST_START_0000")
721
722         self.pg_enable_capture(self.pg_interfaces)
723
724         ipfix = VppCFLOW(
725             test=self, intf=self.intf1, datapath="ip4", direction=self.direction
726         )
727         ipfix.add_vpp_config()
728
729         # template packet should arrive immediately
730         self.vapi.ipfix_flush()
731         ipfix.verify_templates(timeout=3, count=1)
732         self.collector.get_capture(1)
733
734         ipfix.remove_vpp_config()
735
736         self.logger.info("FFP_TEST_FINISH_0000")
737
738     def test_L2onIP4(self):
739         """L2 data on IP4 datapath"""
740         self.logger.info("FFP_TEST_START_0001")
741         self.pg_enable_capture(self.pg_interfaces)
742         self.pkts = []
743
744         ipfix = VppCFLOW(
745             test=self,
746             intf=self.intf2,
747             layer="l2",
748             datapath="ip4",
749             direction=self.direction,
750         )
751         ipfix.add_vpp_config()
752
753         ipfix_decoder = IPFIXDecoder()
754         # template packet should arrive immediately
755         templates = ipfix.verify_templates(ipfix_decoder, count=1)
756
757         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
758         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
759
760         # make sure the one packet we expect actually showed up
761         self.vapi.ipfix_flush()
762         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
763         self.verify_cflow_data_detail(
764             ipfix_decoder,
765             capture,
766             cflow,
767             {2: "packets", 256: 8, 61: (self.direction == "tx")},
768         )
769
770         # expected two templates and one cflow packet
771         self.collector.get_capture(2)
772
773         ipfix.remove_vpp_config()
774         self.logger.info("FFP_TEST_FINISH_0001")
775
776     def test_L3onIP4(self):
777         """L3 data on IP4 datapath"""
778         self.logger.info("FFP_TEST_START_0002")
779         self.pg_enable_capture(self.pg_interfaces)
780         self.pkts = []
781
782         ipfix = VppCFLOW(
783             test=self,
784             intf=self.intf2,
785             layer="l3",
786             datapath="ip4",
787             direction=self.direction,
788         )
789         ipfix.add_vpp_config()
790
791         ipfix_decoder = IPFIXDecoder()
792         # template packet should arrive immediately
793         templates = ipfix.verify_templates(ipfix_decoder, count=1)
794
795         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
796         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
797
798         # make sure the one packet we expect actually showed up
799         self.vapi.ipfix_flush()
800         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
801         self.verify_cflow_data_detail(
802             ipfix_decoder,
803             capture,
804             cflow,
805             {
806                 1: "octets",
807                 2: "packets",
808                 8: "src_ip",
809                 12: "dst_ip",
810                 61: (self.direction == "tx"),
811             },
812         )
813
814         # expected two templates and one cflow packet
815         self.collector.get_capture(2)
816
817         ipfix.remove_vpp_config()
818         self.logger.info("FFP_TEST_FINISH_0002")
819
820     def test_L4onIP4(self):
821         """L4 data on IP4 datapath"""
822         self.logger.info("FFP_TEST_START_0003")
823         self.pg_enable_capture(self.pg_interfaces)
824         self.pkts = []
825
826         ipfix = VppCFLOW(
827             test=self,
828             intf=self.intf2,
829             layer="l4",
830             datapath="ip4",
831             direction=self.direction,
832         )
833         ipfix.add_vpp_config()
834
835         ipfix_decoder = IPFIXDecoder()
836         # template packet should arrive immediately
837         templates = ipfix.verify_templates(ipfix_decoder, count=1)
838
839         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
840         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
841
842         # make sure the one packet we expect actually showed up
843         self.vapi.ipfix_flush()
844         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
845         self.verify_cflow_data_detail(
846             ipfix_decoder,
847             capture,
848             cflow,
849             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
850         )
851
852         # expected two templates and one cflow packet
853         self.collector.get_capture(2)
854
855         ipfix.remove_vpp_config()
856         self.logger.info("FFP_TEST_FINISH_0003")
857
858     def test_templatesIP6(self):
859         """verify templates on IP6 datapath"""
860         self.logger.info("FFP_TEST_START_0000")
861         self.pg_enable_capture(self.pg_interfaces)
862
863         ipfix = VppCFLOW(
864             test=self, intf=self.intf1, datapath="ip6", direction=self.direction
865         )
866         ipfix.add_vpp_config()
867
868         # template packet should arrive immediately
869         ipfix.verify_templates(count=1)
870         self.collector.get_capture(1)
871
872         ipfix.remove_vpp_config()
873
874         self.logger.info("FFP_TEST_FINISH_0000")
875
876     def test_L2onIP6(self):
877         """L2 data on IP6 datapath"""
878         self.logger.info("FFP_TEST_START_0001")
879         self.pg_enable_capture(self.pg_interfaces)
880         self.pkts = []
881
882         ipfix = VppCFLOW(
883             test=self,
884             intf=self.intf3,
885             layer="l2",
886             datapath="ip6",
887             direction=self.direction,
888         )
889         ipfix.add_vpp_config()
890
891         ipfix_decoder = IPFIXDecoder()
892         # template packet should arrive immediately
893         templates = ipfix.verify_templates(ipfix_decoder, count=1)
894
895         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
896         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
897
898         # make sure the one packet we expect actually showed up
899         self.vapi.ipfix_flush()
900         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
901         self.verify_cflow_data_detail(
902             ipfix_decoder,
903             capture,
904             cflow,
905             {2: "packets", 256: 56710, 61: (self.direction == "tx")},
906             ip_ver="v6",
907         )
908
909         # expected two templates and one cflow packet
910         self.collector.get_capture(2)
911
912         ipfix.remove_vpp_config()
913         self.logger.info("FFP_TEST_FINISH_0001")
914
915     def test_L3onIP6(self):
916         """L3 data on IP6 datapath"""
917         self.logger.info("FFP_TEST_START_0002")
918         self.pg_enable_capture(self.pg_interfaces)
919         self.pkts = []
920
921         ipfix = VppCFLOW(
922             test=self,
923             intf=self.intf3,
924             layer="l3",
925             datapath="ip6",
926             direction=self.direction,
927         )
928         ipfix.add_vpp_config()
929
930         ipfix_decoder = IPFIXDecoder()
931         # template packet should arrive immediately
932         templates = ipfix.verify_templates(ipfix_decoder, count=1)
933
934         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
935         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
936
937         # make sure the one packet we expect actually showed up
938         self.vapi.ipfix_flush()
939         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
940         self.verify_cflow_data_detail(
941             ipfix_decoder,
942             capture,
943             cflow,
944             {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
945             ip_ver="v6",
946         )
947
948         # expected two templates and one cflow packet
949         self.collector.get_capture(2)
950
951         ipfix.remove_vpp_config()
952         self.logger.info("FFP_TEST_FINISH_0002")
953
954     def test_L4onIP6(self):
955         """L4 data on IP6 datapath"""
956         self.logger.info("FFP_TEST_START_0003")
957         self.pg_enable_capture(self.pg_interfaces)
958         self.pkts = []
959
960         ipfix = VppCFLOW(
961             test=self,
962             intf=self.intf3,
963             layer="l4",
964             datapath="ip6",
965             direction=self.direction,
966         )
967         ipfix.add_vpp_config()
968
969         ipfix_decoder = IPFIXDecoder()
970         # template packet should arrive immediately
971         templates = ipfix.verify_templates(ipfix_decoder, count=1)
972
973         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
974         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
975
976         # make sure the one packet we expect actually showed up
977         self.vapi.ipfix_flush()
978         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
979         self.verify_cflow_data_detail(
980             ipfix_decoder,
981             capture,
982             cflow,
983             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
984             ip_ver="v6",
985         )
986
987         # expected two templates and one cflow packet
988         self.collector.get_capture(2)
989
990         ipfix.remove_vpp_config()
991         self.logger.info("FFP_TEST_FINISH_0003")
992
993     def test_0001(self):
994         """no timers, one CFLOW packet, 9 Flows inside"""
995         self.logger.info("FFP_TEST_START_0001")
996         self.pg_enable_capture(self.pg_interfaces)
997         self.pkts = []
998
999         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1000         ipfix.add_vpp_config()
1001
1002         ipfix_decoder = IPFIXDecoder()
1003         # template packet should arrive immediately
1004         templates = ipfix.verify_templates(ipfix_decoder)
1005
1006         self.create_stream(packets=9)
1007         capture = self.send_packets()
1008
1009         # make sure the one packet we expect actually showed up
1010         self.vapi.ipfix_flush()
1011         cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1012         self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1013         self.collector.get_capture(4)
1014
1015         ipfix.remove_vpp_config()
1016         self.logger.info("FFP_TEST_FINISH_0001")
1017
1018     def test_0002(self):
1019         """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1020         self.logger.info("FFP_TEST_START_0002")
1021         self.pg_enable_capture(self.pg_interfaces)
1022         self.pkts = []
1023
1024         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1025         ipfix.add_vpp_config()
1026
1027         ipfix_decoder = IPFIXDecoder()
1028         # template packet should arrive immediately
1029         self.vapi.ipfix_flush()
1030         templates = ipfix.verify_templates(ipfix_decoder)
1031
1032         self.create_stream(packets=6)
1033         capture = self.send_packets()
1034
1035         # make sure the one packet we expect actually showed up
1036         cflows = []
1037         self.vapi.ipfix_flush()
1038         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1039         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1040         self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1041         self.collector.get_capture(5)
1042
1043         ipfix.remove_vpp_config()
1044         self.logger.info("FFP_TEST_FINISH_0002")
1045
1046
1047 @tag_fixme_vpp_workers
1048 class DatapathTx(MethodHolder, DatapathTestsHolder):
1049     """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1050
1051     intf1 = "pg2"
1052     intf2 = "pg4"
1053     intf3 = "pg6"
1054     direction = "tx"
1055
1056
1057 @tag_fixme_vpp_workers
1058 class DatapathRx(MethodHolder, DatapathTestsHolder):
1059     """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1060
1061     intf1 = "pg1"
1062     intf2 = "pg3"
1063     intf3 = "pg5"
1064     direction = "rx"
1065
1066
1067 @unittest.skipUnless(config.extended, "part of extended tests")
1068 class DisableIPFIX(MethodHolder):
1069     """Disable IPFIX"""
1070
1071     @classmethod
1072     def setUpClass(cls):
1073         super(DisableIPFIX, cls).setUpClass()
1074
1075     @classmethod
1076     def tearDownClass(cls):
1077         super(DisableIPFIX, cls).tearDownClass()
1078
1079     def test_0001(self):
1080         """disable IPFIX after first packets"""
1081         self.logger.info("FFP_TEST_START_0001")
1082         self.pg_enable_capture(self.pg_interfaces)
1083         self.pkts = []
1084
1085         ipfix = VppCFLOW(test=self)
1086         ipfix.add_vpp_config()
1087
1088         ipfix_decoder = IPFIXDecoder()
1089         # template packet should arrive immediately
1090         templates = ipfix.verify_templates(ipfix_decoder)
1091
1092         self.create_stream()
1093         self.send_packets()
1094
1095         # make sure the one packet we expect actually showed up
1096         self.vapi.ipfix_flush()
1097         self.wait_for_cflow_packet(self.collector, templates[1])
1098         self.collector.get_capture(4)
1099
1100         # disable IPFIX
1101         ipfix.disable_exporter()
1102         self.pg_enable_capture([self.collector])
1103
1104         self.send_packets()
1105
1106         # make sure no one packet arrived in 1 minute
1107         self.vapi.ipfix_flush()
1108         self.sleep(1, "wait before verifying no packets sent")
1109         self.collector.assert_nothing_captured()
1110
1111         ipfix.remove_vpp_config()
1112         self.logger.info("FFP_TEST_FINISH_0001")
1113
1114
1115 @unittest.skipUnless(config.extended, "part of extended tests")
1116 class ReenableIPFIX(MethodHolder):
1117     """Re-enable IPFIX"""
1118
1119     @classmethod
1120     def setUpClass(cls):
1121         super(ReenableIPFIX, cls).setUpClass()
1122
1123     @classmethod
1124     def tearDownClass(cls):
1125         super(ReenableIPFIX, cls).tearDownClass()
1126
1127     def test_0011(self):
1128         """disable IPFIX after first packets and re-enable after few packets"""
1129         self.logger.info("FFP_TEST_START_0001")
1130         self.pg_enable_capture(self.pg_interfaces)
1131         self.pkts = []
1132
1133         ipfix = VppCFLOW(test=self)
1134         ipfix.add_vpp_config()
1135
1136         ipfix_decoder = IPFIXDecoder()
1137         # template packet should arrive immediately
1138         templates = ipfix.verify_templates(ipfix_decoder)
1139
1140         self.create_stream(packets=5)
1141         self.send_packets()
1142
1143         # make sure the one packet we expect actually showed up
1144         self.vapi.ipfix_flush()
1145         self.wait_for_cflow_packet(self.collector, templates[1])
1146         self.collector.get_capture(4)
1147
1148         # disable IPFIX
1149         ipfix.disable_exporter()
1150         self.vapi.ipfix_flush()
1151         self.pg_enable_capture([self.collector])
1152
1153         self.send_packets()
1154
1155         # make sure no one packet arrived in active timer span
1156         self.vapi.ipfix_flush()
1157         self.sleep(1, "wait before verifying no packets sent")
1158         self.collector.assert_nothing_captured()
1159         self.pg2.get_capture(5)
1160
1161         # enable IPFIX
1162         ipfix.enable_exporter()
1163
1164         capture = self.collector.get_capture(4)
1165         nr_templates = 0
1166         nr_data = 0
1167         for p in capture:
1168             self.assertTrue(p.haslayer(IPFIX))
1169             if p.haslayer(Template):
1170                 nr_templates += 1
1171         self.assertTrue(nr_templates, 3)
1172         for p in capture:
1173             self.assertTrue(p.haslayer(IPFIX))
1174             if p.haslayer(Data):
1175                 nr_data += 1
1176         self.assertTrue(nr_templates, 1)
1177
1178         ipfix.remove_vpp_config()
1179         self.logger.info("FFP_TEST_FINISH_0001")
1180
1181
1182 @unittest.skipUnless(config.extended, "part of extended tests")
1183 class DisableFP(MethodHolder):
1184     """Disable Flowprobe feature"""
1185
1186     @classmethod
1187     def setUpClass(cls):
1188         super(DisableFP, cls).setUpClass()
1189
1190     @classmethod
1191     def tearDownClass(cls):
1192         super(DisableFP, cls).tearDownClass()
1193
1194     def test_0001(self):
1195         """disable flowprobe feature after first packets"""
1196         self.logger.info("FFP_TEST_START_0001")
1197         self.pg_enable_capture(self.pg_interfaces)
1198         self.pkts = []
1199         ipfix = VppCFLOW(test=self)
1200         ipfix.add_vpp_config()
1201
1202         ipfix_decoder = IPFIXDecoder()
1203         # template packet should arrive immediately
1204         templates = ipfix.verify_templates(ipfix_decoder)
1205
1206         self.create_stream()
1207         self.send_packets()
1208
1209         # make sure the one packet we expect actually showed up
1210         self.vapi.ipfix_flush()
1211         self.wait_for_cflow_packet(self.collector, templates[1])
1212         self.collector.get_capture(4)
1213
1214         # disable IPFIX
1215         ipfix.disable_flowprobe_feature()
1216         self.pg_enable_capture([self.collector])
1217
1218         self.send_packets()
1219
1220         # make sure no one packet arrived in active timer span
1221         self.vapi.ipfix_flush()
1222         self.sleep(1, "wait before verifying no packets sent")
1223         self.collector.assert_nothing_captured()
1224
1225         ipfix.remove_vpp_config()
1226         self.logger.info("FFP_TEST_FINISH_0001")
1227
1228
1229 @unittest.skipUnless(config.extended, "part of extended tests")
1230 class ReenableFP(MethodHolder):
1231     """Re-enable Flowprobe feature"""
1232
1233     @classmethod
1234     def setUpClass(cls):
1235         super(ReenableFP, cls).setUpClass()
1236
1237     @classmethod
1238     def tearDownClass(cls):
1239         super(ReenableFP, cls).tearDownClass()
1240
1241     def test_0001(self):
1242         """disable flowprobe feature after first packets and re-enable
1243         after few packets"""
1244         self.logger.info("FFP_TEST_START_0001")
1245         self.pg_enable_capture(self.pg_interfaces)
1246         self.pkts = []
1247
1248         ipfix = VppCFLOW(test=self)
1249         ipfix.add_vpp_config()
1250
1251         ipfix_decoder = IPFIXDecoder()
1252         # template packet should arrive immediately
1253         self.vapi.ipfix_flush()
1254         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1255
1256         self.create_stream()
1257         self.send_packets()
1258
1259         # make sure the one packet we expect actually showed up
1260         self.vapi.ipfix_flush()
1261         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1262         self.collector.get_capture(4)
1263
1264         # disable FPP feature
1265         ipfix.disable_flowprobe_feature()
1266         self.pg_enable_capture([self.collector])
1267
1268         self.send_packets()
1269
1270         # make sure no one packet arrived in active timer span
1271         self.vapi.ipfix_flush()
1272         self.sleep(5, "wait before verifying no packets sent")
1273         self.collector.assert_nothing_captured()
1274
1275         # enable FPP feature
1276         ipfix.enable_flowprobe_feature()
1277         self.vapi.ipfix_flush()
1278         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1279
1280         self.send_packets()
1281
1282         # make sure the next packets (templates and data) we expect actually
1283         # showed up
1284         self.vapi.ipfix_flush()
1285         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1286         self.collector.get_capture(4)
1287
1288         ipfix.remove_vpp_config()
1289         self.logger.info("FFP_TEST_FINISH_0001")
1290
1291
1292 if __name__ == "__main__":
1293     unittest.main(testRunner=VppTestRunner)