flowprobe: add api messages to obtain current state
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8 import re
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14
15 from config import config
16 from framework import tag_fixme_vpp_workers
17 from framework import VppTestCase, VppTestRunner
18 from framework import tag_run_solo
19 from vpp_object import VppObject
20 from vpp_pg_interface import CaptureTimeoutError
21 from util import ppp
22 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
23 from vpp_ip_route import VppIpRoute, VppRoutePath
24 from vpp_papi.macaddress import mac_ntop
25 from socket import inet_ntop
26 from vpp_papi import VppEnum
27
28
29 class VppCFLOW(VppObject):
30     """CFLOW object for IPFIX exporter and Flowprobe feature"""
31
32     def __init__(
33         self,
34         test,
35         intf="pg2",
36         active=0,
37         passive=0,
38         timeout=100,
39         mtu=1024,
40         datapath="l2",
41         layer="l2 l3 l4",
42         direction="tx",
43     ):
44         self._test = test
45         self._intf = intf
46         self._intf_obj = getattr(self._test, intf)
47         self._active = active
48         if passive == 0 or passive < active:
49             self._passive = active + 1
50         else:
51             self._passive = passive
52         self._datapath = datapath  # l2 ip4 ip6
53         self._collect = layer  # l2 l3 l4
54         self._direction = direction  # rx tx both
55         self._timeout = timeout
56         self._mtu = mtu
57         self._configured = False
58
59     def add_vpp_config(self):
60         self.enable_exporter()
61         l2_flag = 0
62         l3_flag = 0
63         l4_flag = 0
64         if "l2" in self._collect.lower():
65             l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
66         if "l3" in self._collect.lower():
67             l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
68         if "l4" in self._collect.lower():
69             l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
70         self._test.vapi.flowprobe_set_params(
71             record_flags=(l2_flag | l3_flag | l4_flag),
72             active_timer=self._active,
73             passive_timer=self._passive,
74         )
75         self.enable_flowprobe_feature()
76         self._test.vapi.cli("ipfix flush")
77         self._configured = True
78
79     def remove_vpp_config(self):
80         self.disable_exporter()
81         self.disable_flowprobe_feature()
82         self._test.vapi.cli("ipfix flush")
83         self._configured = False
84
85     def enable_exporter(self):
86         self._test.vapi.set_ipfix_exporter(
87             collector_address=self._test.pg0.remote_ip4,
88             src_address=self._test.pg0.local_ip4,
89             path_mtu=self._mtu,
90             template_interval=self._timeout,
91         )
92
93     def _enable_disable_flowprobe_feature(self, is_add):
94         which_map = {
95             "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
96             "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
97             "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
98         }
99         direction_map = {
100             "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
101             "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
102             "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
103         }
104         self._test.vapi.flowprobe_interface_add_del(
105             is_add=is_add,
106             which=which_map[self._datapath],
107             direction=direction_map[self._direction],
108             sw_if_index=self._intf_obj.sw_if_index,
109         )
110
111     def enable_flowprobe_feature(self):
112         self._enable_disable_flowprobe_feature(is_add=True)
113
114     def disable_exporter(self):
115         self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
116
117     def disable_flowprobe_feature(self):
118         self._enable_disable_flowprobe_feature(is_add=False)
119
120     def object_id(self):
121         return "ipfix-collector-%s-%s" % (self._src, self.dst)
122
123     def query_vpp_config(self):
124         return self._configured
125
126     def verify_templates(self, decoder=None, timeout=1, count=3):
127         templates = []
128         self._test.assertIn(count, (1, 2, 3))
129         for _ in range(count):
130             p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
131             self._test.assertTrue(p.haslayer(IPFIX))
132             if decoder is not None and p.haslayer(Template):
133                 templates.append(p[Template].templateID)
134                 decoder.add_template(p.getlayer(Template))
135         return templates
136
137
138 class MethodHolder(VppTestCase):
139     """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
140
141     # Test variables
142     debug_print = False
143     max_number_of_packets = 10
144     pkts = []
145
146     @classmethod
147     def setUpClass(cls):
148         """
149         Perform standard class setup (defined by class method setUpClass in
150         class VppTestCase) before running the test case, set test case related
151         variables and configure VPP.
152         """
153         super(MethodHolder, cls).setUpClass()
154         try:
155             # Create pg interfaces
156             cls.create_pg_interfaces(range(9))
157
158             # Packet sizes
159             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
160
161             # Create BD with MAC learning and unknown unicast flooding disabled
162             # and put interfaces to this BD
163             cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
164             cls.vapi.sw_interface_set_l2_bridge(
165                 rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
166             )
167             cls.vapi.sw_interface_set_l2_bridge(
168                 rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
169             )
170
171             # Set up all interfaces
172             for i in cls.pg_interfaces:
173                 i.admin_up()
174
175             cls.pg0.config_ip4()
176             cls.pg0.configure_ipv4_neighbors()
177             cls.collector = cls.pg0
178
179             cls.pg1.config_ip4()
180             cls.pg1.resolve_arp()
181             cls.pg2.config_ip4()
182             cls.pg2.resolve_arp()
183             cls.pg3.config_ip4()
184             cls.pg3.resolve_arp()
185             cls.pg4.config_ip4()
186             cls.pg4.resolve_arp()
187             cls.pg7.config_ip4()
188             cls.pg8.config_ip4()
189             cls.pg8.configure_ipv4_neighbors()
190
191             cls.pg5.config_ip6()
192             cls.pg5.resolve_ndp()
193             cls.pg5.disable_ipv6_ra()
194             cls.pg6.config_ip6()
195             cls.pg6.resolve_ndp()
196             cls.pg6.disable_ipv6_ra()
197         except Exception:
198             super(MethodHolder, cls).tearDownClass()
199             raise
200
201     @classmethod
202     def tearDownClass(cls):
203         super(MethodHolder, cls).tearDownClass()
204
205     def create_stream(
206         self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
207     ):
208         """Create a packet stream to tickle the plugin
209
210         :param VppInterface src_if: Source interface for packet stream
211         :param VppInterface src_if: Dst interface for packet stream
212         """
213         if src_if is None:
214             src_if = self.pg1
215         if dst_if is None:
216             dst_if = self.pg2
217         self.pkts = []
218         if packets is None:
219             packets = random.randint(1, self.max_number_of_packets)
220         pkt_size = size
221         for p in range(0, packets):
222             if size is None:
223                 pkt_size = random.choice(self.pg_if_packet_sizes)
224             info = self.create_packet_info(src_if, dst_if)
225             payload = self.info_to_payload(info)
226             p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
227             if ip_ver == "v4":
228                 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
229             else:
230                 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
231             p /= UDP(sport=1234, dport=4321)
232             p /= Raw(payload)
233             info.data = p.copy()
234             self.extend_packet(p, pkt_size)
235             self.pkts.append(p)
236
237     def verify_cflow_data(self, decoder, capture, cflow):
238         octets = 0
239         packets = 0
240         for p in capture:
241             octets += p[IP].len
242             packets += 1
243         if cflow.haslayer(Data):
244             data = decoder.decode_data_set(cflow.getlayer(Set))
245             for record in data:
246                 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
247                 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
248
249     def send_packets(self, src_if=None, dst_if=None):
250         if src_if is None:
251             src_if = self.pg1
252         if dst_if is None:
253             dst_if = self.pg2
254         self.pg_enable_capture([dst_if])
255         src_if.add_stream(self.pkts)
256         self.pg_start()
257         return dst_if.get_capture(len(self.pkts))
258
259     def verify_cflow_data_detail(
260         self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
261     ):
262         if self.debug_print:
263             print(capture[0].show())
264         if cflow.haslayer(Data):
265             data = decoder.decode_data_set(cflow.getlayer(Set))
266             if self.debug_print:
267                 print(data)
268             if ip_ver == "v4":
269                 ip_layer = capture[0][IP]
270             else:
271                 ip_layer = capture[0][IPv6]
272             if data_set is not None:
273                 for record in data:
274                     # skip flow if ingress/egress interface is 0
275                     if int(binascii.hexlify(record[10]), 16) == 0:
276                         continue
277                     if int(binascii.hexlify(record[14]), 16) == 0:
278                         continue
279
280                     for field in data_set:
281                         value = data_set[field]
282                         if value == "octets":
283                             value = ip_layer.len
284                             if ip_ver == "v6":
285                                 value += 40  # ??? is this correct
286                         elif value == "packets":
287                             value = 1
288                         elif value == "src_ip":
289                             if ip_ver == "v4":
290                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
291                             else:
292                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
293                             value = int(binascii.hexlify(ip), 16)
294                         elif value == "dst_ip":
295                             if ip_ver == "v4":
296                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
297                             else:
298                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
299                             value = int(binascii.hexlify(ip), 16)
300                         elif value == "sport":
301                             value = int(capture[0][UDP].sport)
302                         elif value == "dport":
303                             value = int(capture[0][UDP].dport)
304                         self.assertEqual(
305                             int(binascii.hexlify(record[field]), 16), value
306                         )
307
308     def verify_cflow_data_notimer(self, decoder, capture, cflows):
309         idx = 0
310         for cflow in cflows:
311             if cflow.haslayer(Data):
312                 data = decoder.decode_data_set(cflow.getlayer(Set))
313             else:
314                 raise Exception("No CFLOW data")
315
316             for rec in data:
317                 p = capture[idx]
318                 idx += 1
319                 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
320                 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
321         self.assertEqual(len(capture), idx)
322
323     def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
324         """wait for CFLOW packet and verify its correctness
325
326         :param timeout: how long to wait
327
328         """
329         self.logger.info("IPFIX: Waiting for CFLOW packet")
330         # self.logger.debug(self.vapi.ppcli("show flow table"))
331         p = collector_intf.wait_for_packet(timeout=timeout)
332         self.assertEqual(p[Set].setID, set_id)
333         # self.logger.debug(self.vapi.ppcli("show flow table"))
334         self.logger.debug(ppp("IPFIX: Got packet:", p))
335         return p
336
337
338 @tag_run_solo
339 @tag_fixme_vpp_workers
340 class Flowprobe(MethodHolder):
341     """Template verification, timer tests"""
342
343     @classmethod
344     def setUpClass(cls):
345         super(Flowprobe, cls).setUpClass()
346
347     @classmethod
348     def tearDownClass(cls):
349         super(Flowprobe, cls).tearDownClass()
350
351     def test_0001(self):
352         """timer less than template timeout"""
353         self.logger.info("FFP_TEST_START_0001")
354         self.pg_enable_capture(self.pg_interfaces)
355         self.pkts = []
356
357         ipfix = VppCFLOW(test=self, active=2)
358         ipfix.add_vpp_config()
359
360         ipfix_decoder = IPFIXDecoder()
361         # template packet should arrive immediately
362         templates = ipfix.verify_templates(ipfix_decoder)
363
364         self.create_stream(packets=1)
365         self.send_packets()
366         capture = self.pg2.get_capture(1)
367
368         # make sure the one packet we expect actually showed up
369         cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
370         self.verify_cflow_data(ipfix_decoder, capture, cflow)
371
372         ipfix.remove_vpp_config()
373         self.logger.info("FFP_TEST_FINISH_0001")
374
375     def test_0002(self):
376         """timer greater than template timeout"""
377         self.logger.info("FFP_TEST_START_0002")
378         self.pg_enable_capture(self.pg_interfaces)
379         self.pkts = []
380
381         ipfix = VppCFLOW(test=self, timeout=3, active=4)
382         ipfix.add_vpp_config()
383
384         ipfix_decoder = IPFIXDecoder()
385         # template packet should arrive immediately
386         ipfix.verify_templates()
387
388         self.create_stream(packets=2)
389         self.send_packets()
390         capture = self.pg2.get_capture(2)
391
392         # next set of template packet should arrive after 20 seconds
393         # template packet should arrive within 20 s
394         templates = ipfix.verify_templates(ipfix_decoder, timeout=5)
395
396         # make sure the one packet we expect actually showed up
397         cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
398         self.verify_cflow_data(ipfix_decoder, capture, cflow)
399
400         ipfix.remove_vpp_config()
401         self.logger.info("FFP_TEST_FINISH_0002")
402
403     def test_cflow_packet(self):
404         """verify cflow packet fields"""
405         self.logger.info("FFP_TEST_START_0000")
406         self.pg_enable_capture(self.pg_interfaces)
407         self.pkts = []
408
409         ipfix = VppCFLOW(
410             test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
411         )
412         ipfix.add_vpp_config()
413
414         route_9001 = VppIpRoute(
415             self,
416             "9.0.0.0",
417             24,
418             [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
419         )
420         route_9001.add_vpp_config()
421
422         ipfix_decoder = IPFIXDecoder()
423         templates = ipfix.verify_templates(ipfix_decoder, count=1)
424
425         self.pkts = [
426             (
427                 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
428                 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
429                 / TCP(sport=1234, dport=4321, flags=80)
430                 / Raw(b"\xa5" * 100)
431             )
432         ]
433
434         nowUTC = int(time.time())
435         nowUNIX = nowUTC + 2208988800
436         self.send_packets(src_if=self.pg7, dst_if=self.pg8)
437
438         cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
439         self.collector.get_capture(2)
440
441         if cflow[0].haslayer(IPFIX):
442             self.assertEqual(cflow[IPFIX].version, 10)
443             self.assertEqual(cflow[IPFIX].observationDomainID, 1)
444             self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
445             self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
446         if cflow.haslayer(Data):
447             record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
448             # ingress interface
449             self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
450             # egress interface
451             self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
452             # direction
453             self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
454             # packets
455             self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
456             # src mac
457             self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
458             # dst mac
459             self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
460             flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
461             # flow start timestamp
462             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
463             flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
464             # flow end timestamp
465             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
466             # ethernet type
467             self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
468             # src ip
469             self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
470             # dst ip
471             self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
472             # protocol (TCP)
473             self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
474             # src port
475             self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
476             # dst port
477             self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
478             # tcp flags
479             self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
480
481         ipfix.remove_vpp_config()
482         self.logger.info("FFP_TEST_FINISH_0000")
483
484     def test_interface_dump(self):
485         """Dump interfaces with IPFIX flow record generation enabled"""
486         self.logger.info("FFP_TEST_START_0003")
487
488         # Enable feature for 3 interfaces
489         ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
490         ipfix1.add_vpp_config()
491
492         ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
493         ipfix2.enable_flowprobe_feature()
494
495         ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
496         ipfix3.enable_flowprobe_feature()
497
498         # When request "all", dump should contain all enabled interfaces
499         dump = self.vapi.flowprobe_interface_dump()
500         self.assertEqual(len(dump), 3)
501
502         # Verify 1st interface
503         self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
504         self.assertEqual(
505             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
506         )
507         self.assertEqual(
508             dump[0].direction,
509             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
510         )
511
512         # Verify 2nd interface
513         self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
514         self.assertEqual(
515             dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
516         )
517         self.assertEqual(
518             dump[1].direction,
519             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
520         )
521
522         # Verify 3rd interface
523         self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
524         self.assertEqual(
525             dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
526         )
527         self.assertEqual(
528             dump[2].direction,
529             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
530         )
531
532         # When request 2nd interface, dump should contain only the specified interface
533         dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
534         self.assertEqual(len(dump), 1)
535
536         # Verify 2nd interface
537         self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
538         self.assertEqual(
539             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
540         )
541         self.assertEqual(
542             dump[0].direction,
543             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
544         )
545
546         # When request 99th interface, dump should be empty
547         dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
548         self.assertEqual(len(dump), 0)
549
550         ipfix1.remove_vpp_config()
551         ipfix2.remove_vpp_config()
552         ipfix3.remove_vpp_config()
553         self.logger.info("FFP_TEST_FINISH_0003")
554
555     def test_get_params(self):
556         """Get IPFIX flow record generation parameters"""
557         self.logger.info("FFP_TEST_START_0004")
558
559         # Enable feature for an interface with custom parameters
560         ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
561         ipfix.add_vpp_config()
562
563         # Get and verify parameters
564         params = self.vapi.flowprobe_get_params()
565         self.assertEqual(params.active_timer, 20)
566         self.assertEqual(params.passive_timer, 40)
567         record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
568         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
569         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
570         self.assertEqual(params.record_flags, record_flags)
571
572         ipfix.remove_vpp_config()
573         self.logger.info("FFP_TEST_FINISH_0004")
574
575
576 class DatapathTestsHolder(object):
577     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
578
579     @classmethod
580     def setUpClass(cls):
581         super(DatapathTestsHolder, cls).setUpClass()
582
583     @classmethod
584     def tearDownClass(cls):
585         super(DatapathTestsHolder, cls).tearDownClass()
586
587     def test_templatesL2(self):
588         """verify template on L2 datapath"""
589         self.logger.info("FFP_TEST_START_0000")
590         self.pg_enable_capture(self.pg_interfaces)
591
592         ipfix = VppCFLOW(
593             test=self, intf=self.intf1, layer="l2", direction=self.direction
594         )
595         ipfix.add_vpp_config()
596
597         # template packet should arrive immediately
598         self.vapi.ipfix_flush()
599         ipfix.verify_templates(timeout=3, count=1)
600         self.collector.get_capture(1)
601
602         ipfix.remove_vpp_config()
603         self.logger.info("FFP_TEST_FINISH_0000")
604
605     def test_L2onL2(self):
606         """L2 data on L2 datapath"""
607         self.logger.info("FFP_TEST_START_0001")
608         self.pg_enable_capture(self.pg_interfaces)
609         self.pkts = []
610
611         ipfix = VppCFLOW(
612             test=self, intf=self.intf1, layer="l2", direction=self.direction
613         )
614         ipfix.add_vpp_config()
615
616         ipfix_decoder = IPFIXDecoder()
617         # template packet should arrive immediately
618         templates = ipfix.verify_templates(ipfix_decoder, count=1)
619
620         self.create_stream(packets=1)
621         capture = self.send_packets()
622
623         # make sure the one packet we expect actually showed up
624         self.vapi.ipfix_flush()
625         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
626         self.verify_cflow_data_detail(
627             ipfix_decoder,
628             capture,
629             cflow,
630             {2: "packets", 256: 8, 61: (self.direction == "tx")},
631         )
632         self.collector.get_capture(2)
633
634         ipfix.remove_vpp_config()
635         self.logger.info("FFP_TEST_FINISH_0001")
636
637     def test_L3onL2(self):
638         """L3 data on L2 datapath"""
639         self.logger.info("FFP_TEST_START_0002")
640         self.pg_enable_capture(self.pg_interfaces)
641         self.pkts = []
642
643         ipfix = VppCFLOW(
644             test=self, intf=self.intf1, layer="l3", direction=self.direction
645         )
646         ipfix.add_vpp_config()
647
648         ipfix_decoder = IPFIXDecoder()
649         # template packet should arrive immediately
650         templates = ipfix.verify_templates(ipfix_decoder, count=2)
651
652         self.create_stream(packets=1)
653         capture = self.send_packets()
654
655         # make sure the one packet we expect actually showed up
656         self.vapi.ipfix_flush()
657         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
658         self.verify_cflow_data_detail(
659             ipfix_decoder,
660             capture,
661             cflow,
662             {
663                 2: "packets",
664                 4: 17,
665                 8: "src_ip",
666                 12: "dst_ip",
667                 61: (self.direction == "tx"),
668             },
669         )
670
671         self.collector.get_capture(3)
672
673         ipfix.remove_vpp_config()
674         self.logger.info("FFP_TEST_FINISH_0002")
675
676     def test_L4onL2(self):
677         """L4 data on L2 datapath"""
678         self.logger.info("FFP_TEST_START_0003")
679         self.pg_enable_capture(self.pg_interfaces)
680         self.pkts = []
681
682         ipfix = VppCFLOW(
683             test=self, intf=self.intf1, layer="l4", direction=self.direction
684         )
685         ipfix.add_vpp_config()
686
687         ipfix_decoder = IPFIXDecoder()
688         # template packet should arrive immediately
689         templates = ipfix.verify_templates(ipfix_decoder, count=2)
690
691         self.create_stream(packets=1)
692         capture = self.send_packets()
693
694         # make sure the one packet we expect actually showed up
695         self.vapi.ipfix_flush()
696         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
697         self.verify_cflow_data_detail(
698             ipfix_decoder,
699             capture,
700             cflow,
701             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
702         )
703
704         self.collector.get_capture(3)
705
706         ipfix.remove_vpp_config()
707         self.logger.info("FFP_TEST_FINISH_0003")
708
709     def test_templatesIp4(self):
710         """verify templates on IP4 datapath"""
711         self.logger.info("FFP_TEST_START_0000")
712
713         self.pg_enable_capture(self.pg_interfaces)
714
715         ipfix = VppCFLOW(
716             test=self, intf=self.intf1, datapath="ip4", direction=self.direction
717         )
718         ipfix.add_vpp_config()
719
720         # template packet should arrive immediately
721         self.vapi.ipfix_flush()
722         ipfix.verify_templates(timeout=3, count=1)
723         self.collector.get_capture(1)
724
725         ipfix.remove_vpp_config()
726
727         self.logger.info("FFP_TEST_FINISH_0000")
728
729     def test_L2onIP4(self):
730         """L2 data on IP4 datapath"""
731         self.logger.info("FFP_TEST_START_0001")
732         self.pg_enable_capture(self.pg_interfaces)
733         self.pkts = []
734
735         ipfix = VppCFLOW(
736             test=self,
737             intf=self.intf2,
738             layer="l2",
739             datapath="ip4",
740             direction=self.direction,
741         )
742         ipfix.add_vpp_config()
743
744         ipfix_decoder = IPFIXDecoder()
745         # template packet should arrive immediately
746         templates = ipfix.verify_templates(ipfix_decoder, count=1)
747
748         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
749         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
750
751         # make sure the one packet we expect actually showed up
752         self.vapi.ipfix_flush()
753         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
754         self.verify_cflow_data_detail(
755             ipfix_decoder,
756             capture,
757             cflow,
758             {2: "packets", 256: 8, 61: (self.direction == "tx")},
759         )
760
761         # expected two templates and one cflow packet
762         self.collector.get_capture(2)
763
764         ipfix.remove_vpp_config()
765         self.logger.info("FFP_TEST_FINISH_0001")
766
767     def test_L3onIP4(self):
768         """L3 data on IP4 datapath"""
769         self.logger.info("FFP_TEST_START_0002")
770         self.pg_enable_capture(self.pg_interfaces)
771         self.pkts = []
772
773         ipfix = VppCFLOW(
774             test=self,
775             intf=self.intf2,
776             layer="l3",
777             datapath="ip4",
778             direction=self.direction,
779         )
780         ipfix.add_vpp_config()
781
782         ipfix_decoder = IPFIXDecoder()
783         # template packet should arrive immediately
784         templates = ipfix.verify_templates(ipfix_decoder, count=1)
785
786         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
787         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
788
789         # make sure the one packet we expect actually showed up
790         self.vapi.ipfix_flush()
791         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
792         self.verify_cflow_data_detail(
793             ipfix_decoder,
794             capture,
795             cflow,
796             {
797                 1: "octets",
798                 2: "packets",
799                 8: "src_ip",
800                 12: "dst_ip",
801                 61: (self.direction == "tx"),
802             },
803         )
804
805         # expected two templates and one cflow packet
806         self.collector.get_capture(2)
807
808         ipfix.remove_vpp_config()
809         self.logger.info("FFP_TEST_FINISH_0002")
810
811     def test_L4onIP4(self):
812         """L4 data on IP4 datapath"""
813         self.logger.info("FFP_TEST_START_0003")
814         self.pg_enable_capture(self.pg_interfaces)
815         self.pkts = []
816
817         ipfix = VppCFLOW(
818             test=self,
819             intf=self.intf2,
820             layer="l4",
821             datapath="ip4",
822             direction=self.direction,
823         )
824         ipfix.add_vpp_config()
825
826         ipfix_decoder = IPFIXDecoder()
827         # template packet should arrive immediately
828         templates = ipfix.verify_templates(ipfix_decoder, count=1)
829
830         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
831         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
832
833         # make sure the one packet we expect actually showed up
834         self.vapi.ipfix_flush()
835         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
836         self.verify_cflow_data_detail(
837             ipfix_decoder,
838             capture,
839             cflow,
840             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
841         )
842
843         # expected two templates and one cflow packet
844         self.collector.get_capture(2)
845
846         ipfix.remove_vpp_config()
847         self.logger.info("FFP_TEST_FINISH_0003")
848
849     def test_templatesIP6(self):
850         """verify templates on IP6 datapath"""
851         self.logger.info("FFP_TEST_START_0000")
852         self.pg_enable_capture(self.pg_interfaces)
853
854         ipfix = VppCFLOW(
855             test=self, intf=self.intf1, datapath="ip6", direction=self.direction
856         )
857         ipfix.add_vpp_config()
858
859         # template packet should arrive immediately
860         ipfix.verify_templates(count=1)
861         self.collector.get_capture(1)
862
863         ipfix.remove_vpp_config()
864
865         self.logger.info("FFP_TEST_FINISH_0000")
866
867     def test_L2onIP6(self):
868         """L2 data on IP6 datapath"""
869         self.logger.info("FFP_TEST_START_0001")
870         self.pg_enable_capture(self.pg_interfaces)
871         self.pkts = []
872
873         ipfix = VppCFLOW(
874             test=self,
875             intf=self.intf3,
876             layer="l2",
877             datapath="ip6",
878             direction=self.direction,
879         )
880         ipfix.add_vpp_config()
881
882         ipfix_decoder = IPFIXDecoder()
883         # template packet should arrive immediately
884         templates = ipfix.verify_templates(ipfix_decoder, count=1)
885
886         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
887         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
888
889         # make sure the one packet we expect actually showed up
890         self.vapi.ipfix_flush()
891         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
892         self.verify_cflow_data_detail(
893             ipfix_decoder,
894             capture,
895             cflow,
896             {2: "packets", 256: 56710, 61: (self.direction == "tx")},
897             ip_ver="v6",
898         )
899
900         # expected two templates and one cflow packet
901         self.collector.get_capture(2)
902
903         ipfix.remove_vpp_config()
904         self.logger.info("FFP_TEST_FINISH_0001")
905
906     def test_L3onIP6(self):
907         """L3 data on IP6 datapath"""
908         self.logger.info("FFP_TEST_START_0002")
909         self.pg_enable_capture(self.pg_interfaces)
910         self.pkts = []
911
912         ipfix = VppCFLOW(
913             test=self,
914             intf=self.intf3,
915             layer="l3",
916             datapath="ip6",
917             direction=self.direction,
918         )
919         ipfix.add_vpp_config()
920
921         ipfix_decoder = IPFIXDecoder()
922         # template packet should arrive immediately
923         templates = ipfix.verify_templates(ipfix_decoder, count=1)
924
925         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
926         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
927
928         # make sure the one packet we expect actually showed up
929         self.vapi.ipfix_flush()
930         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
931         self.verify_cflow_data_detail(
932             ipfix_decoder,
933             capture,
934             cflow,
935             {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
936             ip_ver="v6",
937         )
938
939         # expected two templates and one cflow packet
940         self.collector.get_capture(2)
941
942         ipfix.remove_vpp_config()
943         self.logger.info("FFP_TEST_FINISH_0002")
944
945     def test_L4onIP6(self):
946         """L4 data on IP6 datapath"""
947         self.logger.info("FFP_TEST_START_0003")
948         self.pg_enable_capture(self.pg_interfaces)
949         self.pkts = []
950
951         ipfix = VppCFLOW(
952             test=self,
953             intf=self.intf3,
954             layer="l4",
955             datapath="ip6",
956             direction=self.direction,
957         )
958         ipfix.add_vpp_config()
959
960         ipfix_decoder = IPFIXDecoder()
961         # template packet should arrive immediately
962         templates = ipfix.verify_templates(ipfix_decoder, count=1)
963
964         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
965         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
966
967         # make sure the one packet we expect actually showed up
968         self.vapi.ipfix_flush()
969         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
970         self.verify_cflow_data_detail(
971             ipfix_decoder,
972             capture,
973             cflow,
974             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
975             ip_ver="v6",
976         )
977
978         # expected two templates and one cflow packet
979         self.collector.get_capture(2)
980
981         ipfix.remove_vpp_config()
982         self.logger.info("FFP_TEST_FINISH_0003")
983
984     def test_0001(self):
985         """no timers, one CFLOW packet, 9 Flows inside"""
986         self.logger.info("FFP_TEST_START_0001")
987         self.pg_enable_capture(self.pg_interfaces)
988         self.pkts = []
989
990         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
991         ipfix.add_vpp_config()
992
993         ipfix_decoder = IPFIXDecoder()
994         # template packet should arrive immediately
995         templates = ipfix.verify_templates(ipfix_decoder)
996
997         self.create_stream(packets=9)
998         capture = self.send_packets()
999
1000         # make sure the one packet we expect actually showed up
1001         self.vapi.ipfix_flush()
1002         cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1003         self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1004         self.collector.get_capture(4)
1005
1006         ipfix.remove_vpp_config()
1007         self.logger.info("FFP_TEST_FINISH_0001")
1008
1009     def test_0002(self):
1010         """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1011         self.logger.info("FFP_TEST_START_0002")
1012         self.pg_enable_capture(self.pg_interfaces)
1013         self.pkts = []
1014
1015         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1016         ipfix.add_vpp_config()
1017
1018         ipfix_decoder = IPFIXDecoder()
1019         # template packet should arrive immediately
1020         self.vapi.ipfix_flush()
1021         templates = ipfix.verify_templates(ipfix_decoder)
1022
1023         self.create_stream(packets=6)
1024         capture = self.send_packets()
1025
1026         # make sure the one packet we expect actually showed up
1027         cflows = []
1028         self.vapi.ipfix_flush()
1029         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1030         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1031         self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1032         self.collector.get_capture(5)
1033
1034         ipfix.remove_vpp_config()
1035         self.logger.info("FFP_TEST_FINISH_0002")
1036
1037
1038 @tag_fixme_vpp_workers
1039 class DatapathTx(MethodHolder, DatapathTestsHolder):
1040     """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""
1041
1042     intf1 = "pg2"
1043     intf2 = "pg4"
1044     intf3 = "pg6"
1045     direction = "tx"
1046
1047
1048 @tag_fixme_vpp_workers
1049 class DatapathRx(MethodHolder, DatapathTestsHolder):
1050     """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""
1051
1052     intf1 = "pg1"
1053     intf2 = "pg3"
1054     intf3 = "pg5"
1055     direction = "rx"
1056
1057
1058 @unittest.skipUnless(config.extended, "part of extended tests")
1059 class DisableIPFIX(MethodHolder):
1060     """Disable IPFIX"""
1061
1062     @classmethod
1063     def setUpClass(cls):
1064         super(DisableIPFIX, cls).setUpClass()
1065
1066     @classmethod
1067     def tearDownClass(cls):
1068         super(DisableIPFIX, cls).tearDownClass()
1069
1070     def test_0001(self):
1071         """disable IPFIX after first packets"""
1072         self.logger.info("FFP_TEST_START_0001")
1073         self.pg_enable_capture(self.pg_interfaces)
1074         self.pkts = []
1075
1076         ipfix = VppCFLOW(test=self)
1077         ipfix.add_vpp_config()
1078
1079         ipfix_decoder = IPFIXDecoder()
1080         # template packet should arrive immediately
1081         templates = ipfix.verify_templates(ipfix_decoder)
1082
1083         self.create_stream()
1084         self.send_packets()
1085
1086         # make sure the one packet we expect actually showed up
1087         self.vapi.ipfix_flush()
1088         self.wait_for_cflow_packet(self.collector, templates[1])
1089         self.collector.get_capture(4)
1090
1091         # disable IPFIX
1092         ipfix.disable_exporter()
1093         self.pg_enable_capture([self.collector])
1094
1095         self.send_packets()
1096
1097         # make sure no one packet arrived in 1 minute
1098         self.vapi.ipfix_flush()
1099         self.sleep(1, "wait before verifying no packets sent")
1100         self.collector.assert_nothing_captured()
1101
1102         ipfix.remove_vpp_config()
1103         self.logger.info("FFP_TEST_FINISH_0001")
1104
1105
1106 @unittest.skipUnless(config.extended, "part of extended tests")
1107 class ReenableIPFIX(MethodHolder):
1108     """Re-enable IPFIX"""
1109
1110     @classmethod
1111     def setUpClass(cls):
1112         super(ReenableIPFIX, cls).setUpClass()
1113
1114     @classmethod
1115     def tearDownClass(cls):
1116         super(ReenableIPFIX, cls).tearDownClass()
1117
1118     def test_0011(self):
1119         """disable IPFIX after first packets and re-enable after few packets"""
1120         self.logger.info("FFP_TEST_START_0001")
1121         self.pg_enable_capture(self.pg_interfaces)
1122         self.pkts = []
1123
1124         ipfix = VppCFLOW(test=self)
1125         ipfix.add_vpp_config()
1126
1127         ipfix_decoder = IPFIXDecoder()
1128         # template packet should arrive immediately
1129         templates = ipfix.verify_templates(ipfix_decoder)
1130
1131         self.create_stream(packets=5)
1132         self.send_packets()
1133
1134         # make sure the one packet we expect actually showed up
1135         self.vapi.ipfix_flush()
1136         self.wait_for_cflow_packet(self.collector, templates[1])
1137         self.collector.get_capture(4)
1138
1139         # disable IPFIX
1140         ipfix.disable_exporter()
1141         self.vapi.ipfix_flush()
1142         self.pg_enable_capture([self.collector])
1143
1144         self.send_packets()
1145
1146         # make sure no one packet arrived in active timer span
1147         self.vapi.ipfix_flush()
1148         self.sleep(1, "wait before verifying no packets sent")
1149         self.collector.assert_nothing_captured()
1150         self.pg2.get_capture(5)
1151
1152         # enable IPFIX
1153         ipfix.enable_exporter()
1154
1155         capture = self.collector.get_capture(4)
1156         nr_templates = 0
1157         nr_data = 0
1158         for p in capture:
1159             self.assertTrue(p.haslayer(IPFIX))
1160             if p.haslayer(Template):
1161                 nr_templates += 1
1162         self.assertTrue(nr_templates, 3)
1163         for p in capture:
1164             self.assertTrue(p.haslayer(IPFIX))
1165             if p.haslayer(Data):
1166                 nr_data += 1
1167         self.assertTrue(nr_templates, 1)
1168
1169         ipfix.remove_vpp_config()
1170         self.logger.info("FFP_TEST_FINISH_0001")
1171
1172
1173 @unittest.skipUnless(config.extended, "part of extended tests")
1174 class DisableFP(MethodHolder):
1175     """Disable Flowprobe feature"""
1176
1177     @classmethod
1178     def setUpClass(cls):
1179         super(DisableFP, cls).setUpClass()
1180
1181     @classmethod
1182     def tearDownClass(cls):
1183         super(DisableFP, cls).tearDownClass()
1184
1185     def test_0001(self):
1186         """disable flowprobe feature after first packets"""
1187         self.logger.info("FFP_TEST_START_0001")
1188         self.pg_enable_capture(self.pg_interfaces)
1189         self.pkts = []
1190         ipfix = VppCFLOW(test=self)
1191         ipfix.add_vpp_config()
1192
1193         ipfix_decoder = IPFIXDecoder()
1194         # template packet should arrive immediately
1195         templates = ipfix.verify_templates(ipfix_decoder)
1196
1197         self.create_stream()
1198         self.send_packets()
1199
1200         # make sure the one packet we expect actually showed up
1201         self.vapi.ipfix_flush()
1202         self.wait_for_cflow_packet(self.collector, templates[1])
1203         self.collector.get_capture(4)
1204
1205         # disable IPFIX
1206         ipfix.disable_flowprobe_feature()
1207         self.pg_enable_capture([self.collector])
1208
1209         self.send_packets()
1210
1211         # make sure no one packet arrived in active timer span
1212         self.vapi.ipfix_flush()
1213         self.sleep(1, "wait before verifying no packets sent")
1214         self.collector.assert_nothing_captured()
1215
1216         ipfix.remove_vpp_config()
1217         self.logger.info("FFP_TEST_FINISH_0001")
1218
1219
1220 @unittest.skipUnless(config.extended, "part of extended tests")
1221 class ReenableFP(MethodHolder):
1222     """Re-enable Flowprobe feature"""
1223
1224     @classmethod
1225     def setUpClass(cls):
1226         super(ReenableFP, cls).setUpClass()
1227
1228     @classmethod
1229     def tearDownClass(cls):
1230         super(ReenableFP, cls).tearDownClass()
1231
1232     def test_0001(self):
1233         """disable flowprobe feature after first packets and re-enable
1234         after few packets"""
1235         self.logger.info("FFP_TEST_START_0001")
1236         self.pg_enable_capture(self.pg_interfaces)
1237         self.pkts = []
1238
1239         ipfix = VppCFLOW(test=self)
1240         ipfix.add_vpp_config()
1241
1242         ipfix_decoder = IPFIXDecoder()
1243         # template packet should arrive immediately
1244         self.vapi.ipfix_flush()
1245         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1246
1247         self.create_stream()
1248         self.send_packets()
1249
1250         # make sure the one packet we expect actually showed up
1251         self.vapi.ipfix_flush()
1252         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1253         self.collector.get_capture(4)
1254
1255         # disable FPP feature
1256         ipfix.disable_flowprobe_feature()
1257         self.pg_enable_capture([self.collector])
1258
1259         self.send_packets()
1260
1261         # make sure no one packet arrived in active timer span
1262         self.vapi.ipfix_flush()
1263         self.sleep(5, "wait before verifying no packets sent")
1264         self.collector.assert_nothing_captured()
1265
1266         # enable FPP feature
1267         ipfix.enable_flowprobe_feature()
1268         self.vapi.ipfix_flush()
1269         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1270
1271         self.send_packets()
1272
1273         # make sure the next packets (templates and data) we expect actually
1274         # showed up
1275         self.vapi.ipfix_flush()
1276         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1277         self.collector.get_capture(4)
1278
1279         ipfix.remove_vpp_config()
1280         self.logger.info("FFP_TEST_FINISH_0001")
1281
1282
1283 if __name__ == "__main__":
1284     unittest.main(testRunner=VppTestRunner)