ethernet: check destination mac for L3 in ethernet-input node
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP
12 from scapy.layers.inet6 import IPv6
13 from scapy.contrib.lacp import SlowProtocol, LACP
14
15 from config import config
16 from framework import VppTestCase
17 from asfframework import (
18     tag_fixme_vpp_workers,
19     tag_fixme_ubuntu2204,
20     tag_fixme_debian11,
21     tag_run_solo,
22     is_distro_ubuntu2204,
23     is_distro_debian11,
24     VppTestRunner,
25 )
26 from vpp_object import VppObject
27 from util import ppp
28 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_papi.macaddress import mac_ntop
31 from socket import inet_ntop
32 from vpp_papi import VppEnum
33 from vpp_sub_interface import VppDot1ADSubint
34
35
# Field counts used to compute the expected size of flowprobe templates:
# a common part plus one part per recorded layer.
TMPL_COMMON_FIELD_COUNT = 6
TMPL_L2_FIELD_COUNT = 3
TMPL_L3_FIELD_COUNT = 4
TMPL_L4_FIELD_COUNT = 3

# IPFIX information element IDs looked up in decoded data records.
IPFIX_TCP_FLAGS_ID = 6
IPFIX_SRC_TRANS_PORT_ID = 7
IPFIX_SRC_IP4_ADDR_ID = 8
IPFIX_DST_TRANS_PORT_ID = 11
IPFIX_DST_IP4_ADDR_ID = 12
IPFIX_FLOW_DIRECTION_ID = 61

# TCP header flags, one bit each (FIN is the least significant bit).
TCP_F_FIN = 1 << 0
TCP_F_SYN = 1 << 1
TCP_F_RST = 1 << 2
TCP_F_PSH = 1 << 3
TCP_F_ACK = 1 << 4
TCP_F_URG = 1 << 5
TCP_F_ECE = 1 << 6
TCP_F_CWR = 1 << 7
56
57
class VppCFLOW(VppObject):
    """CFLOW object for IPFIX exporter and Flowprobe feature

    Keeps the exporter and flowprobe settings chosen by a test for one
    interface and applies / removes them through the test's ``vapi``.
    """

    def __init__(
        self,
        test,
        intf="pg2",
        active=0,
        passive=0,
        timeout=100,
        mtu=1024,
        datapath="l2",
        layer="l2 l3 l4",
        direction="tx",
    ):
        """Store the settings; nothing is sent to VPP here.

        :param test: test case object; must expose ``vapi``, ``pg0`` and an
            attribute named by ``intf``
        :param intf: attribute name of the interface on ``test``
        :param active: active timer handed to ``flowprobe_set_params``
        :param passive: passive timer; ``active + 1`` is used instead when
            this is 0 or smaller than ``active``
        :param timeout: template interval of the IPFIX exporter
        :param mtu: path MTU of the IPFIX exporter
        :param datapath: "l2", "ip4" or "ip6"
        :param layer: string naming the layers to record ("l2", "l3", "l4")
        :param direction: "rx", "tx" or "both"
        """
        self._test = test
        self._intf = intf
        self._intf_obj = getattr(self._test, intf)
        self._active = active
        if passive == 0 or passive < active:
            self._passive = active + 1
        else:
            self._passive = passive
        self._datapath = datapath  # l2 ip4 ip6
        self._collect = layer  # l2 l3 l4
        self._direction = direction  # rx tx both
        self._timeout = timeout
        self._mtu = mtu
        self._configured = False

    def add_vpp_config(self):
        """Configure the exporter, the record parameters and the feature."""
        self.enable_exporter()

        # Build the record flag mask from the layers named in self._collect.
        flag_enum = VppEnum.vl_api_flowprobe_record_flags_t
        layers = self._collect.lower()
        record_flags = 0
        if "l2" in layers:
            record_flags |= flag_enum.FLOWPROBE_RECORD_FLAG_L2
        if "l3" in layers:
            record_flags |= flag_enum.FLOWPROBE_RECORD_FLAG_L3
        if "l4" in layers:
            record_flags |= flag_enum.FLOWPROBE_RECORD_FLAG_L4

        self._test.vapi.flowprobe_set_params(
            record_flags=record_flags,
            active_timer=self._active,
            passive_timer=self._passive,
        )
        self.enable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = True

    def remove_vpp_config(self):
        """Disable the exporter and the feature, then flush IPFIX."""
        self.disable_exporter()
        self.disable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = False

    def enable_exporter(self):
        """Point the IPFIX exporter at pg0's remote address."""
        self._test.vapi.set_ipfix_exporter(
            collector_address=self._test.pg0.remote_ip4,
            src_address=self._test.pg0.local_ip4,
            path_mtu=self._mtu,
            template_interval=self._timeout,
        )

    def _enable_disable_flowprobe_feature(self, is_add):
        """Add or delete flowprobe on the interface.

        :param is_add: True to enable, False to disable
        :raises KeyError: if the stored datapath or direction is not one of
            the names listed in the maps below
        """
        which_map = {
            "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
            "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
            "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
        }
        direction_map = {
            "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
            "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
            "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
        }
        self._test.vapi.flowprobe_interface_add_del(
            is_add=is_add,
            which=which_map[self._datapath],
            direction=direction_map[self._direction],
            sw_if_index=self._intf_obj.sw_if_index,
        )

    def enable_flowprobe_feature(self):
        """Enable flowprobe on the interface."""
        self._enable_disable_flowprobe_feature(is_add=True)

    def disable_exporter(self):
        """Reset the exporter by setting the collector to 0.0.0.0."""
        self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")

    def disable_flowprobe_feature(self):
        """Disable flowprobe on the interface."""
        self._enable_disable_flowprobe_feature(is_add=False)

    def object_id(self):
        """Return an identifier built from the exporter source and collector."""
        # The attributes formerly formatted here (_src / dst) are never
        # assigned, so the call raised AttributeError.  Use the addresses
        # that enable_exporter() actually configures.
        exporter_if = self._test.pg0
        return "ipfix-collector-%s-%s" % (
            exporter_if.local_ip4,
            exporter_if.remote_ip4,
        )

    def query_vpp_config(self):
        """Return the locally tracked state; VPP itself is not queried."""
        return self._configured

    def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
        """Wait for ``count`` template packets and check each of them.

        :param decoder: optional decoder; every received template is added
            to it
        :param timeout: per-packet wait handed to wait_for_cflow_packet
        :param count: number of template packets expected (1, 2 or 3)
        :param field_count_in: optional collection of acceptable template
            field counts
        :returns: template IDs in arrival order; empty when no decoder was
            given
        """
        templates = []
        self._test.assertIn(count, (1, 2, 3))
        for _ in range(count):
            # 2 is the set ID the template packets are expected to carry
            p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
            self._test.assertTrue(p.haslayer(IPFIX))
            self._test.assertTrue(p.haslayer(Template))
            if decoder is not None:
                templates.append(p[Template].templateID)
                decoder.add_template(p.getlayer(Template))
            if field_count_in is not None:
                self._test.assertIn(p[Template].fieldCount, field_count_in)
        return templates
168
169
class MethodHolder(VppTestCase):
    """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""

    # Test variables
    debug_print = False
    max_number_of_packets = 10
    pkts = []

    # Sentinel default for verify_cflow_data_detail(data_set=...).  An
    # explicit None already means "skip the per-field checks", so None cannot
    # double as "argument omitted"; a shared dict default is avoided.
    _DEFAULT_DATA_SET = object()

    @staticmethod
    def _to_int(raw):
        """Read the bytes of a decoded field as one big-endian integer."""
        return int(binascii.hexlify(raw), 16)

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.
        """
        super(MethodHolder, cls).setUpClass()
        # Nothing to configure when the class was set up without a "vpp"
        # attribute on one of the flagged distros.
        if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
            cls, "vpp"
        ):
            return
        try:
            # Create pg interfaces
            cls.create_pg_interfaces(range(9))

            # Packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            # Create BD with MAC learning, flooding, unknown unicast flooding
            # and forwarding enabled and put interfaces to this BD
            cls.vapi.bridge_domain_add_del_v2(
                bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
            )

            # Set up all interfaces
            for i in cls.pg_interfaces:
                i.admin_up()

            # pg0 receives the exported IPFIX packets
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.collector = cls.pg0

            cls.pg1.config_ip4()
            cls.pg1.resolve_arp()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()
            cls.pg3.config_ip4()
            cls.pg3.resolve_arp()
            cls.pg4.config_ip4()
            cls.pg4.resolve_arp()
            cls.pg7.config_ip4()
            cls.pg8.config_ip4()
            cls.pg8.configure_ipv4_neighbors()

            cls.pg5.config_ip6()
            cls.pg5.resolve_ndp()
            cls.pg5.disable_ipv6_ra()
            cls.pg6.config_ip6()
            cls.pg6.resolve_ndp()
            cls.pg6.disable_ipv6_ra()
        except Exception:
            # undo the parent's class setup before propagating the failure
            super(MethodHolder, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(MethodHolder, cls).tearDownClass()

    def create_stream(
        self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
    ):
        """Create a packet stream to tickle the plugin

        The generated UDP packets are stored in ``self.pkts``.

        :param VppInterface src_if: Source interface for packet stream
            (pg1 when None)
        :param VppInterface dst_if: Dst interface for packet stream
            (pg2 when None)
        :param packets: number of packets; random in
            1..max_number_of_packets when None
        :param size: packet size; picked per packet from
            pg_if_packet_sizes when None
        :param ip_ver: "v4" builds IPv4 packets, anything else IPv6
        """
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pkts = []
        if packets is None:
            packets = random.randint(1, self.max_number_of_packets)
        pkt_size = size
        for _ in range(packets):
            if size is None:
                pkt_size = random.choice(self.pg_if_packet_sizes)
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
            if ip_ver == "v4":
                p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
            else:
                p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
            p /= UDP(sport=1234, dport=4321)
            p /= Raw(payload)
            # keep the unpadded packet with its info record
            info.data = p.copy()
            self.extend_packet(p, pkt_size)
            self.pkts.append(p)

    def verify_cflow_data(self, decoder, capture, cflow):
        """Check field 1 / 2 of every record against the capture totals.

        Field 1 must equal the sum of the IPv4 length fields of ``capture``
        and field 2 the number of captured packets.  Nothing is checked when
        ``cflow`` has no Data layer.
        """
        octets = 0
        packets = 0
        for p in capture:
            octets += p[IP].len
            packets += 1
        if cflow.haslayer(Data):
            data = decoder.decode_data_set(cflow.getlayer(Set))
            for record in data:
                self.assertEqual(self._to_int(record[1]), octets)
                self.assertEqual(self._to_int(record[2]), packets)

    def send_packets(self, src_if=None, dst_if=None):
        """Send ``self.pkts`` from src_if and return the capture on dst_if.

        :param src_if: sending interface (pg1 when None)
        :param dst_if: capturing interface (pg2 when None)
        """
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pg_enable_capture([dst_if])
        src_if.add_stream(self.pkts)
        self.pg_start()
        return dst_if.get_capture(len(self.pkts))

    def verify_cflow_data_detail(
        self,
        decoder,
        capture,
        cflow,
        data_set=_DEFAULT_DATA_SET,
        ip_ver="v4",
        field_count=None,
    ):
        """Check selected fields of every record in a data packet.

        :param decoder: decoder already holding the matching template
        :param capture: captured packets; only ``capture[0]`` is consulted
        :param cflow: IPFIX packet to verify; ignored without a Data layer
        :param data_set: maps field ID to the expected value.  The strings
            "octets", "packets", "src_ip", "dst_ip", "sport" and "dport"
            are replaced by values derived from ``capture[0]``; any other
            value is compared as is.  Defaults to
            ``{1: "octets", 2: "packets"}``; None skips the field checks.
        :param ip_ver: "v4" or "v6", selects the IP layer of ``capture[0]``
        :param field_count: when given, every record must have this many
            fields
        """
        if data_set is self._DEFAULT_DATA_SET:
            data_set = {1: "octets", 2: "packets"}
        if self.debug_print:
            print(capture[0].show())
        if cflow.haslayer(Data):
            data = decoder.decode_data_set(cflow.getlayer(Set))
            if self.debug_print:
                print(data)
            if ip_ver == "v4":
                ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
            else:
                ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
            if data_set is not None:
                for record in data:
                    # skip flow if ingress/egress interface is 0
                    if self._to_int(record[10]) == 0:
                        continue
                    if self._to_int(record[14]) == 0:
                        continue

                    for field, value in data_set.items():
                        if value == "octets":
                            value = ip_layer.len
                            if ip_ver == "v6":
                                # NOTE(review): original author questioned
                                # whether adding 40 here is correct
                                value += 40
                        elif value == "packets":
                            value = 1
                        elif value == "src_ip":
                            if ip_ver == "v4":
                                ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
                            else:
                                ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
                            value = self._to_int(ip)
                        elif value == "dst_ip":
                            if ip_ver == "v4":
                                ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
                            else:
                                ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
                            value = self._to_int(ip)
                        elif value == "sport":
                            value = int(capture[0][UDP].sport)
                        elif value == "dport":
                            value = int(capture[0][UDP].dport)
                        self.assertEqual(self._to_int(record[field]), value)
            if field_count is not None:
                for record in data:
                    self.assertEqual(len(record), field_count)

    def verify_cflow_data_notimer(self, decoder, capture, cflows):
        """Match the records of ``cflows`` one-to-one with ``capture``.

        The n-th record overall must report the IPv4 length of the n-th
        captured packet in field 1 and a value of 1 in field 2, and the
        number of records must equal the number of captured packets.

        :raises Exception: if one of ``cflows`` has no Data layer
        """
        idx = 0
        for cflow in cflows:
            if cflow.haslayer(Data):
                data = decoder.decode_data_set(cflow.getlayer(Set))
            else:
                raise Exception("No CFLOW data")

            for rec in data:
                p = capture[idx]
                idx += 1
                self.assertEqual(p[IP].len, self._to_int(rec[1]))
                self.assertEqual(1, self._to_int(rec[2]))
        self.assertEqual(len(capture), idx)

    def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
        """wait for CFLOW packet and verify its correctness

        :param collector_intf: interface the packet is expected on
        :param set_id: set ID the received packet must carry
        :param timeout: how long to wait
        :returns: the received packet

        """
        self.logger.info("IPFIX: Waiting for CFLOW packet")
        # self.logger.debug(self.vapi.ppcli("show flow table"))
        p = collector_intf.wait_for_packet(timeout=timeout)
        self.assertEqual(p[Set].setID, set_id)
        # self.logger.debug(self.vapi.ppcli("show flow table"))
        self.logger.debug(ppp("IPFIX: Got packet:", p))
        return p
383
384
@tag_run_solo
@tag_fixme_vpp_workers
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class Flowprobe(MethodHolder):
    """Template verification, timer tests"""

    @classmethod
    def setUpClass(cls):
        super(Flowprobe, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(Flowprobe, cls).tearDownClass()

    def test_0001(self):
        """timer less than template timeout"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # default template interval (100) is larger than the active timer
        ipfix = VppCFLOW(test=self, active=2)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=1)
        self.send_packets()
        capture = self.pg2.get_capture(1)

        # make sure the one packet we expect actually showed up
        cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
        self.verify_cflow_data(ipfix_decoder, capture, cflow)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    @unittest.skipUnless(
        config.extended, "Test is unstable (assertion error), needs to be fixed"
    )
    def test_0002(self):
        """timer greater than template timeout [UNSTABLE, FIX ME]"""
        self.logger.info("FFP_TEST_START_0002")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # template interval of 3 is shorter than the active timer of 4
        ipfix = VppCFLOW(test=self, timeout=3, active=4)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        ipfix.verify_templates()

        self.create_stream(packets=2)
        self.send_packets()
        capture = self.pg2.get_capture(2)

        # the next set of template packets is expected once the template
        # interval has elapsed; wait up to 5 s for each of them
        templates = ipfix.verify_templates(ipfix_decoder, timeout=5)

        # make sure the one packet we expect actually showed up
        cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
        self.verify_cflow_data(ipfix_decoder, capture, cflow)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")

    def test_cflow_packet(self):
        """verify cflow packet fields"""
        self.logger.info("FFP_TEST_START_0000")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        def as_int(raw):
            # bytes of a decoded field read as one big-endian integer
            return int(binascii.hexlify(raw), 16)

        ipfix = VppCFLOW(
            test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
        )
        ipfix.add_vpp_config()

        # route the test destination out of pg8, where flowprobe is enabled
        route_9001 = VppIpRoute(
            self,
            "9.0.0.0",
            24,
            [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
        )
        route_9001.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        templates = ipfix.verify_templates(ipfix_decoder, count=1)

        self.pkts = [
            (
                Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
                / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
                / TCP(sport=1234, dport=4321, flags=80)
                / Raw(b"\xa5" * 100)
            )
        ]

        # seconds since 1970, compared with the IPFIX export time
        now_unix = int(time.time())
        # same instant counted from 1900 (2208988800 s earlier), compared
        # with the upper 32 bits of the flow start/end timestamps
        now_since_1900 = now_unix + 2208988800
        self.send_packets(src_if=self.pg7, dst_if=self.pg8)

        cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
        self.collector.get_capture(2)

        if cflow[0].haslayer(IPFIX):
            self.assertEqual(cflow[IPFIX].version, 10)
            self.assertEqual(cflow[IPFIX].observationDomainID, 1)
            self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
            self.assertAlmostEqual(cflow[IPFIX].exportTime, now_unix, delta=5)
        if cflow.haslayer(Data):
            record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
            # ingress interface (the packet enters on pg7)
            self.assertEqual(as_int(record[10]), self.pg7.sw_if_index)
            # egress interface (the packet leaves on pg8)
            self.assertEqual(as_int(record[14]), self.pg8.sw_if_index)
            # direction
            self.assertEqual(as_int(record[61]), 1)
            # packets
            self.assertEqual(as_int(record[2]), 1)
            # src mac
            self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
            # dst mac
            self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
            # flow start timestamp
            flow_timestamp = as_int(record[156]) >> 32
            self.assertAlmostEqual(flow_timestamp, now_since_1900, delta=1)
            # flow end timestamp
            flow_timestamp = as_int(record[157]) >> 32
            self.assertAlmostEqual(flow_timestamp, now_since_1900, delta=1)
            # ethernet type
            self.assertEqual(as_int(record[256]), 8)
            # src ip
            self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
            # dst ip
            self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
            # protocol (TCP)
            self.assertEqual(as_int(record[4]), 6)
            # src port
            self.assertEqual(as_int(record[7]), 1234)
            # dst port
            self.assertEqual(as_int(record[11]), 4321)
            # tcp flags
            self.assertEqual(as_int(record[6]), 80)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0000")

    def test_flow_entry_reuse(self):
        """Verify flow entry reuse doesn't accumulate meta info"""
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # enable ip4 datapath for an interface
        # set active and passive timers
        ipfix = VppCFLOW(
            test=self,
            active=2,
            passive=3,
            intf="pg3",
            layer="l3 l4",
            datapath="ip4",
            direction="rx",
            mtu=100,
        )
        ipfix.add_vpp_config()

        # template packet should arrive immediately
        ipfix_decoder = IPFIXDecoder()
        templates = ipfix.verify_templates(ipfix_decoder, count=1)

        # make a tcp packet
        self.pkts = [
            (
                Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac)
                / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
                / TCP(sport=1234, dport=4321)
                / Raw(b"\xa5" * 50)
            )
        ]

        # send the tcp packet two times, each time with new set of flags
        tcp_flags = (
            TCP_F_SYN | TCP_F_ACK,
            TCP_F_RST | TCP_F_PSH,
        )
        for f in tcp_flags:
            self.pkts[0][TCP].flags = f
            capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)

            # verify meta info - packet/octet delta and tcp flags
            cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
            self.verify_cflow_data(ipfix_decoder, capture, cflow)
            self.verify_cflow_data_detail(
                ipfix_decoder,
                capture,
                cflow,
                {
                    IPFIX_TCP_FLAGS_ID: f,
                    IPFIX_SRC_TRANS_PORT_ID: 1234,
                    IPFIX_DST_TRANS_PORT_ID: 4321,
                },
            )

        self.collector.get_capture(3)

        # cleanup
        ipfix.remove_vpp_config()

    def _verify_dump_entry(self, entry, intf, which, direction):
        """Check one flowprobe_interface_dump entry against expected values."""
        self.assertEqual(entry.sw_if_index, intf.sw_if_index)
        self.assertEqual(entry.which, which)
        self.assertEqual(entry.direction, direction)

    def test_interface_dump(self):
        """Dump interfaces with IPFIX flow record generation enabled"""
        self.logger.info("FFP_TEST_START_0003")

        which = VppEnum.vl_api_flowprobe_which_t
        direction = VppEnum.vl_api_flowprobe_direction_t

        # Enable feature for 3 interfaces
        ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
        ipfix1.add_vpp_config()

        ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
        ipfix2.enable_flowprobe_feature()

        ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
        ipfix3.enable_flowprobe_feature()

        # When request "all", dump should contain all enabled interfaces,
        # in the order checked below
        expected = (
            (self.pg1, which.FLOWPROBE_WHICH_L2, direction.FLOWPROBE_DIRECTION_RX),
            (self.pg2, which.FLOWPROBE_WHICH_IP4, direction.FLOWPROBE_DIRECTION_TX),
            (self.pg3, which.FLOWPROBE_WHICH_IP6, direction.FLOWPROBE_DIRECTION_BOTH),
        )
        dump = self.vapi.flowprobe_interface_dump()
        self.assertEqual(len(dump), 3)
        for entry, (intf, exp_which, exp_direction) in zip(dump, expected):
            self._verify_dump_entry(entry, intf, exp_which, exp_direction)

        # When request 2nd interface, dump should contain only the specified interface
        dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
        self.assertEqual(len(dump), 1)
        self._verify_dump_entry(
            dump[0],
            self.pg2,
            which.FLOWPROBE_WHICH_IP4,
            direction.FLOWPROBE_DIRECTION_TX,
        )

        # When request 99th interface, dump should be empty
        dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
        self.assertEqual(len(dump), 0)

        ipfix1.remove_vpp_config()
        ipfix2.remove_vpp_config()
        ipfix3.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0003")

    def test_get_params(self):
        """Get IPFIX flow record generation parameters"""
        self.logger.info("FFP_TEST_START_0004")

        # Enable feature for an interface with custom parameters
        ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
        ipfix.add_vpp_config()

        # Get and verify parameters
        params = self.vapi.flowprobe_get_params()
        self.assertEqual(params.active_timer, 20)
        self.assertEqual(params.passive_timer, 40)
        record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
        record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
        record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
        self.assertEqual(params.record_flags, record_flags)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0004")
687
688
689 class DatapathTestsHolder(object):
690     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
691
692     @classmethod
693     def setUpClass(cls):
694         super(DatapathTestsHolder, cls).setUpClass()
695
696     @classmethod
697     def tearDownClass(cls):
698         super(DatapathTestsHolder, cls).tearDownClass()
699
700     def test_templatesL2(self):
701         """verify template on L2 datapath"""
702         self.logger.info("FFP_TEST_START_0000")
703         self.pg_enable_capture(self.pg_interfaces)
704
705         ipfix = VppCFLOW(
706             test=self, intf=self.intf1, layer="l2", direction=self.direction
707         )
708         ipfix.add_vpp_config()
709
710         # template packet should arrive immediately
711         self.vapi.ipfix_flush()
712         ipfix.verify_templates(timeout=3, count=1)
713         self.collector.get_capture(1)
714
715         ipfix.remove_vpp_config()
716         self.logger.info("FFP_TEST_FINISH_0000")
717
718     def test_L2onL2(self):
719         """L2 data on L2 datapath"""
720         self.logger.info("FFP_TEST_START_0001")
721         self.pg_enable_capture(self.pg_interfaces)
722         self.pkts = []
723
724         ipfix = VppCFLOW(
725             test=self, intf=self.intf1, layer="l2", direction=self.direction
726         )
727         ipfix.add_vpp_config()
728
729         ipfix_decoder = IPFIXDecoder()
730         # template packet should arrive immediately
731         templates = ipfix.verify_templates(ipfix_decoder, count=1)
732
733         self.create_stream(packets=1)
734         capture = self.send_packets()
735
736         # make sure the one packet we expect actually showed up
737         self.vapi.ipfix_flush()
738         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
739         self.verify_cflow_data_detail(
740             ipfix_decoder,
741             capture,
742             cflow,
743             {2: "packets", 256: 8, 61: (self.direction == "tx")},
744         )
745         self.collector.get_capture(2)
746
747         ipfix.remove_vpp_config()
748         self.logger.info("FFP_TEST_FINISH_0001")
749
750     def test_L3onL2(self):
751         """L3 data on L2 datapath"""
752         self.logger.info("FFP_TEST_START_0002")
753         self.pg_enable_capture(self.pg_interfaces)
754         self.pkts = []
755
756         ipfix = VppCFLOW(
757             test=self, intf=self.intf1, layer="l3", direction=self.direction
758         )
759         ipfix.add_vpp_config()
760
761         ipfix_decoder = IPFIXDecoder()
762         # template packet should arrive immediately
763         templates = ipfix.verify_templates(ipfix_decoder, count=2)
764
765         self.create_stream(packets=1)
766         capture = self.send_packets()
767
768         # make sure the one packet we expect actually showed up
769         self.vapi.ipfix_flush()
770         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
771         self.verify_cflow_data_detail(
772             ipfix_decoder,
773             capture,
774             cflow,
775             {
776                 2: "packets",
777                 4: 17,
778                 8: "src_ip",
779                 12: "dst_ip",
780                 61: (self.direction == "tx"),
781             },
782         )
783
784         self.collector.get_capture(3)
785
786         ipfix.remove_vpp_config()
787         self.logger.info("FFP_TEST_FINISH_0002")
788
789     def test_L234onL2(self):
790         """L2/3/4 data on L2 datapath"""
791         self.pg_enable_capture(self.pg_interfaces)
792         self.pkts = []
793
794         ipfix = VppCFLOW(
795             test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
796         )
797         ipfix.add_vpp_config()
798
799         ipfix_decoder = IPFIXDecoder()
800         # template packet should arrive immediately
801         tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
802         tmpl_ip_field_count = (
803             TMPL_COMMON_FIELD_COUNT
804             + TMPL_L2_FIELD_COUNT
805             + TMPL_L3_FIELD_COUNT
806             + TMPL_L4_FIELD_COUNT
807         )
808         templates = ipfix.verify_templates(
809             ipfix_decoder,
810             count=3,
811             field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
812         )
813
814         # verify IPv4 and IPv6 flows
815         for ip_ver in ("v4", "v6"):
816             self.create_stream(packets=1, ip_ver=ip_ver)
817             capture = self.send_packets()
818
819             # make sure the one packet we expect actually showed up
820             self.vapi.ipfix_flush()
821             cflow = self.wait_for_cflow_packet(
822                 self.collector, templates[1 if ip_ver == "v4" else 2]
823             )
824             src_ip_id = 8 if ip_ver == "v4" else 27
825             dst_ip_id = 12 if ip_ver == "v4" else 28
826             self.verify_cflow_data_detail(
827                 ipfix_decoder,
828                 capture,
829                 cflow,
830                 {
831                     2: "packets",
832                     256: 8 if ip_ver == "v4" else 56710,
833                     4: 17,
834                     7: "sport",
835                     11: "dport",
836                     src_ip_id: "src_ip",
837                     dst_ip_id: "dst_ip",
838                     61: (self.direction == "tx"),
839                 },
840                 ip_ver=ip_ver,
841                 field_count=tmpl_ip_field_count,
842             )
843
844         # verify non-IP flow
845         self.pkts = [
846             (
847                 Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
848                 / SlowProtocol()
849                 / LACP()
850             )
851         ]
852         capture = self.send_packets()
853
854         # make sure the one packet we expect actually showed up
855         self.vapi.ipfix_flush()
856         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
857         self.verify_cflow_data_detail(
858             ipfix_decoder,
859             capture,
860             cflow,
861             {2: "packets", 256: 2440, 61: (self.direction == "tx")},
862             field_count=tmpl_l2_field_count,
863         )
864
865         self.collector.get_capture(6)
866
867         ipfix.remove_vpp_config()
868
869     def test_L4onL2(self):
870         """L4 data on L2 datapath"""
871         self.logger.info("FFP_TEST_START_0003")
872         self.pg_enable_capture(self.pg_interfaces)
873         self.pkts = []
874
875         ipfix = VppCFLOW(
876             test=self, intf=self.intf1, layer="l4", direction=self.direction
877         )
878         ipfix.add_vpp_config()
879
880         ipfix_decoder = IPFIXDecoder()
881         # template packet should arrive immediately
882         templates = ipfix.verify_templates(ipfix_decoder, count=2)
883
884         self.create_stream(packets=1)
885         capture = self.send_packets()
886
887         # make sure the one packet we expect actually showed up
888         self.vapi.ipfix_flush()
889         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
890         self.verify_cflow_data_detail(
891             ipfix_decoder,
892             capture,
893             cflow,
894             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
895         )
896
897         self.collector.get_capture(3)
898
899         ipfix.remove_vpp_config()
900         self.logger.info("FFP_TEST_FINISH_0003")
901
902     def test_templatesIp4(self):
903         """verify templates on IP4 datapath"""
904         self.logger.info("FFP_TEST_START_0000")
905
906         self.pg_enable_capture(self.pg_interfaces)
907
908         ipfix = VppCFLOW(
909             test=self, intf=self.intf1, datapath="ip4", direction=self.direction
910         )
911         ipfix.add_vpp_config()
912
913         # template packet should arrive immediately
914         self.vapi.ipfix_flush()
915         ipfix.verify_templates(timeout=3, count=1)
916         self.collector.get_capture(1)
917
918         ipfix.remove_vpp_config()
919
920         self.logger.info("FFP_TEST_FINISH_0000")
921
922     def test_L2onIP4(self):
923         """L2 data on IP4 datapath"""
924         self.logger.info("FFP_TEST_START_0001")
925         self.pg_enable_capture(self.pg_interfaces)
926         self.pkts = []
927
928         ipfix = VppCFLOW(
929             test=self,
930             intf=self.intf2,
931             layer="l2",
932             datapath="ip4",
933             direction=self.direction,
934         )
935         ipfix.add_vpp_config()
936
937         ipfix_decoder = IPFIXDecoder()
938         # template packet should arrive immediately
939         templates = ipfix.verify_templates(ipfix_decoder, count=1)
940
941         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
942         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
943
944         # make sure the one packet we expect actually showed up
945         self.vapi.ipfix_flush()
946         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
947         self.verify_cflow_data_detail(
948             ipfix_decoder,
949             capture,
950             cflow,
951             {2: "packets", 256: 8, 61: (self.direction == "tx")},
952         )
953
954         # expected two templates and one cflow packet
955         self.collector.get_capture(2)
956
957         ipfix.remove_vpp_config()
958         self.logger.info("FFP_TEST_FINISH_0001")
959
960     def test_L3onIP4(self):
961         """L3 data on IP4 datapath"""
962         self.logger.info("FFP_TEST_START_0002")
963         self.pg_enable_capture(self.pg_interfaces)
964         self.pkts = []
965
966         ipfix = VppCFLOW(
967             test=self,
968             intf=self.intf2,
969             layer="l3",
970             datapath="ip4",
971             direction=self.direction,
972         )
973         ipfix.add_vpp_config()
974
975         ipfix_decoder = IPFIXDecoder()
976         # template packet should arrive immediately
977         templates = ipfix.verify_templates(ipfix_decoder, count=1)
978
979         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
980         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
981
982         # make sure the one packet we expect actually showed up
983         self.vapi.ipfix_flush()
984         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
985         self.verify_cflow_data_detail(
986             ipfix_decoder,
987             capture,
988             cflow,
989             {
990                 1: "octets",
991                 2: "packets",
992                 8: "src_ip",
993                 12: "dst_ip",
994                 61: (self.direction == "tx"),
995             },
996         )
997
998         # expected two templates and one cflow packet
999         self.collector.get_capture(2)
1000
1001         ipfix.remove_vpp_config()
1002         self.logger.info("FFP_TEST_FINISH_0002")
1003
1004     def test_L4onIP4(self):
1005         """L4 data on IP4 datapath"""
1006         self.logger.info("FFP_TEST_START_0003")
1007         self.pg_enable_capture(self.pg_interfaces)
1008         self.pkts = []
1009
1010         ipfix = VppCFLOW(
1011             test=self,
1012             intf=self.intf2,
1013             layer="l4",
1014             datapath="ip4",
1015             direction=self.direction,
1016         )
1017         ipfix.add_vpp_config()
1018
1019         ipfix_decoder = IPFIXDecoder()
1020         # template packet should arrive immediately
1021         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1022
1023         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
1024         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1025
1026         # make sure the one packet we expect actually showed up
1027         self.vapi.ipfix_flush()
1028         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1029         self.verify_cflow_data_detail(
1030             ipfix_decoder,
1031             capture,
1032             cflow,
1033             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1034         )
1035
1036         # expected two templates and one cflow packet
1037         self.collector.get_capture(2)
1038
1039         ipfix.remove_vpp_config()
1040         self.logger.info("FFP_TEST_FINISH_0003")
1041
1042     def test_templatesIP6(self):
1043         """verify templates on IP6 datapath"""
1044         self.logger.info("FFP_TEST_START_0000")
1045         self.pg_enable_capture(self.pg_interfaces)
1046
1047         ipfix = VppCFLOW(
1048             test=self, intf=self.intf1, datapath="ip6", direction=self.direction
1049         )
1050         ipfix.add_vpp_config()
1051
1052         # template packet should arrive immediately
1053         ipfix.verify_templates(count=1)
1054         self.collector.get_capture(1)
1055
1056         ipfix.remove_vpp_config()
1057
1058         self.logger.info("FFP_TEST_FINISH_0000")
1059
1060     def test_L2onIP6(self):
1061         """L2 data on IP6 datapath"""
1062         self.logger.info("FFP_TEST_START_0001")
1063         self.pg_enable_capture(self.pg_interfaces)
1064         self.pkts = []
1065
1066         ipfix = VppCFLOW(
1067             test=self,
1068             intf=self.intf3,
1069             layer="l2",
1070             datapath="ip6",
1071             direction=self.direction,
1072         )
1073         ipfix.add_vpp_config()
1074
1075         ipfix_decoder = IPFIXDecoder()
1076         # template packet should arrive immediately
1077         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1078
1079         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1080         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1081
1082         # make sure the one packet we expect actually showed up
1083         self.vapi.ipfix_flush()
1084         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1085         self.verify_cflow_data_detail(
1086             ipfix_decoder,
1087             capture,
1088             cflow,
1089             {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1090             ip_ver="v6",
1091         )
1092
1093         # expected two templates and one cflow packet
1094         self.collector.get_capture(2)
1095
1096         ipfix.remove_vpp_config()
1097         self.logger.info("FFP_TEST_FINISH_0001")
1098
1099     def test_L3onIP6(self):
1100         """L3 data on IP6 datapath"""
1101         self.logger.info("FFP_TEST_START_0002")
1102         self.pg_enable_capture(self.pg_interfaces)
1103         self.pkts = []
1104
1105         ipfix = VppCFLOW(
1106             test=self,
1107             intf=self.intf3,
1108             layer="l3",
1109             datapath="ip6",
1110             direction=self.direction,
1111         )
1112         ipfix.add_vpp_config()
1113
1114         ipfix_decoder = IPFIXDecoder()
1115         # template packet should arrive immediately
1116         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1117
1118         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1119         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1120
1121         # make sure the one packet we expect actually showed up
1122         self.vapi.ipfix_flush()
1123         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1124         self.verify_cflow_data_detail(
1125             ipfix_decoder,
1126             capture,
1127             cflow,
1128             {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1129             ip_ver="v6",
1130         )
1131
1132         # expected two templates and one cflow packet
1133         self.collector.get_capture(2)
1134
1135         ipfix.remove_vpp_config()
1136         self.logger.info("FFP_TEST_FINISH_0002")
1137
1138     def test_L4onIP6(self):
1139         """L4 data on IP6 datapath"""
1140         self.logger.info("FFP_TEST_START_0003")
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pkts = []
1143
1144         ipfix = VppCFLOW(
1145             test=self,
1146             intf=self.intf3,
1147             layer="l4",
1148             datapath="ip6",
1149             direction=self.direction,
1150         )
1151         ipfix.add_vpp_config()
1152
1153         ipfix_decoder = IPFIXDecoder()
1154         # template packet should arrive immediately
1155         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1156
1157         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1158         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1159
1160         # make sure the one packet we expect actually showed up
1161         self.vapi.ipfix_flush()
1162         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1163         self.verify_cflow_data_detail(
1164             ipfix_decoder,
1165             capture,
1166             cflow,
1167             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1168             ip_ver="v6",
1169         )
1170
1171         # expected two templates and one cflow packet
1172         self.collector.get_capture(2)
1173
1174         ipfix.remove_vpp_config()
1175         self.logger.info("FFP_TEST_FINISH_0003")
1176
1177     def test_0001(self):
1178         """no timers, one CFLOW packet, 9 Flows inside"""
1179         self.logger.info("FFP_TEST_START_0001")
1180         self.pg_enable_capture(self.pg_interfaces)
1181         self.pkts = []
1182
1183         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1184         ipfix.add_vpp_config()
1185
1186         ipfix_decoder = IPFIXDecoder()
1187         # template packet should arrive immediately
1188         templates = ipfix.verify_templates(ipfix_decoder)
1189
1190         self.create_stream(packets=9)
1191         capture = self.send_packets()
1192
1193         # make sure the one packet we expect actually showed up
1194         self.vapi.ipfix_flush()
1195         cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1196         self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1197         self.collector.get_capture(4)
1198
1199         ipfix.remove_vpp_config()
1200         self.logger.info("FFP_TEST_FINISH_0001")
1201
1202     def test_0002(self):
1203         """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1204         self.logger.info("FFP_TEST_START_0002")
1205         self.pg_enable_capture(self.pg_interfaces)
1206         self.pkts = []
1207
1208         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1209         ipfix.add_vpp_config()
1210
1211         ipfix_decoder = IPFIXDecoder()
1212         # template packet should arrive immediately
1213         self.vapi.ipfix_flush()
1214         templates = ipfix.verify_templates(ipfix_decoder)
1215
1216         self.create_stream(packets=6)
1217         capture = self.send_packets()
1218
1219         # make sure the one packet we expect actually showed up
1220         cflows = []
1221         self.vapi.ipfix_flush()
1222         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1223         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1224         self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1225         self.collector.get_capture(5)
1226
1227         ipfix.remove_vpp_config()
1228         self.logger.info("FFP_TEST_FINISH_0002")
1229
1230
class DatapathTx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)

    Runs the shared datapath tests with flowprobe attached in the "tx"
    direction and adds one TX-only test for rewritten traffic.
    """

    # interface names the shared tests hand to VppCFLOW as ``intf``
    intf1, intf2, intf3 = "pg2", "pg4", "pg6"
    direction = "tx"

    def test_rewritten_traffic(self):
        """Rewritten traffic (from subif to ipfix if)

        Sends a double-tagged IPv4/UDP packet into a sub-interface of pg7,
        routes it out of pg8 (where flowprobe is enabled on the ip4 datapath)
        and verifies the flow record exported for it.
        """
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # sub-interface on pg7 (tags 300 / 400) with an IPv4 address
        subif = VppDot1ADSubint(self, self.pg7, 0, 300, 400)
        subif.admin_up()
        subif.config_ip4()

        # flowprobe on the ip4 datapath of pg8, recording all layers
        probe_args = dict(intf="pg8", datapath="ip4", layer="l2 l3 l4")
        probe = VppCFLOW(test=self, direction=self.direction, **probe_args)
        probe.add_vpp_config()

        # a single template is expected right after configuration
        decoder = IPFIXDecoder()
        tmpls = probe.verify_templates(decoder, count=1)

        # route 9.0.0.0/24 through pg8 so the test packet leaves there
        next_hop = VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)
        route = VppIpRoute(self, "9.0.0.0", 24, [next_hop])
        route.add_vpp_config()

        # IPv4/UDP packet entering via the sub-interface, destined to the
        # routed prefix
        ether = Ether(src=subif.remote_mac, dst=self.pg7.local_mac)
        ip4 = IP(src=subif.remote_ip4, dst="9.0.0.1")
        udp = UDP(sport=1234, dport=4321)
        payload = Raw(b"\xa5" * 123)
        untagged = ether / ip4 / udp / payload
        self.pkts = [subif.add_dot1ad_layer(untagged, 300, 400)]

        captured = self.send_packets(self.pg7, self.pg8)

        # flush IPFIX, wait for the flow record and check it twice: the
        # generic check first, then the individual fields
        self.vapi.ipfix_flush()
        record = self.wait_for_cflow_packet(self.collector, tmpls[0])
        self.verify_cflow_data(decoder, captured, record)
        expected_fields = {
            IPFIX_SRC_IP4_ADDR_ID: "src_ip",
            IPFIX_DST_IP4_ADDR_ID: "dst_ip",
            IPFIX_SRC_TRANS_PORT_ID: "sport",
            IPFIX_DST_TRANS_PORT_ID: "dport",
            IPFIX_FLOW_DIRECTION_ID: self.direction == "tx",
        }
        self.verify_cflow_data_detail(decoder, captured, record, expected_fields)

        # two packets are expected at the collector in total
        self.collector.get_capture(2)

        # undo the configuration in the same order as before: route,
        # sub-interface, flowprobe
        route.remove_vpp_config()
        subif.remove_vpp_config()
        probe.remove_vpp_config()
1309
1310
class DatapathRx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)

    Runs the shared datapath tests with flowprobe attached in the "rx"
    direction.
    """

    # interface names the shared tests hand to VppCFLOW as ``intf``
    intf1, intf2, intf3 = "pg1", "pg3", "pg5"
    direction = "rx"
1318
1319
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableIPFIX(MethodHolder):
    """Disable IPFIX

    Extended test: nothing may be exported once the exporter is disabled.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_0001(self):
        """disable IPFIX after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # the templates are expected right after configuration
        tmpls = probe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # with the exporter enabled a data packet must show up after a flush
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1])
        self.collector.get_capture(4)

        # turn the exporter off and start a fresh capture on the collector
        probe.disable_exporter()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # nothing may reach the collector, even after a flush and a 1 s wait
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1366
1367
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableIPFIX(MethodHolder):
    """Re-enable IPFIX

    Extended test: export stops while the exporter is disabled and resumes
    (templates and data) once it is enabled again.
    """

    @classmethod
    def setUpClass(cls):
        super(ReenableIPFIX, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(ReenableIPFIX, cls).tearDownClass()

    def test_0011(self):
        """disable IPFIX after first packets and re-enable after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(test=self)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=5)
        self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1])
        self.collector.get_capture(4)

        # disable IPFIX
        ipfix.disable_exporter()
        self.vapi.ipfix_flush()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # make sure nothing is exported while the exporter is disabled;
        # the five data packets themselves must still be forwarded to pg2
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()
        self.pg2.get_capture(5)

        # enable IPFIX
        ipfix.enable_exporter()

        # Four IPFIX packets are expected after re-enabling: three templates
        # and one data packet.  assertEqual is required here - assertTrue
        # treats its second argument as the failure message and therefore
        # never compared the counters.
        capture = self.collector.get_capture(4)
        nr_templates = 0
        nr_data = 0
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                nr_templates += 1
            if p.haslayer(Data):
                nr_data += 1
        self.assertEqual(nr_templates, 3)
        self.assertEqual(nr_data, 1)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1433
1434
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableFP(MethodHolder):
    """Disable Flowprobe feature

    Extended tests: nothing may be exported, and no stale flows may remain,
    once the flowprobe feature is disabled on the interface.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []
        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # the templates are expected right after configuration
        tmpls = probe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # with the feature enabled a data packet must show up after a flush
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1])
        self.collector.get_capture(4)

        # turn the flowprobe feature off and start a fresh collector capture
        probe.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # nothing may reach the collector, even after a flush and a 1 s wait
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        # re-enable the feature so that remove_vpp_config() does not fail
        # because the feature is missing on the interface
        probe.enable_flowprobe_feature()

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_no_leftover_flows_after_disabling(self):
        """disable flowprobe feature and expect no leftover flows"""
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # ip4 datapath on pg3 (rx), with active and passive timers set
        probe_args = dict(
            active=3,
            passive=4,
            intf="pg3",
            layer="l3",
            datapath="ip4",
            direction="rx",
            mtu=100,
        )
        probe = VppCFLOW(test=self, **probe_args)
        probe.add_vpp_config()

        # a single template is expected right after configuration
        probe.verify_templates(count=1)

        # create some ip4 flows
        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
        self.send_packets(src_if=self.pg3, dst_if=self.pg4)

        # disabling the feature is expected to drop the flows stored so far
        probe.disable_flowprobe_feature()

        # so nothing may be exported during the following 12 s wait
        self.pg_enable_capture([self.collector])
        self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
        self.collector.assert_nothing_captured()

        # switch the feature back on for the interface
        probe.enable_flowprobe_feature()

        # the template must be announced again
        decoder = IPFIXDecoder()
        self.vapi.ipfix_flush()
        tmpls = probe.verify_templates(decoder, count=1)

        # send fresh ip4 traffic
        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
        captured = self.send_packets(src_if=self.pg3, dst_if=self.pg4)

        # the exported record is checked against the fresh traffic only
        self.vapi.ipfix_flush()
        record = self.wait_for_cflow_packet(self.collector, tmpls[0], timeout=8)
        self.verify_cflow_data(decoder, captured, record)

        # two packets are expected at the collector in total
        self.collector.get_capture(2)

        # cleanup
        probe.remove_vpp_config()
1541
1542
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableFP(MethodHolder):
    """Re-enable Flowprobe feature

    Extended test: export stops while the flowprobe feature is disabled and
    resumes (templates and data) once it is enabled again.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets and re-enable
        after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # flush, then check the templates that were announced
        self.vapi.ipfix_flush()
        tmpls = probe.verify_templates(decoder, timeout=3)

        self.create_stream()
        self.send_packets()

        # with the feature enabled a data packet must show up after a flush
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1], 5)
        self.collector.get_capture(4)

        # turn the flowprobe feature off and start a fresh collector capture
        probe.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # nothing may reach the collector, even after a flush and a 5 s wait
        self.vapi.ipfix_flush()
        self.sleep(5, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        # turn the feature back on; the templates must be announced again
        probe.enable_flowprobe_feature()
        self.vapi.ipfix_flush()
        tmpls = probe.verify_templates(decoder, timeout=3)

        self.send_packets()

        # both the templates and the data packet must arrive again
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1], 5)
        self.collector.get_capture(4)

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1604
1605
# When executed directly (rather than imported), run the tests of this module
# through unittest with VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)