dev: add change_max_rx_frame_size capability
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP
12 from scapy.layers.inet6 import IPv6
13 from scapy.contrib.lacp import SlowProtocol, LACP
14
15 from config import config
16 from framework import VppTestCase
17 from asfframework import (
18     tag_fixme_vpp_workers,
19     tag_fixme_ubuntu2204,
20     tag_fixme_debian11,
21     tag_run_solo,
22     is_distro_ubuntu2204,
23     is_distro_debian11,
24     VppTestRunner,
25 )
26 from vpp_object import VppObject
27 from util import ppp
28 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_papi.macaddress import mac_ntop
31 from socket import inet_ntop
32 from vpp_papi import VppEnum
33 from vpp_sub_interface import VppDot1ADSubint
34
35
# Number of fields each record layer contributes to a flowprobe IPFIX
# template; the template tests sum these to get the expected fieldCount.
TMPL_COMMON_FIELD_COUNT = 6
TMPL_L2_FIELD_COUNT = 3
TMPL_L3_FIELD_COUNT = 4
TMPL_L4_FIELD_COUNT = 3

# Keys of individual fields inside a decoded IPFIX data record
# (the values follow the IANA IPFIX information element numbering).
IPFIX_TCP_FLAGS_ID = 6
IPFIX_SRC_TRANS_PORT_ID = 7
IPFIX_DST_TRANS_PORT_ID = 11
IPFIX_SRC_IP4_ADDR_ID = 8
IPFIX_DST_IP4_ADDR_ID = 12
IPFIX_FLOW_DIRECTION_ID = 61

# TCP header flag bits; OR them together to build a flags value.
TCP_F_FIN = 0x01
TCP_F_SYN = 0x02
TCP_F_RST = 0x04
TCP_F_PSH = 0x08
TCP_F_ACK = 0x10
TCP_F_URG = 0x20
TCP_F_ECE = 0x40
TCP_F_CWR = 0x80
56
57
class VppCFLOW(VppObject):
    """CFLOW object for IPFIX exporter and Flowprobe feature.

    Bundles the two pieces of VPP configuration a flowprobe test needs: the
    IPFIX exporter (sending from pg0's local address to pg0's remote host)
    and the flowprobe feature on a single interface.
    """

    def __init__(
        self,
        test,
        intf="pg2",
        active=0,
        passive=0,
        timeout=100,
        mtu=1024,
        datapath="l2",
        layer="l2 l3 l4",
        direction="tx",
    ):
        """Record the desired configuration; nothing is sent to VPP here.

        :param test: test case providing ``vapi`` and the ``pgN`` interfaces
        :param intf: attribute name of the monitored interface on ``test``
        :param active: flowprobe active timer
        :param passive: flowprobe passive timer; replaced by ``active + 1``
            when it is 0 or smaller than ``active``
        :param timeout: IPFIX exporter template interval
        :param mtu: IPFIX exporter path MTU
        :param datapath: one of "l2", "ip4", "ip6"
        :param layer: any combination of "l2", "l3", "l4" (case-insensitive)
        :param direction: one of "rx", "tx", "both"
        """
        self._test = test
        self._intf = intf
        self._intf_obj = getattr(self._test, intf)
        self._active = active
        if passive == 0 or passive < active:
            self._passive = active + 1
        else:
            self._passive = passive
        self._datapath = datapath  # l2 ip4 ip6
        self._collect = layer  # l2 l3 l4
        self._direction = direction  # rx tx both
        self._timeout = timeout
        self._mtu = mtu
        self._configured = False

    def add_vpp_config(self):
        """Enable the exporter, set flowprobe parameters and enable the
        feature on the interface, then flush IPFIX."""
        self.enable_exporter()

        flags_t = VppEnum.vl_api_flowprobe_record_flags_t
        collect = self._collect.lower()
        record_flags = 0
        if "l2" in collect:
            record_flags |= flags_t.FLOWPROBE_RECORD_FLAG_L2
        if "l3" in collect:
            record_flags |= flags_t.FLOWPROBE_RECORD_FLAG_L3
        if "l4" in collect:
            record_flags |= flags_t.FLOWPROBE_RECORD_FLAG_L4

        self._test.vapi.flowprobe_set_params(
            record_flags=record_flags,
            active_timer=self._active,
            passive_timer=self._passive,
        )
        self.enable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = True

    def remove_vpp_config(self):
        """Disable the exporter and the interface feature, then flush IPFIX."""
        self.disable_exporter()
        self.disable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = False

    def enable_exporter(self):
        """Point the IPFIX exporter at pg0's remote host, sourced from pg0."""
        self._test.vapi.set_ipfix_exporter(
            collector_address=self._test.pg0.remote_ip4,
            src_address=self._test.pg0.local_ip4,
            path_mtu=self._mtu,
            template_interval=self._timeout,
        )

    def _enable_disable_flowprobe_feature(self, is_add):
        """Add or delete the flowprobe feature on the configured interface.

        Raises KeyError when the datapath or direction given to the
        constructor is not one of the supported strings.
        """
        which_map = {
            "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
            "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
            "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
        }
        direction_map = {
            "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
            "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
            "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
        }
        self._test.vapi.flowprobe_interface_add_del(
            is_add=is_add,
            which=which_map[self._datapath],
            direction=direction_map[self._direction],
            sw_if_index=self._intf_obj.sw_if_index,
        )

    def enable_flowprobe_feature(self):
        """Enable flowprobe on the interface without touching the exporter."""
        self._enable_disable_flowprobe_feature(is_add=True)

    def disable_exporter(self):
        """Disable the exporter by resetting its collector to 0.0.0.0."""
        self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")

    def disable_flowprobe_feature(self):
        """Remove flowprobe from the interface."""
        self._enable_disable_flowprobe_feature(is_add=False)

    def object_id(self):
        """Return an identifier built from the exporter source and collector.

        The addresses are the same pg0 addresses enable_exporter() programs.
        (The previous implementation read attributes that were never set and
        raised AttributeError.)
        """
        return "ipfix-collector-%s-%s" % (
            self._test.pg0.local_ip4,
            self._test.pg0.remote_ip4,
        )

    def query_vpp_config(self):
        """Report the locally tracked state; VPP itself is not queried."""
        return self._configured

    def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
        """Wait for ``count`` template packets on the collector and check them.

        :param decoder: optional IPFIXDecoder that learns every template seen
        :param timeout: per-packet wait passed to wait_for_cflow_packet
        :param count: number of template packets to expect (1, 2 or 3)
        :param field_count_in: optional container of acceptable fieldCount
            values for each template
        :returns: template IDs in arrival order; stays empty when no decoder
            is given
        """
        templates = []
        self._test.assertIn(count, (1, 2, 3))
        for _ in range(count):
            # set ID 2 is what the test expects for template packets
            p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
            self._test.assertTrue(p.haslayer(IPFIX))
            self._test.assertTrue(p.haslayer(Template))
            if decoder is not None:
                templates.append(p[Template].templateID)
                decoder.add_template(p.getlayer(Template))
            if field_count_in is not None:
                self._test.assertIn(p[Template].fieldCount, field_count_in)
        return templates
168
169
class MethodHolder(VppTestCase):
    """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""

    # Test variables
    debug_print = False
    max_number_of_packets = 10
    pkts = []

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.
        """
        super(MethodHolder, cls).setUpClass()
        # On the flagged distros, skip the configuration when the base setup
        # left no "vpp" attribute on the class.
        if (is_distro_ubuntu2204 or is_distro_debian11) and not hasattr(cls, "vpp"):
            return
        try:
            # Create pg interfaces
            cls.create_pg_interfaces(range(9))

            # Packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            # Create BD with MAC learning, flooding, forwarding and unknown
            # unicast flooding enabled and put pg1/pg2 into this BD
            cls.vapi.bridge_domain_add_del_v2(
                bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
            )

            # Set up all interfaces
            for i in cls.pg_interfaces:
                i.admin_up()

            # pg0 receives the exported IPFIX packets
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.collector = cls.pg0

            cls.pg1.config_ip4()
            cls.pg1.resolve_arp()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()
            cls.pg3.config_ip4()
            cls.pg3.resolve_arp()
            cls.pg4.config_ip4()
            cls.pg4.resolve_arp()
            cls.pg7.config_ip4()
            cls.pg8.config_ip4()
            cls.pg8.configure_ipv4_neighbors()

            cls.pg5.config_ip6()
            cls.pg5.resolve_ndp()
            cls.pg5.disable_ipv6_ra()
            cls.pg6.config_ip6()
            cls.pg6.resolve_ndp()
            cls.pg6.disable_ipv6_ra()
        except Exception:
            # undo the base class setup before propagating the failure
            super(MethodHolder, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(MethodHolder, cls).tearDownClass()

    @staticmethod
    def _record_field_to_int(raw):
        """Interpret the raw bytes of a decoded record field as an integer."""
        return int(binascii.hexlify(raw), 16)

    def create_stream(
        self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
    ):
        """Create a packet stream to tickle the plugin

        The generated UDP packets are stored in ``self.pkts``.

        :param VppInterface src_if: Source interface for packet stream
            (defaults to pg1)
        :param VppInterface dst_if: Dst interface for packet stream
            (defaults to pg2)
        :param int packets: number of packets; random in
            1..max_number_of_packets when None
        :param int size: packet size; picked at random from
            pg_if_packet_sizes for every packet when None
        :param str ip_ver: "v4" builds IPv4 packets, anything else IPv6
        """
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pkts = []
        if packets is None:
            packets = random.randint(1, self.max_number_of_packets)
        pkt_size = size
        for _ in range(packets):
            if size is None:
                pkt_size = random.choice(self.pg_if_packet_sizes)
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
            if ip_ver == "v4":
                p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
            else:
                p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
            p /= UDP(sport=1234, dport=4321)
            p /= Raw(payload)
            info.data = p.copy()
            self.extend_packet(p, pkt_size)
            self.pkts.append(p)

    def verify_cflow_data(self, decoder, capture, cflow):
        """Check that every record in ``cflow`` reports the total IPv4 length
        (field 1) and the packet count (field 2) of ``capture``.

        Nothing is checked when ``cflow`` carries no Data layer.
        """
        octets = 0
        packets = 0
        for p in capture:
            octets += p[IP].len
            packets += 1
        if cflow.haslayer(Data):
            data = decoder.decode_data_set(cflow.getlayer(Set))
            for record in data:
                self.assertEqual(self._record_field_to_int(record[1]), octets)
                self.assertEqual(self._record_field_to_int(record[2]), packets)

    def send_packets(self, src_if=None, dst_if=None):
        """Send ``self.pkts`` from ``src_if`` (default pg1) and return what
        was captured on ``dst_if`` (default pg2)."""
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pg_enable_capture([dst_if])
        src_if.add_stream(self.pkts)
        self.pg_start()
        return dst_if.get_capture(len(self.pkts))

    def _expected_field_value(self, spec, first_pkt, ip_layer, ip_ver):
        """Translate a data_set entry into the integer a record should hold.

        Symbolic names ("octets", "packets", "src_ip", "dst_ip", "sport",
        "dport") are resolved from the first captured packet; any other
        value is returned unchanged and compared literally.
        """
        if spec == "octets":
            if ip_ver == "v4":
                return ip_layer.len
            # The IPv6 header has no "len" field, only the payload length
            # "plen"; add the fixed 40-byte header to get a whole-packet
            # length comparable to IPv4's "len".
            # NOTE(review): assumes the exporter counts the full L3 packet
            # for IPv6 as well - confirm against the plugin.
            return ip_layer.plen + 40
        if spec == "packets":
            return 1
        if spec == "src_ip" or spec == "dst_ip":
            family = socket.AF_INET if ip_ver == "v4" else socket.AF_INET6
            address = ip_layer.src if spec == "src_ip" else ip_layer.dst
            return self._record_field_to_int(socket.inet_pton(family, address))
        if spec == "sport":
            return int(first_pkt[UDP].sport)
        if spec == "dport":
            return int(first_pkt[UDP].dport)
        return spec

    def verify_cflow_data_detail(
        self,
        decoder,
        capture,
        cflow,
        data_set={1: "octets", 2: "packets"},
        ip_ver="v4",
        field_count=None,
    ):
        """Compare selected record fields with the first captured packet.

        :param decoder: IPFIXDecoder that already knows the template
        :param capture: captured packets; only ``capture[0]`` is inspected
        :param cflow: IPFIX packet expected to carry the data records
        :param data_set: maps record field id to a literal expected value or
            to a symbolic name (see _expected_field_value); None skips the
            per-field checks. The default dict is only read, never modified.
        :param ip_ver: "v4" selects the IP layer, anything else IPv6
        :param field_count: when given, every record must have exactly this
            many fields

        Nothing is checked when ``cflow`` carries no Data layer.
        """
        if self.debug_print:
            print(capture[0].show())
        if not cflow.haslayer(Data):
            return

        data = decoder.decode_data_set(cflow.getlayer(Set))
        if self.debug_print:
            print(data)

        first_pkt = capture[0]
        l3_type = IP if ip_ver == "v4" else IPv6
        ip_layer = first_pkt[l3_type] if first_pkt.haslayer(l3_type) else None

        if data_set is not None:
            for record in data:
                # skip flow if ingress/egress interface is 0
                if self._record_field_to_int(record[10]) == 0:
                    continue
                if self._record_field_to_int(record[14]) == 0:
                    continue

                for field, spec in data_set.items():
                    expected = self._expected_field_value(
                        spec, first_pkt, ip_layer, ip_ver
                    )
                    self.assertEqual(
                        self._record_field_to_int(record[field]), expected
                    )
        if field_count is not None:
            for record in data:
                self.assertEqual(len(record), field_count)

    def verify_cflow_data_notimer(self, decoder, capture, cflows):
        """Match data records one-to-one, in order, with captured packets.

        Each record must report the IPv4 length of its packet (field 1) and
        a packet count of 1 (field 2), and the number of records must equal
        the number of captured packets.

        :raises Exception: when one of ``cflows`` has no Data layer
        """
        idx = 0
        for cflow in cflows:
            if not cflow.haslayer(Data):
                raise Exception("No CFLOW data")
            data = decoder.decode_data_set(cflow.getlayer(Set))

            for rec in data:
                p = capture[idx]
                idx += 1
                self.assertEqual(p[IP].len, self._record_field_to_int(rec[1]))
                self.assertEqual(1, self._record_field_to_int(rec[2]))
        self.assertEqual(len(capture), idx)

    def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
        """wait for CFLOW packet and verify its correctness

        :param collector_intf: interface on which the packet is expected
        :param set_id: set ID the received packet must carry
        :param timeout: how long to wait
        :returns: the received packet

        """
        self.logger.info("IPFIX: Waiting for CFLOW packet")
        p = collector_intf.wait_for_packet(timeout=timeout)
        self.assertEqual(p[Set].setID, set_id)
        self.logger.debug(ppp("IPFIX: Got packet:", p))
        return p
383
384
@tag_run_solo
@tag_fixme_vpp_workers
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class Flowprobe(MethodHolder):
    """Template verification, timer tests"""

    @classmethod
    def setUpClass(cls):
        super(Flowprobe, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(Flowprobe, cls).tearDownClass()

    def test_0001(self):
        """timer less than template timeout"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # active timer of 2 against the default template interval
        cflow_cfg = VppCFLOW(test=self, active=2)
        cflow_cfg.add_vpp_config()

        decoder = IPFIXDecoder()
        # templates are expected straight after configuration
        template_ids = cflow_cfg.verify_templates(decoder)

        self.create_stream(packets=1)
        self.send_packets()
        forwarded = self.pg2.get_capture(1)

        # the record for the single packet must reach the collector
        data_pkt = self.wait_for_cflow_packet(self.collector, template_ids[1], 15)
        self.verify_cflow_data(decoder, forwarded, data_pkt)

        cflow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_0002(self):
        """timer greater than template timeout"""
        self.logger.info("FFP_TEST_START_0002")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # template interval (3) is shorter than the active timer (4)
        cflow_cfg = VppCFLOW(test=self, timeout=3, active=4)
        cflow_cfg.add_vpp_config()

        decoder = IPFIXDecoder()
        # first batch of templates: only checked, not fed to the decoder
        cflow_cfg.verify_templates()

        self.create_stream(packets=2)
        self.send_packets()
        forwarded = self.pg2.get_capture(2)

        # the templates are sent again; allow 5 s per packet for this batch
        # and let the decoder learn from it
        template_ids = cflow_cfg.verify_templates(decoder, timeout=5)

        # the record covering both packets must reach the collector
        data_pkt = self.wait_for_cflow_packet(self.collector, template_ids[1], 15)
        self.verify_cflow_data(decoder, forwarded, data_pkt)

        cflow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")

    def test_cflow_packet(self):
        """verify cflow packet fields"""
        self.logger.info("FFP_TEST_START_0000")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        cflow_cfg = VppCFLOW(
            test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
        )
        cflow_cfg.add_vpp_config()

        # route 9.0.0.0/24 out of pg8 so the test packet leaves through it
        next_hop = VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)
        route_9001 = VppIpRoute(self, "9.0.0.0", 24, [next_hop])
        route_9001.add_vpp_config()

        decoder = IPFIXDecoder()
        template_ids = cflow_cfg.verify_templates(decoder, count=1)

        tcp_pkt = (
            Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
            / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
            / TCP(sport=1234, dport=4321, flags=80)
            / Raw(b"\xa5" * 100)
        )
        self.pkts = [tcp_pkt]

        unix_now = int(time.time())
        # 2208988800 s separate 1900-01-01 from 1970-01-01, so this is the
        # same instant counted from 1900 (presumably the epoch used by the
        # flow start/end timestamp fields checked below)
        shifted_now = unix_now + 2208988800
        self.send_packets(src_if=self.pg7, dst_if=self.pg8)

        data_pkt = self.wait_for_cflow_packet(self.collector, template_ids[0], 10)
        self.collector.get_capture(2)

        if data_pkt[0].haslayer(IPFIX):
            header = data_pkt[IPFIX]
            self.assertEqual(header.version, 10)
            self.assertEqual(data_pkt[IPFIX].observationDomainID, 1)
            self.assertEqual(data_pkt[IPFIX].sequenceNumber, 0)
            self.assertAlmostEqual(data_pkt[IPFIX].exportTime, unix_now, delta=5)
        if data_pkt.haslayer(Data):
            record = decoder.decode_data_set(data_pkt[0].getlayer(Set))[0]

            def as_int(field_id):
                # numeric value of the raw bytes stored under field_id
                return int(binascii.hexlify(record[field_id]), 16)

            # ingress interface
            self.assertEqual(as_int(10), 8)
            # egress interface
            self.assertEqual(as_int(14), 9)
            # direction
            self.assertEqual(as_int(61), 1)
            # packets
            self.assertEqual(as_int(2), 1)
            # src mac
            self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
            # dst mac
            self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
            # flow start timestamp: the upper 32 bits are compared
            flow_start = as_int(156) >> 32
            self.assertAlmostEqual(flow_start, shifted_now, delta=1)
            # flow end timestamp: the upper 32 bits are compared
            flow_end = as_int(157) >> 32
            self.assertAlmostEqual(flow_end, shifted_now, delta=1)
            # ethernet type
            self.assertEqual(as_int(256), 8)
            # src ip
            self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
            # dst ip
            self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
            # protocol (TCP)
            self.assertEqual(as_int(4), 6)
            # src port
            self.assertEqual(as_int(7), 1234)
            # dst port
            self.assertEqual(as_int(11), 4321)
            # tcp flags
            self.assertEqual(as_int(6), 80)

        cflow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0000")

    def test_flow_entry_reuse(self):
        """Verify flow entry reuse doesn't accumulate meta info"""
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # ip4 datapath on pg3 (rx) with explicit active and passive timers
        cflow_cfg = VppCFLOW(
            test=self,
            active=2,
            passive=3,
            intf="pg3",
            layer="l3 l4",
            datapath="ip4",
            direction="rx",
            mtu=100,
        )
        cflow_cfg.add_vpp_config()

        # learn the template that is sent right away
        decoder = IPFIXDecoder()
        template_ids = cflow_cfg.verify_templates(decoder, count=1)

        # a single TCP packet, re-sent below with different flags
        tcp_pkt = (
            Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
            / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
            / TCP(sport=1234, dport=4321)
            / Raw(b"\xa5" * 50)
        )
        self.pkts = [tcp_pkt]

        flag_sets = (
            TCP_F_SYN | TCP_F_ACK,
            TCP_F_RST | TCP_F_PSH,
        )
        for flags in flag_sets:
            self.pkts[0][TCP].flags = flags
            forwarded = self.send_packets(src_if=self.pg3, dst_if=self.pg4)

            # every export must describe just this packet: its own
            # packet/octet counts and exactly the flags it carried
            data_pkt = self.wait_for_cflow_packet(
                self.collector, template_ids[0], timeout=6
            )
            self.verify_cflow_data(decoder, forwarded, data_pkt)
            expected_fields = {
                IPFIX_TCP_FLAGS_ID: flags,
                IPFIX_SRC_TRANS_PORT_ID: 1234,
                IPFIX_DST_TRANS_PORT_ID: 4321,
            }
            self.verify_cflow_data_detail(
                decoder, forwarded, data_pkt, expected_fields
            )

        self.collector.get_capture(3)

        # cleanup
        cflow_cfg.remove_vpp_config()

    def test_interface_dump(self):
        """Dump interfaces with IPFIX flow record generation enabled"""
        self.logger.info("FFP_TEST_START_0003")

        which_t = VppEnum.vl_api_flowprobe_which_t
        direction_t = VppEnum.vl_api_flowprobe_direction_t

        # Three interfaces, each with a different datapath and direction;
        # only the first object also configures exporter and parameters
        on_pg1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
        on_pg1.add_vpp_config()

        on_pg2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
        on_pg2.enable_flowprobe_feature()

        on_pg3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
        on_pg3.enable_flowprobe_feature()

        # An unfiltered dump lists every enabled interface, in this order
        dump = self.vapi.flowprobe_interface_dump()
        self.assertEqual(len(dump), 3)

        expected = (
            (self.pg1, which_t.FLOWPROBE_WHICH_L2, direction_t.FLOWPROBE_DIRECTION_RX),
            (self.pg2, which_t.FLOWPROBE_WHICH_IP4, direction_t.FLOWPROBE_DIRECTION_TX),
            (
                self.pg3,
                which_t.FLOWPROBE_WHICH_IP6,
                direction_t.FLOWPROBE_DIRECTION_BOTH,
            ),
        )
        for entry, (intf, which, direction) in zip(dump, expected):
            self.assertEqual(entry.sw_if_index, intf.sw_if_index)
            self.assertEqual(entry.which, which)
            self.assertEqual(entry.direction, direction)

        # A dump filtered on pg2 contains that interface only
        dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
        self.assertEqual(len(dump), 1)

        only = dump[0]
        self.assertEqual(only.sw_if_index, self.pg2.sw_if_index)
        self.assertEqual(only.which, which_t.FLOWPROBE_WHICH_IP4)
        self.assertEqual(only.direction, direction_t.FLOWPROBE_DIRECTION_TX)

        # A dump filtered on sw_if_index 99 is empty
        dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
        self.assertEqual(len(dump), 0)

        on_pg1.remove_vpp_config()
        on_pg2.remove_vpp_config()
        on_pg3.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0003")

    def test_get_params(self):
        """Get IPFIX flow record generation parameters"""
        self.logger.info("FFP_TEST_START_0004")

        # Configure non-default timers and all three record layers
        cflow_cfg = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
        cflow_cfg.add_vpp_config()

        # Read the parameters back and compare with what was configured
        params = self.vapi.flowprobe_get_params()
        self.assertEqual(params.active_timer, 20)
        self.assertEqual(params.passive_timer, 40)
        flags_t = VppEnum.vl_api_flowprobe_record_flags_t
        record_flags = flags_t.FLOWPROBE_RECORD_FLAG_L2
        record_flags |= flags_t.FLOWPROBE_RECORD_FLAG_L3
        record_flags |= flags_t.FLOWPROBE_RECORD_FLAG_L4
        self.assertEqual(params.record_flags, record_flags)

        cflow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0004")
684
685
686 class DatapathTestsHolder(object):
687     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
688
689     @classmethod
690     def setUpClass(cls):
691         super(DatapathTestsHolder, cls).setUpClass()
692
693     @classmethod
694     def tearDownClass(cls):
695         super(DatapathTestsHolder, cls).tearDownClass()
696
697     def test_templatesL2(self):
698         """verify template on L2 datapath"""
699         self.logger.info("FFP_TEST_START_0000")
700         self.pg_enable_capture(self.pg_interfaces)
701
702         ipfix = VppCFLOW(
703             test=self, intf=self.intf1, layer="l2", direction=self.direction
704         )
705         ipfix.add_vpp_config()
706
707         # template packet should arrive immediately
708         self.vapi.ipfix_flush()
709         ipfix.verify_templates(timeout=3, count=1)
710         self.collector.get_capture(1)
711
712         ipfix.remove_vpp_config()
713         self.logger.info("FFP_TEST_FINISH_0000")
714
715     def test_L2onL2(self):
716         """L2 data on L2 datapath"""
717         self.logger.info("FFP_TEST_START_0001")
718         self.pg_enable_capture(self.pg_interfaces)
719         self.pkts = []
720
721         ipfix = VppCFLOW(
722             test=self, intf=self.intf1, layer="l2", direction=self.direction
723         )
724         ipfix.add_vpp_config()
725
726         ipfix_decoder = IPFIXDecoder()
727         # template packet should arrive immediately
728         templates = ipfix.verify_templates(ipfix_decoder, count=1)
729
730         self.create_stream(packets=1)
731         capture = self.send_packets()
732
733         # make sure the one packet we expect actually showed up
734         self.vapi.ipfix_flush()
735         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
736         self.verify_cflow_data_detail(
737             ipfix_decoder,
738             capture,
739             cflow,
740             {2: "packets", 256: 8, 61: (self.direction == "tx")},
741         )
742         self.collector.get_capture(2)
743
744         ipfix.remove_vpp_config()
745         self.logger.info("FFP_TEST_FINISH_0001")
746
747     def test_L3onL2(self):
748         """L3 data on L2 datapath"""
749         self.logger.info("FFP_TEST_START_0002")
750         self.pg_enable_capture(self.pg_interfaces)
751         self.pkts = []
752
753         ipfix = VppCFLOW(
754             test=self, intf=self.intf1, layer="l3", direction=self.direction
755         )
756         ipfix.add_vpp_config()
757
758         ipfix_decoder = IPFIXDecoder()
759         # template packet should arrive immediately
760         templates = ipfix.verify_templates(ipfix_decoder, count=2)
761
762         self.create_stream(packets=1)
763         capture = self.send_packets()
764
765         # make sure the one packet we expect actually showed up
766         self.vapi.ipfix_flush()
767         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
768         self.verify_cflow_data_detail(
769             ipfix_decoder,
770             capture,
771             cflow,
772             {
773                 2: "packets",
774                 4: 17,
775                 8: "src_ip",
776                 12: "dst_ip",
777                 61: (self.direction == "tx"),
778             },
779         )
780
781         self.collector.get_capture(3)
782
783         ipfix.remove_vpp_config()
784         self.logger.info("FFP_TEST_FINISH_0002")
785
786     def test_L234onL2(self):
787         """L2/3/4 data on L2 datapath"""
788         self.pg_enable_capture(self.pg_interfaces)
789         self.pkts = []
790
791         ipfix = VppCFLOW(
792             test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
793         )
794         ipfix.add_vpp_config()
795
796         ipfix_decoder = IPFIXDecoder()
797         # template packet should arrive immediately
798         tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
799         tmpl_ip_field_count = (
800             TMPL_COMMON_FIELD_COUNT
801             + TMPL_L2_FIELD_COUNT
802             + TMPL_L3_FIELD_COUNT
803             + TMPL_L4_FIELD_COUNT
804         )
805         templates = ipfix.verify_templates(
806             ipfix_decoder,
807             count=3,
808             field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
809         )
810
811         # verify IPv4 and IPv6 flows
812         for ip_ver in ("v4", "v6"):
813             self.create_stream(packets=1, ip_ver=ip_ver)
814             capture = self.send_packets()
815
816             # make sure the one packet we expect actually showed up
817             self.vapi.ipfix_flush()
818             cflow = self.wait_for_cflow_packet(
819                 self.collector, templates[1 if ip_ver == "v4" else 2]
820             )
821             src_ip_id = 8 if ip_ver == "v4" else 27
822             dst_ip_id = 12 if ip_ver == "v4" else 28
823             self.verify_cflow_data_detail(
824                 ipfix_decoder,
825                 capture,
826                 cflow,
827                 {
828                     2: "packets",
829                     256: 8 if ip_ver == "v4" else 56710,
830                     4: 17,
831                     7: "sport",
832                     11: "dport",
833                     src_ip_id: "src_ip",
834                     dst_ip_id: "dst_ip",
835                     61: (self.direction == "tx"),
836                 },
837                 ip_ver=ip_ver,
838                 field_count=tmpl_ip_field_count,
839             )
840
841         # verify non-IP flow
842         self.pkts = [
843             (
844                 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
845                 / SlowProtocol()
846                 / LACP()
847             )
848         ]
849         capture = self.send_packets()
850
851         # make sure the one packet we expect actually showed up
852         self.vapi.ipfix_flush()
853         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
854         self.verify_cflow_data_detail(
855             ipfix_decoder,
856             capture,
857             cflow,
858             {2: "packets", 256: 2440, 61: (self.direction == "tx")},
859             field_count=tmpl_l2_field_count,
860         )
861
862         self.collector.get_capture(6)
863
864         ipfix.remove_vpp_config()
865
866     def test_L4onL2(self):
867         """L4 data on L2 datapath"""
868         self.logger.info("FFP_TEST_START_0003")
869         self.pg_enable_capture(self.pg_interfaces)
870         self.pkts = []
871
872         ipfix = VppCFLOW(
873             test=self, intf=self.intf1, layer="l4", direction=self.direction
874         )
875         ipfix.add_vpp_config()
876
877         ipfix_decoder = IPFIXDecoder()
878         # template packet should arrive immediately
879         templates = ipfix.verify_templates(ipfix_decoder, count=2)
880
881         self.create_stream(packets=1)
882         capture = self.send_packets()
883
884         # make sure the one packet we expect actually showed up
885         self.vapi.ipfix_flush()
886         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
887         self.verify_cflow_data_detail(
888             ipfix_decoder,
889             capture,
890             cflow,
891             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
892         )
893
894         self.collector.get_capture(3)
895
896         ipfix.remove_vpp_config()
897         self.logger.info("FFP_TEST_FINISH_0003")
898
899     def test_templatesIp4(self):
900         """verify templates on IP4 datapath"""
901         self.logger.info("FFP_TEST_START_0000")
902
903         self.pg_enable_capture(self.pg_interfaces)
904
905         ipfix = VppCFLOW(
906             test=self, intf=self.intf1, datapath="ip4", direction=self.direction
907         )
908         ipfix.add_vpp_config()
909
910         # template packet should arrive immediately
911         self.vapi.ipfix_flush()
912         ipfix.verify_templates(timeout=3, count=1)
913         self.collector.get_capture(1)
914
915         ipfix.remove_vpp_config()
916
917         self.logger.info("FFP_TEST_FINISH_0000")
918
919     def test_L2onIP4(self):
920         """L2 data on IP4 datapath"""
921         self.logger.info("FFP_TEST_START_0001")
922         self.pg_enable_capture(self.pg_interfaces)
923         self.pkts = []
924
925         ipfix = VppCFLOW(
926             test=self,
927             intf=self.intf2,
928             layer="l2",
929             datapath="ip4",
930             direction=self.direction,
931         )
932         ipfix.add_vpp_config()
933
934         ipfix_decoder = IPFIXDecoder()
935         # template packet should arrive immediately
936         templates = ipfix.verify_templates(ipfix_decoder, count=1)
937
938         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
939         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
940
941         # make sure the one packet we expect actually showed up
942         self.vapi.ipfix_flush()
943         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
944         self.verify_cflow_data_detail(
945             ipfix_decoder,
946             capture,
947             cflow,
948             {2: "packets", 256: 8, 61: (self.direction == "tx")},
949         )
950
951         # expected two templates and one cflow packet
952         self.collector.get_capture(2)
953
954         ipfix.remove_vpp_config()
955         self.logger.info("FFP_TEST_FINISH_0001")
956
957     def test_L3onIP4(self):
958         """L3 data on IP4 datapath"""
959         self.logger.info("FFP_TEST_START_0002")
960         self.pg_enable_capture(self.pg_interfaces)
961         self.pkts = []
962
963         ipfix = VppCFLOW(
964             test=self,
965             intf=self.intf2,
966             layer="l3",
967             datapath="ip4",
968             direction=self.direction,
969         )
970         ipfix.add_vpp_config()
971
972         ipfix_decoder = IPFIXDecoder()
973         # template packet should arrive immediately
974         templates = ipfix.verify_templates(ipfix_decoder, count=1)
975
976         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
977         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
978
979         # make sure the one packet we expect actually showed up
980         self.vapi.ipfix_flush()
981         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
982         self.verify_cflow_data_detail(
983             ipfix_decoder,
984             capture,
985             cflow,
986             {
987                 1: "octets",
988                 2: "packets",
989                 8: "src_ip",
990                 12: "dst_ip",
991                 61: (self.direction == "tx"),
992             },
993         )
994
995         # expected two templates and one cflow packet
996         self.collector.get_capture(2)
997
998         ipfix.remove_vpp_config()
999         self.logger.info("FFP_TEST_FINISH_0002")
1000
1001     def test_L4onIP4(self):
1002         """L4 data on IP4 datapath"""
1003         self.logger.info("FFP_TEST_START_0003")
1004         self.pg_enable_capture(self.pg_interfaces)
1005         self.pkts = []
1006
1007         ipfix = VppCFLOW(
1008             test=self,
1009             intf=self.intf2,
1010             layer="l4",
1011             datapath="ip4",
1012             direction=self.direction,
1013         )
1014         ipfix.add_vpp_config()
1015
1016         ipfix_decoder = IPFIXDecoder()
1017         # template packet should arrive immediately
1018         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1019
1020         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
1021         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1022
1023         # make sure the one packet we expect actually showed up
1024         self.vapi.ipfix_flush()
1025         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1026         self.verify_cflow_data_detail(
1027             ipfix_decoder,
1028             capture,
1029             cflow,
1030             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1031         )
1032
1033         # expected two templates and one cflow packet
1034         self.collector.get_capture(2)
1035
1036         ipfix.remove_vpp_config()
1037         self.logger.info("FFP_TEST_FINISH_0003")
1038
1039     def test_templatesIP6(self):
1040         """verify templates on IP6 datapath"""
1041         self.logger.info("FFP_TEST_START_0000")
1042         self.pg_enable_capture(self.pg_interfaces)
1043
1044         ipfix = VppCFLOW(
1045             test=self, intf=self.intf1, datapath="ip6", direction=self.direction
1046         )
1047         ipfix.add_vpp_config()
1048
1049         # template packet should arrive immediately
1050         ipfix.verify_templates(count=1)
1051         self.collector.get_capture(1)
1052
1053         ipfix.remove_vpp_config()
1054
1055         self.logger.info("FFP_TEST_FINISH_0000")
1056
1057     def test_L2onIP6(self):
1058         """L2 data on IP6 datapath"""
1059         self.logger.info("FFP_TEST_START_0001")
1060         self.pg_enable_capture(self.pg_interfaces)
1061         self.pkts = []
1062
1063         ipfix = VppCFLOW(
1064             test=self,
1065             intf=self.intf3,
1066             layer="l2",
1067             datapath="ip6",
1068             direction=self.direction,
1069         )
1070         ipfix.add_vpp_config()
1071
1072         ipfix_decoder = IPFIXDecoder()
1073         # template packet should arrive immediately
1074         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1075
1076         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1077         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1078
1079         # make sure the one packet we expect actually showed up
1080         self.vapi.ipfix_flush()
1081         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1082         self.verify_cflow_data_detail(
1083             ipfix_decoder,
1084             capture,
1085             cflow,
1086             {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1087             ip_ver="v6",
1088         )
1089
1090         # expected two templates and one cflow packet
1091         self.collector.get_capture(2)
1092
1093         ipfix.remove_vpp_config()
1094         self.logger.info("FFP_TEST_FINISH_0001")
1095
1096     def test_L3onIP6(self):
1097         """L3 data on IP6 datapath"""
1098         self.logger.info("FFP_TEST_START_0002")
1099         self.pg_enable_capture(self.pg_interfaces)
1100         self.pkts = []
1101
1102         ipfix = VppCFLOW(
1103             test=self,
1104             intf=self.intf3,
1105             layer="l3",
1106             datapath="ip6",
1107             direction=self.direction,
1108         )
1109         ipfix.add_vpp_config()
1110
1111         ipfix_decoder = IPFIXDecoder()
1112         # template packet should arrive immediately
1113         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1114
1115         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1116         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1117
1118         # make sure the one packet we expect actually showed up
1119         self.vapi.ipfix_flush()
1120         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1121         self.verify_cflow_data_detail(
1122             ipfix_decoder,
1123             capture,
1124             cflow,
1125             {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1126             ip_ver="v6",
1127         )
1128
1129         # expected two templates and one cflow packet
1130         self.collector.get_capture(2)
1131
1132         ipfix.remove_vpp_config()
1133         self.logger.info("FFP_TEST_FINISH_0002")
1134
1135     def test_L4onIP6(self):
1136         """L4 data on IP6 datapath"""
1137         self.logger.info("FFP_TEST_START_0003")
1138         self.pg_enable_capture(self.pg_interfaces)
1139         self.pkts = []
1140
1141         ipfix = VppCFLOW(
1142             test=self,
1143             intf=self.intf3,
1144             layer="l4",
1145             datapath="ip6",
1146             direction=self.direction,
1147         )
1148         ipfix.add_vpp_config()
1149
1150         ipfix_decoder = IPFIXDecoder()
1151         # template packet should arrive immediately
1152         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1153
1154         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1155         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1156
1157         # make sure the one packet we expect actually showed up
1158         self.vapi.ipfix_flush()
1159         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1160         self.verify_cflow_data_detail(
1161             ipfix_decoder,
1162             capture,
1163             cflow,
1164             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1165             ip_ver="v6",
1166         )
1167
1168         # expected two templates and one cflow packet
1169         self.collector.get_capture(2)
1170
1171         ipfix.remove_vpp_config()
1172         self.logger.info("FFP_TEST_FINISH_0003")
1173
1174     def test_0001(self):
1175         """no timers, one CFLOW packet, 9 Flows inside"""
1176         self.logger.info("FFP_TEST_START_0001")
1177         self.pg_enable_capture(self.pg_interfaces)
1178         self.pkts = []
1179
1180         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1181         ipfix.add_vpp_config()
1182
1183         ipfix_decoder = IPFIXDecoder()
1184         # template packet should arrive immediately
1185         templates = ipfix.verify_templates(ipfix_decoder)
1186
1187         self.create_stream(packets=9)
1188         capture = self.send_packets()
1189
1190         # make sure the one packet we expect actually showed up
1191         self.vapi.ipfix_flush()
1192         cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1193         self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1194         self.collector.get_capture(4)
1195
1196         ipfix.remove_vpp_config()
1197         self.logger.info("FFP_TEST_FINISH_0001")
1198
1199     def test_0002(self):
1200         """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1201         self.logger.info("FFP_TEST_START_0002")
1202         self.pg_enable_capture(self.pg_interfaces)
1203         self.pkts = []
1204
1205         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1206         ipfix.add_vpp_config()
1207
1208         ipfix_decoder = IPFIXDecoder()
1209         # template packet should arrive immediately
1210         self.vapi.ipfix_flush()
1211         templates = ipfix.verify_templates(ipfix_decoder)
1212
1213         self.create_stream(packets=6)
1214         capture = self.send_packets()
1215
1216         # make sure the one packet we expect actually showed up
1217         cflows = []
1218         self.vapi.ipfix_flush()
1219         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1220         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1221         self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1222         self.collector.get_capture(5)
1223
1224         ipfix.remove_vpp_config()
1225         self.logger.info("FFP_TEST_FINISH_0002")
1226
1227
@tag_fixme_vpp_workers
class DatapathTx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""

    # interface names and direction handed to VppCFLOW by the tests of this
    # class and of its bases
    intf1 = "pg2"
    intf2 = "pg4"
    intf3 = "pg6"
    direction = "tx"

    def test_rewritten_traffic(self):
        """Rewritten traffic (from subif to ipfix if)"""
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # prepare a sub-interface on pg7; the same two tag values are used
        # again below when tagging the packet to send
        outer_vlan, inner_vlan = 300, 400
        subif = VppDot1ADSubint(self, self.pg7, 0, outer_vlan, inner_vlan)
        subif.admin_up()
        subif.config_ip4()

        # enable ip4 datapath for an interface
        flowprobe = VppCFLOW(
            test=self,
            intf="pg8",
            datapath="ip4",
            layer="l2 l3 l4",
            direction=self.direction,
        )
        flowprobe.add_vpp_config()

        # template packet should arrive immediately
        decoder = IPFIXDecoder()
        templates = flowprobe.verify_templates(decoder, count=1)

        # forward some traffic through the ipfix interface
        via_pg8 = VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)
        route = VppIpRoute(self, "9.0.0.0", 24, [via_pg8])
        route.add_vpp_config()

        # prepare an IPv4 packet (subif => ipfix interface)
        untagged = (
            Ether(src=subif.remote_mac, dst=self.pg7.local_mac)
            / IP(src=subif.remote_ip4, dst="9.0.0.1")
            / UDP(sport=1234, dport=4321)
            / Raw(b"\xa5" * 123)
        )
        self.pkts = [subif.add_dot1ad_layer(untagged, outer_vlan, inner_vlan)]

        # send the packet
        sent = self.send_packets(self.pg7, self.pg8)

        # wait for a flow and verify it
        self.vapi.ipfix_flush()
        cflow = self.wait_for_cflow_packet(self.collector, templates[0])
        self.verify_cflow_data(decoder, sent, cflow)
        expected = {
            IPFIX_SRC_IP4_ADDR_ID: "src_ip",
            IPFIX_DST_IP4_ADDR_ID: "dst_ip",
            IPFIX_SRC_TRANS_PORT_ID: "sport",
            IPFIX_DST_TRANS_PORT_ID: "dport",
            IPFIX_FLOW_DIRECTION_ID: self.direction == "tx",
        }
        self.verify_cflow_data_detail(decoder, sent, cflow, expected)

        # two packets in total are expected on the collector
        self.collector.get_capture(2)

        # cleanup
        route.remove_vpp_config()
        subif.remove_vpp_config()
        flowprobe.remove_vpp_config()
1307
1308
@tag_fixme_vpp_workers
class DatapathRx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""

    # direction handed to VppCFLOW by the inherited tests
    direction = "rx"

    # interface names handed to VppCFLOW by the inherited tests
    intf1 = "pg1"
    intf2 = "pg3"
    intf3 = "pg5"
1317
1318
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableIPFIX(MethodHolder):
    """Disable IPFIX"""

    @classmethod
    def setUpClass(cls):
        # nothing class-specific to set up; defer to the base class
        super(DisableIPFIX, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing class-specific to tear down; defer to the base class
        super(DisableIPFIX, cls).tearDownClass()

    def test_0001(self):
        """disable IPFIX after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        flowprobe = VppCFLOW(test=self)
        flowprobe.add_vpp_config()

        decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = flowprobe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1])
        self.collector.get_capture(4)

        # disable IPFIX and start a fresh capture on the collector only
        flowprobe.disable_exporter()
        self.pg_enable_capture([self.collector])

        # resend the same stream
        self.send_packets()

        # make sure no packet arrives within one second after the flush
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        flowprobe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1365
1366
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableIPFIX(MethodHolder):
    """Re-enable IPFIX"""

    @classmethod
    def setUpClass(cls):
        # nothing class-specific to set up; defer to the base class
        super(ReenableIPFIX, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing class-specific to tear down; defer to the base class
        super(ReenableIPFIX, cls).tearDownClass()

    def test_0011(self):
        """disable IPFIX after first packets and re-enable after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(test=self)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=5)
        self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1])
        self.collector.get_capture(4)

        # disable IPFIX and start a fresh capture on the collector only
        ipfix.disable_exporter()
        self.vapi.ipfix_flush()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # make sure nothing reaches the collector while the exporter is
        # disabled; the five data packets must still be forwarded to pg2
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()
        self.pg2.get_capture(5)

        # enable IPFIX
        ipfix.enable_exporter()

        # Four IPFIX packets are expected again: three templates and one
        # data packet.  assertEqual is required here: assertTrue(x, n) treats
        # n as the failure message and never compares the count against it.
        capture = self.collector.get_capture(4)
        nr_templates = 0
        nr_data = 0
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                nr_templates += 1
            if p.haslayer(Data):
                nr_data += 1
        self.assertEqual(nr_templates, 3)
        self.assertEqual(nr_data, 1)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1432
1433
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableFP(MethodHolder):
    """Disable Flowprobe feature"""

    @classmethod
    def setUpClass(cls):
        # nothing class-specific to set up; defer to the base class
        super(DisableFP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing class-specific to tear down; defer to the base class
        super(DisableFP, cls).tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []
        flowprobe = VppCFLOW(test=self)
        flowprobe.add_vpp_config()

        decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = flowprobe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1])
        self.collector.get_capture(4)

        # disable the flowprobe feature and start a fresh capture on the
        # collector only
        flowprobe.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        # resend the same stream
        self.send_packets()

        # make sure no packet arrives within one second after the flush
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        # enable FPP feature so the remove_vpp_config() doesn't fail
        # due to missing feature on interface.
        flowprobe.enable_flowprobe_feature()

        flowprobe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_no_leftover_flows_after_disabling(self):
        """disable flowprobe feature and expect no leftover flows"""
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # enable ip4 datapath for an interface
        # set active and passive timers
        flowprobe = VppCFLOW(
            test=self,
            intf="pg3",
            datapath="ip4",
            layer="l3",
            direction="rx",
            active=3,
            passive=4,
            mtu=100,
        )
        flowprobe.add_vpp_config()

        # template packet should arrive immediately
        flowprobe.verify_templates(count=1)

        # send some ip4 packets
        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
        self.send_packets(src_if=self.pg3, dst_if=self.pg4)

        # disable feature for the interface
        # currently stored ip4 flows should be removed
        flowprobe.disable_flowprobe_feature()

        # no leftover ip4 flows are expected; 12 seconds covers three
        # passive intervals of 4 seconds each
        self.pg_enable_capture([self.collector])
        self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
        self.collector.assert_nothing_captured()

        # re-enable feature for the interface
        flowprobe.enable_flowprobe_feature()

        # template packet should arrive immediately
        decoder = IPFIXDecoder()
        self.vapi.ipfix_flush()
        templates = flowprobe.verify_templates(decoder, count=1)

        # send some ip4 packets
        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
        sent = self.send_packets(src_if=self.pg3, dst_if=self.pg4)

        # verify meta info - packet/octet delta
        self.vapi.ipfix_flush()
        data_pkt = self.wait_for_cflow_packet(
            self.collector,
            templates[0],
            timeout=8,
        )
        self.verify_cflow_data(decoder, sent, data_pkt)

        # two packets in total are expected on the collector
        self.collector.get_capture(2)

        # cleanup
        flowprobe.remove_vpp_config()
1540
1541
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableFP(MethodHolder):
    """Re-enable Flowprobe feature"""

    @classmethod
    def setUpClass(cls):
        # nothing class-specific to set up; defer to the base class
        super(ReenableFP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing class-specific to tear down; defer to the base class
        super(ReenableFP, cls).tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets and re-enable
        after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        flowprobe = VppCFLOW(test=self)
        flowprobe.add_vpp_config()

        decoder = IPFIXDecoder()
        # template packet should arrive immediately
        self.vapi.ipfix_flush()
        templates = flowprobe.verify_templates(decoder, timeout=3)

        self.create_stream()
        self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1], 5)
        self.collector.get_capture(4)

        # disable FPP feature and start a fresh capture on the collector only
        flowprobe.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        # resend the same stream
        self.send_packets()

        # make sure no packet arrives within five seconds after the flush
        self.vapi.ipfix_flush()
        self.sleep(5, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        # enable FPP feature; the templates are verified again with the
        # same decoder
        flowprobe.enable_flowprobe_feature()
        self.vapi.ipfix_flush()
        templates = flowprobe.verify_templates(decoder, timeout=3)

        # resend the same stream once more
        self.send_packets()

        # make sure the next packets (templates and data) we expect actually
        # showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1], 5)
        self.collector.get_capture(4)

        flowprobe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1603
1604
# When executed directly, run this module's tests through unittest using
# VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)