flowprobe: add support for reporting on inbound packets
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8 import re
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, TCP, UDP
13 from scapy.layers.inet6 import IPv6
14
15 from config import config
16 from framework import tag_fixme_vpp_workers
17 from framework import VppTestCase, VppTestRunner
18 from framework import tag_run_solo
19 from vpp_object import VppObject
20 from vpp_pg_interface import CaptureTimeoutError
21 from util import ppp
22 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
23 from vpp_ip_route import VppIpRoute, VppRoutePath
24 from vpp_papi.macaddress import mac_ntop
25 from socket import inet_ntop
26 from vpp_papi import VppEnum
27
28
class VppCFLOW(VppObject):
    """CFLOW object for IPFIX exporter and Flowprobe feature

    Bundles the API/CLI calls that point the IPFIX exporter at the test's
    pg0 interface and enable the flowprobe feature on one interface of the
    test case.
    """

    def __init__(
        self,
        test,
        intf="pg2",
        active=0,
        passive=0,
        timeout=100,
        mtu=1024,
        datapath="l2",
        layer="l2 l3 l4",
        direction="tx",
    ):
        """Remember the requested configuration; VPP is not touched here.

        :param test: test case object providing ``vapi`` and pg interfaces
        :param intf: attribute name of the probed interface on ``test``
        :param active: active timer handed to ``flowprobe_params``
        :param passive: passive timer; replaced by ``active + 1`` when it
            is 0 or smaller than ``active``
        :param timeout: template interval handed to the IPFIX exporter
        :param mtu: path MTU handed to the IPFIX exporter
        :param datapath: "l2", "ip4" or "ip6"
        :param layer: any combination of "l2", "l3", "l4" in one string
        :param direction: "rx", "tx" or "both"
        """
        self._test = test
        self._intf = intf
        self._intf_obj = getattr(self._test, intf)
        self._active = active
        if passive == 0 or passive < active:
            self._passive = active + 1
        else:
            self._passive = passive
        self._datapath = datapath  # l2 ip4 ip6
        self._collect = layer  # l2 l3 l4
        self._direction = direction  # rx tx both
        self._timeout = timeout
        self._mtu = mtu
        self._configured = False

    def add_vpp_config(self):
        """Configure the exporter, flowprobe parameters and the feature."""
        self.enable_exporter()
        # normalise once instead of once per layer check
        collect = self._collect.lower()
        l2_flag = 0
        l3_flag = 0
        l4_flag = 0
        if "l2" in collect:
            l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
        if "l3" in collect:
            l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
        if "l4" in collect:
            l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
        self._test.vapi.flowprobe_params(
            record_flags=(l2_flag | l3_flag | l4_flag),
            active_timer=self._active,
            passive_timer=self._passive,
        )
        self.enable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = True

    def remove_vpp_config(self):
        """Undo add_vpp_config(): reset the exporter, disable the feature."""
        self.disable_exporter()
        self.disable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = False

    def enable_exporter(self):
        """Point the IPFIX exporter at pg0 (collector = pg0 remote address)."""
        self._test.vapi.set_ipfix_exporter(
            collector_address=self._test.pg0.remote_ip4,
            src_address=self._test.pg0.local_ip4,
            path_mtu=self._mtu,
            template_interval=self._timeout,
        )

    def _enable_disable_flowprobe_feature(self, is_add):
        """Add or remove the flowprobe feature on the configured interface.

        :param is_add: True to enable, False to disable
        :raises KeyError: if the stored datapath or direction is not one of
            the keys of the lookup tables below
        """
        which_map = {
            "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
            "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
            "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
        }
        direction_map = {
            "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
            "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
            "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
        }
        self._test.vapi.flowprobe_interface_add_del(
            is_add=is_add,
            which=which_map[self._datapath],
            direction=direction_map[self._direction],
            sw_if_index=self._intf_obj.sw_if_index,
        )

    def enable_flowprobe_feature(self):
        """Enable the flowprobe feature on the configured interface."""
        self._enable_disable_flowprobe_feature(is_add=True)

    def disable_exporter(self):
        """Reset the exporter by setting the collector to 0.0.0.0."""
        self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")

    def disable_flowprobe_feature(self):
        """Disable the flowprobe feature on the configured interface."""
        self._enable_disable_flowprobe_feature(is_add=False)

    def object_id(self):
        """Return an identifier built from the exporter source/collector.

        The addresses are the ones enable_exporter() hands to VPP; the
        previous implementation read attributes that were never assigned
        and therefore raised AttributeError.
        """
        return "ipfix-collector-%s-%s" % (
            self._test.pg0.local_ip4,
            self._test.pg0.remote_ip4,
        )

    def query_vpp_config(self):
        """Report whether add_vpp_config() ran without a later removal.

        This is local bookkeeping only; VPP itself is not queried.
        """
        return self._configured

    def verify_templates(self, decoder=None, timeout=1, count=3):
        """Wait for ``count`` template packets on the collector interface.

        :param decoder: optional decoder; every received Template layer is
            registered with it
        :param timeout: per-packet wait handed to wait_for_cflow_packet
        :param count: number of packets to wait for (1, 2 or 3)
        :returns: template IDs collected while a decoder was supplied
            (always empty when ``decoder`` is None)
        """
        templates = []
        self._test.assertIn(count, (1, 2, 3))
        for _ in range(count):
            # set id 2 is what wait_for_cflow_packet is asked to check for
            p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
            self._test.assertTrue(p.haslayer(IPFIX))
            if decoder is not None and p.haslayer(Template):
                templates.append(p[Template].templateID)
                decoder.add_template(p.getlayer(Template))
        return templates
136
137
class MethodHolder(VppTestCase):
    """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""

    # Test variables
    debug_print = False
    max_number_of_packets = 10
    pkts = []

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.
        """
        super(MethodHolder, cls).setUpClass()
        try:
            # Create pg interfaces
            cls.create_pg_interfaces(range(9))

            # Packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            # Create BD 1 with MAC learning and unknown unicast flooding
            # enabled (learn=1, uu_flood=1) and put pg1/pg2 into this BD
            cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
            )

            # Set up all interfaces
            for i in cls.pg_interfaces:
                i.admin_up()

            # pg0 receives the exported IPFIX packets
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.collector = cls.pg0

            cls.pg1.config_ip4()
            cls.pg1.resolve_arp()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()
            cls.pg3.config_ip4()
            cls.pg3.resolve_arp()
            cls.pg4.config_ip4()
            cls.pg4.resolve_arp()
            cls.pg7.config_ip4()
            cls.pg8.config_ip4()
            cls.pg8.configure_ipv4_neighbors()

            cls.pg5.config_ip6()
            cls.pg5.resolve_ndp()
            cls.pg5.disable_ipv6_ra()
            cls.pg6.config_ip6()
            cls.pg6.resolve_ndp()
            cls.pg6.disable_ipv6_ra()
        except Exception:
            # release whatever the base class set up before re-raising
            super(MethodHolder, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(MethodHolder, cls).tearDownClass()

    def create_stream(
        self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
    ):
        """Create a packet stream to tickle the plugin

        The generated packets are stored in ``self.pkts``.

        :param VppInterface src_if: Source interface for packet stream
            (defaults to pg1)
        :param VppInterface dst_if: Dst interface for packet stream
            (defaults to pg2)
        :param int packets: number of packets; random in
            ``1..max_number_of_packets`` when None
        :param int size: packet size; picked randomly per packet from
            ``pg_if_packet_sizes`` when None
        :param str ip_ver: "v4" builds IPv4 packets, any other value IPv6
        """
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pkts = []
        if packets is None:
            packets = random.randint(1, self.max_number_of_packets)
        pkt_size = size
        for _ in range(packets):
            if size is None:
                pkt_size = random.choice(self.pg_if_packet_sizes)
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            pkt = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
            if ip_ver == "v4":
                pkt /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
            else:
                pkt /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
            pkt /= UDP(sport=1234, dport=4321)
            pkt /= Raw(payload)
            # keep the unpadded packet in the info record
            info.data = pkt.copy()
            self.extend_packet(pkt, pkt_size)
            self.pkts.append(pkt)

    def verify_cflow_data(self, decoder, capture, cflow):
        """Check that each record carries the capture's totals.

        Field 1 is compared with the summed IPv4 ``len`` of all captured
        packets and field 2 with their count.  Nothing is checked when
        ``cflow`` has no Data layer.
        """
        octets = 0
        packets = 0
        for p in capture:
            octets += p[IP].len
            packets += 1
        if cflow.haslayer(Data):
            data = decoder.decode_data_set(cflow.getlayer(Set))
            for record in data:
                self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
                self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)

    def send_packets(self, src_if=None, dst_if=None):
        """Send ``self.pkts`` from src_if and return the capture on dst_if.

        :param src_if: sending interface (defaults to pg1)
        :param dst_if: capturing interface (defaults to pg2)
        :returns: result of ``dst_if.get_capture(len(self.pkts))``
        """
        if src_if is None:
            src_if = self.pg1
        if dst_if is None:
            dst_if = self.pg2
        self.pg_enable_capture([dst_if])
        src_if.add_stream(self.pkts)
        self.pg_start()
        return dst_if.get_capture(len(self.pkts))

    def verify_cflow_data_detail(
        self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
    ):
        """Compare selected record fields with the first captured packet.

        :param decoder: decoder used to decode the data set of ``cflow``
        :param capture: captured packets; only ``capture[0]`` is inspected
        :param cflow: exported packet; ignored when it has no Data layer
        :param data_set: maps a record field id to either an expected
            literal value or one of the keywords "octets", "packets",
            "src_ip", "dst_ip", "sport", "dport" which are resolved from
            the captured packet.  None disables the comparison.  The
            default dict is only read, never modified.
        :param ip_ver: "v4" or "v6", selects the IP layer of the capture
        """
        if self.debug_print:
            # show() prints by itself and returns None unless dump=True
            print(capture[0].show(dump=True))
        if not cflow.haslayer(Data):
            return
        data = decoder.decode_data_set(cflow.getlayer(Set))
        if self.debug_print:
            print(data)
        if ip_ver == "v4":
            ip_layer = capture[0][IP]
        else:
            ip_layer = capture[0][IPv6]
        if data_set is None:
            return
        for record in data:
            # skip flow if ingress/egress interface is 0
            if int(binascii.hexlify(record[10]), 16) == 0:
                continue
            if int(binascii.hexlify(record[14]), 16) == 0:
                continue

            for field in data_set:
                value = data_set[field]
                if value == "octets":
                    if ip_ver == "v6":
                        # scapy's IPv6 layer has no ``len``; it exposes the
                        # payload length as ``plen``, so add the fixed
                        # 40-byte header.
                        # NOTE(review): confirm flowprobe reports the full
                        # L3 length for IPv6 flows.
                        value = ip_layer.plen + 40
                    else:
                        value = ip_layer.len
                elif value == "packets":
                    value = 1
                elif value == "src_ip":
                    if ip_ver == "v4":
                        ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
                    else:
                        ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
                    value = int(binascii.hexlify(ip), 16)
                elif value == "dst_ip":
                    if ip_ver == "v4":
                        ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
                    else:
                        ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
                    value = int(binascii.hexlify(ip), 16)
                elif value == "sport":
                    value = int(capture[0][UDP].sport)
                elif value == "dport":
                    value = int(capture[0][UDP].dport)
                self.assertEqual(int(binascii.hexlify(record[field]), 16), value)

    def verify_cflow_data_notimer(self, decoder, capture, cflows):
        """Match every exported record with one captured packet, in order.

        :raises Exception: when one of ``cflows`` has no Data layer
        """
        idx = 0
        for cflow in cflows:
            if not cflow.haslayer(Data):
                raise Exception("No CFLOW data")
            data = decoder.decode_data_set(cflow.getlayer(Set))

            for rec in data:
                p = capture[idx]
                idx += 1
                self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
                self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
        # every captured packet must have been reported exactly once
        self.assertEqual(len(capture), idx)

    def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
        """wait for CFLOW packet and verify its correctness

        :param collector_intf: interface the packet is awaited on
        :param set_id: expected ``setID`` of the packet's Set layer
        :param timeout: how long to wait
        :returns: the received packet

        """
        self.logger.info("IPFIX: Waiting for CFLOW packet")
        # self.logger.debug(self.vapi.ppcli("show flow table"))
        p = collector_intf.wait_for_packet(timeout=timeout)
        self.assertEqual(p[Set].setID, set_id)
        # self.logger.debug(self.vapi.ppcli("show flow table"))
        self.logger.debug(ppp("IPFIX: Got packet:", p))
        return p
336
337
@tag_run_solo
@tag_fixme_vpp_workers
class Flowprobe(MethodHolder):
    """Template verification, timer tests"""

    @classmethod
    def setUpClass(cls):
        super(Flowprobe, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(Flowprobe, cls).tearDownClass()

    def test_0001(self):
        """timer less than template timeout"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        cflow_cfg = VppCFLOW(test=self, active=2)
        cflow_cfg.add_vpp_config()

        # templates are expected right after configuration; register them
        decoder = IPFIXDecoder()
        template_ids = cflow_cfg.verify_templates(decoder)

        self.create_stream(packets=1)
        self.send_packets()
        captured = self.pg2.get_capture(1)

        # the single packet sent must be reported (wait up to 15 s)
        report = self.wait_for_cflow_packet(self.collector, template_ids[1], 15)
        self.verify_cflow_data(decoder, captured, report)

        cflow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_0002(self):
        """timer greater than template timeout"""
        self.logger.info("FFP_TEST_START_0002")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        cflow_cfg = VppCFLOW(test=self, timeout=3, active=4)
        cflow_cfg.add_vpp_config()

        decoder = IPFIXDecoder()
        # first round of templates: only checked, not decoded
        cflow_cfg.verify_templates()

        self.create_stream(packets=2)
        self.send_packets()
        captured = self.pg2.get_capture(2)

        # the templates are re-sent after the template interval (3 here);
        # allow up to 5 s per packet and register them this time
        template_ids = cflow_cfg.verify_templates(decoder, timeout=5)

        # the flow report for the packets sent above (wait up to 15 s)
        report = self.wait_for_cflow_packet(self.collector, template_ids[1], 15)
        self.verify_cflow_data(decoder, captured, report)

        cflow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")

    def test_cflow_packet(self):
        """verify cflow packet fields"""

        def as_int(raw):
            # big-endian bytes of a record field -> integer
            return int(binascii.hexlify(raw), 16)

        self.logger.info("FFP_TEST_START_0000")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        cflow_cfg = VppCFLOW(
            test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
        )
        cflow_cfg.add_vpp_config()

        # route 9.0.0.0/24 out of pg8 so the test packet leaves there
        next_hop = VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)
        route = VppIpRoute(self, "9.0.0.0", 24, [next_hop])
        route.add_vpp_config()

        decoder = IPFIXDecoder()
        template_ids = cflow_cfg.verify_templates(decoder, count=1)

        probe_pkt = (
            Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
            / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
            / TCP(sport=1234, dport=4321, flags=80)
            / Raw(b"\xa5" * 100)
        )
        self.pkts = [probe_pkt]

        # time.time() counts from 1970; adding 2208988800 shifts to a 1900
        # based count, which the flow timestamps below are compared against
        epoch_1970_now = int(time.time())
        epoch_1900_now = epoch_1970_now + 2208988800
        self.send_packets(src_if=self.pg7, dst_if=self.pg8)

        report = self.wait_for_cflow_packet(self.collector, template_ids[0], 10)
        self.collector.get_capture(2)

        if report[0].haslayer(IPFIX):
            header = report[IPFIX]
            self.assertEqual(header.version, 10)
            self.assertEqual(report[IPFIX].observationDomainID, 1)
            self.assertEqual(report[IPFIX].sequenceNumber, 0)
            self.assertAlmostEqual(report[IPFIX].exportTime, epoch_1970_now, delta=5)
        if report.haslayer(Data):
            record = decoder.decode_data_set(report[0].getlayer(Set))[0]
            # ingress interface
            self.assertEqual(as_int(record[10]), 8)
            # egress interface
            self.assertEqual(as_int(record[14]), 9)
            # direction
            self.assertEqual(as_int(record[61]), 1)
            # packets
            self.assertEqual(as_int(record[2]), 1)
            # src mac
            self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
            # dst mac
            self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
            # flow start timestamp (upper 32 bits of the 64-bit value)
            flow_start = as_int(record[156]) >> 32
            self.assertAlmostEqual(flow_start, epoch_1900_now, delta=1)
            # flow end timestamp (upper 32 bits of the 64-bit value)
            flow_end = as_int(record[157]) >> 32
            self.assertAlmostEqual(flow_end, epoch_1900_now, delta=1)
            # ethernet type
            self.assertEqual(as_int(record[256]), 8)
            # src ip
            self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
            # dst ip
            self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
            # protocol (TCP)
            self.assertEqual(as_int(record[4]), 6)
            # src port
            self.assertEqual(as_int(record[7]), 1234)
            # dst port
            self.assertEqual(as_int(record[11]), 4321)
            # tcp flags
            self.assertEqual(as_int(record[6]), 80)

        cflow_cfg.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0000")
483
484
485 class DatapathTestsHolder(object):
486     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
487
488     @classmethod
489     def setUpClass(cls):
490         super(DatapathTestsHolder, cls).setUpClass()
491
492     @classmethod
493     def tearDownClass(cls):
494         super(DatapathTestsHolder, cls).tearDownClass()
495
496     def test_templatesL2(self):
497         """verify template on L2 datapath"""
498         self.logger.info("FFP_TEST_START_0000")
499         self.pg_enable_capture(self.pg_interfaces)
500
501         ipfix = VppCFLOW(
502             test=self, intf=self.intf1, layer="l2", direction=self.direction
503         )
504         ipfix.add_vpp_config()
505
506         # template packet should arrive immediately
507         self.vapi.ipfix_flush()
508         ipfix.verify_templates(timeout=3, count=1)
509         self.collector.get_capture(1)
510
511         ipfix.remove_vpp_config()
512         self.logger.info("FFP_TEST_FINISH_0000")
513
514     def test_L2onL2(self):
515         """L2 data on L2 datapath"""
516         self.logger.info("FFP_TEST_START_0001")
517         self.pg_enable_capture(self.pg_interfaces)
518         self.pkts = []
519
520         ipfix = VppCFLOW(
521             test=self, intf=self.intf1, layer="l2", direction=self.direction
522         )
523         ipfix.add_vpp_config()
524
525         ipfix_decoder = IPFIXDecoder()
526         # template packet should arrive immediately
527         templates = ipfix.verify_templates(ipfix_decoder, count=1)
528
529         self.create_stream(packets=1)
530         capture = self.send_packets()
531
532         # make sure the one packet we expect actually showed up
533         self.vapi.ipfix_flush()
534         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
535         self.verify_cflow_data_detail(
536             ipfix_decoder,
537             capture,
538             cflow,
539             {2: "packets", 256: 8, 61: (self.direction == "tx")},
540         )
541         self.collector.get_capture(2)
542
543         ipfix.remove_vpp_config()
544         self.logger.info("FFP_TEST_FINISH_0001")
545
546     def test_L3onL2(self):
547         """L3 data on L2 datapath"""
548         self.logger.info("FFP_TEST_START_0002")
549         self.pg_enable_capture(self.pg_interfaces)
550         self.pkts = []
551
552         ipfix = VppCFLOW(
553             test=self, intf=self.intf1, layer="l3", direction=self.direction
554         )
555         ipfix.add_vpp_config()
556
557         ipfix_decoder = IPFIXDecoder()
558         # template packet should arrive immediately
559         templates = ipfix.verify_templates(ipfix_decoder, count=2)
560
561         self.create_stream(packets=1)
562         capture = self.send_packets()
563
564         # make sure the one packet we expect actually showed up
565         self.vapi.ipfix_flush()
566         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
567         self.verify_cflow_data_detail(
568             ipfix_decoder,
569             capture,
570             cflow,
571             {
572                 2: "packets",
573                 4: 17,
574                 8: "src_ip",
575                 12: "dst_ip",
576                 61: (self.direction == "tx"),
577             },
578         )
579
580         self.collector.get_capture(3)
581
582         ipfix.remove_vpp_config()
583         self.logger.info("FFP_TEST_FINISH_0002")
584
585     def test_L4onL2(self):
586         """L4 data on L2 datapath"""
587         self.logger.info("FFP_TEST_START_0003")
588         self.pg_enable_capture(self.pg_interfaces)
589         self.pkts = []
590
591         ipfix = VppCFLOW(
592             test=self, intf=self.intf1, layer="l4", direction=self.direction
593         )
594         ipfix.add_vpp_config()
595
596         ipfix_decoder = IPFIXDecoder()
597         # template packet should arrive immediately
598         templates = ipfix.verify_templates(ipfix_decoder, count=2)
599
600         self.create_stream(packets=1)
601         capture = self.send_packets()
602
603         # make sure the one packet we expect actually showed up
604         self.vapi.ipfix_flush()
605         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
606         self.verify_cflow_data_detail(
607             ipfix_decoder,
608             capture,
609             cflow,
610             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
611         )
612
613         self.collector.get_capture(3)
614
615         ipfix.remove_vpp_config()
616         self.logger.info("FFP_TEST_FINISH_0003")
617
618     def test_templatesIp4(self):
619         """verify templates on IP4 datapath"""
620         self.logger.info("FFP_TEST_START_0000")
621
622         self.pg_enable_capture(self.pg_interfaces)
623
624         ipfix = VppCFLOW(
625             test=self, intf=self.intf1, datapath="ip4", direction=self.direction
626         )
627         ipfix.add_vpp_config()
628
629         # template packet should arrive immediately
630         self.vapi.ipfix_flush()
631         ipfix.verify_templates(timeout=3, count=1)
632         self.collector.get_capture(1)
633
634         ipfix.remove_vpp_config()
635
636         self.logger.info("FFP_TEST_FINISH_0000")
637
638     def test_L2onIP4(self):
639         """L2 data on IP4 datapath"""
640         self.logger.info("FFP_TEST_START_0001")
641         self.pg_enable_capture(self.pg_interfaces)
642         self.pkts = []
643
644         ipfix = VppCFLOW(
645             test=self,
646             intf=self.intf2,
647             layer="l2",
648             datapath="ip4",
649             direction=self.direction,
650         )
651         ipfix.add_vpp_config()
652
653         ipfix_decoder = IPFIXDecoder()
654         # template packet should arrive immediately
655         templates = ipfix.verify_templates(ipfix_decoder, count=1)
656
657         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
658         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
659
660         # make sure the one packet we expect actually showed up
661         self.vapi.ipfix_flush()
662         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
663         self.verify_cflow_data_detail(
664             ipfix_decoder,
665             capture,
666             cflow,
667             {2: "packets", 256: 8, 61: (self.direction == "tx")},
668         )
669
670         # expected two templates and one cflow packet
671         self.collector.get_capture(2)
672
673         ipfix.remove_vpp_config()
674         self.logger.info("FFP_TEST_FINISH_0001")
675
676     def test_L3onIP4(self):
677         """L3 data on IP4 datapath"""
678         self.logger.info("FFP_TEST_START_0002")
679         self.pg_enable_capture(self.pg_interfaces)
680         self.pkts = []
681
682         ipfix = VppCFLOW(
683             test=self,
684             intf=self.intf2,
685             layer="l3",
686             datapath="ip4",
687             direction=self.direction,
688         )
689         ipfix.add_vpp_config()
690
691         ipfix_decoder = IPFIXDecoder()
692         # template packet should arrive immediately
693         templates = ipfix.verify_templates(ipfix_decoder, count=1)
694
695         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
696         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
697
698         # make sure the one packet we expect actually showed up
699         self.vapi.ipfix_flush()
700         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
701         self.verify_cflow_data_detail(
702             ipfix_decoder,
703             capture,
704             cflow,
705             {
706                 1: "octets",
707                 2: "packets",
708                 8: "src_ip",
709                 12: "dst_ip",
710                 61: (self.direction == "tx"),
711             },
712         )
713
714         # expected two templates and one cflow packet
715         self.collector.get_capture(2)
716
717         ipfix.remove_vpp_config()
718         self.logger.info("FFP_TEST_FINISH_0002")
719
720     def test_L4onIP4(self):
721         """L4 data on IP4 datapath"""
722         self.logger.info("FFP_TEST_START_0003")
723         self.pg_enable_capture(self.pg_interfaces)
724         self.pkts = []
725
726         ipfix = VppCFLOW(
727             test=self,
728             intf=self.intf2,
729             layer="l4",
730             datapath="ip4",
731             direction=self.direction,
732         )
733         ipfix.add_vpp_config()
734
735         ipfix_decoder = IPFIXDecoder()
736         # template packet should arrive immediately
737         templates = ipfix.verify_templates(ipfix_decoder, count=1)
738
739         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
740         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
741
742         # make sure the one packet we expect actually showed up
743         self.vapi.ipfix_flush()
744         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
745         self.verify_cflow_data_detail(
746             ipfix_decoder,
747             capture,
748             cflow,
749             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
750         )
751
752         # expected two templates and one cflow packet
753         self.collector.get_capture(2)
754
755         ipfix.remove_vpp_config()
756         self.logger.info("FFP_TEST_FINISH_0003")
757
758     def test_templatesIP6(self):
759         """verify templates on IP6 datapath"""
760         self.logger.info("FFP_TEST_START_0000")
761         self.pg_enable_capture(self.pg_interfaces)
762
763         ipfix = VppCFLOW(
764             test=self, intf=self.intf1, datapath="ip6", direction=self.direction
765         )
766         ipfix.add_vpp_config()
767
768         # template packet should arrive immediately
769         ipfix.verify_templates(count=1)
770         self.collector.get_capture(1)
771
772         ipfix.remove_vpp_config()
773
774         self.logger.info("FFP_TEST_FINISH_0000")
775
776     def test_L2onIP6(self):
777         """L2 data on IP6 datapath"""
778         self.logger.info("FFP_TEST_START_0001")
779         self.pg_enable_capture(self.pg_interfaces)
780         self.pkts = []
781
782         ipfix = VppCFLOW(
783             test=self,
784             intf=self.intf3,
785             layer="l2",
786             datapath="ip6",
787             direction=self.direction,
788         )
789         ipfix.add_vpp_config()
790
791         ipfix_decoder = IPFIXDecoder()
792         # template packet should arrive immediately
793         templates = ipfix.verify_templates(ipfix_decoder, count=1)
794
795         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
796         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
797
798         # make sure the one packet we expect actually showed up
799         self.vapi.ipfix_flush()
800         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
801         self.verify_cflow_data_detail(
802             ipfix_decoder,
803             capture,
804             cflow,
805             {2: "packets", 256: 56710, 61: (self.direction == "tx")},
806             ip_ver="v6",
807         )
808
809         # expected two templates and one cflow packet
810         self.collector.get_capture(2)
811
812         ipfix.remove_vpp_config()
813         self.logger.info("FFP_TEST_FINISH_0001")
814
815     def test_L3onIP6(self):
816         """L3 data on IP6 datapath"""
817         self.logger.info("FFP_TEST_START_0002")
818         self.pg_enable_capture(self.pg_interfaces)
819         self.pkts = []
820
821         ipfix = VppCFLOW(
822             test=self,
823             intf=self.intf3,
824             layer="l3",
825             datapath="ip6",
826             direction=self.direction,
827         )
828         ipfix.add_vpp_config()
829
830         ipfix_decoder = IPFIXDecoder()
831         # template packet should arrive immediately
832         templates = ipfix.verify_templates(ipfix_decoder, count=1)
833
834         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
835         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
836
837         # make sure the one packet we expect actually showed up
838         self.vapi.ipfix_flush()
839         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
840         self.verify_cflow_data_detail(
841             ipfix_decoder,
842             capture,
843             cflow,
844             {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
845             ip_ver="v6",
846         )
847
848         # expected two templates and one cflow packet
849         self.collector.get_capture(2)
850
851         ipfix.remove_vpp_config()
852         self.logger.info("FFP_TEST_FINISH_0002")
853
854     def test_L4onIP6(self):
855         """L4 data on IP6 datapath"""
856         self.logger.info("FFP_TEST_START_0003")
857         self.pg_enable_capture(self.pg_interfaces)
858         self.pkts = []
859
860         ipfix = VppCFLOW(
861             test=self,
862             intf=self.intf3,
863             layer="l4",
864             datapath="ip6",
865             direction=self.direction,
866         )
867         ipfix.add_vpp_config()
868
869         ipfix_decoder = IPFIXDecoder()
870         # template packet should arrive immediately
871         templates = ipfix.verify_templates(ipfix_decoder, count=1)
872
873         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
874         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
875
876         # make sure the one packet we expect actually showed up
877         self.vapi.ipfix_flush()
878         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
879         self.verify_cflow_data_detail(
880             ipfix_decoder,
881             capture,
882             cflow,
883             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
884             ip_ver="v6",
885         )
886
887         # expected two templates and one cflow packet
888         self.collector.get_capture(2)
889
890         ipfix.remove_vpp_config()
891         self.logger.info("FFP_TEST_FINISH_0003")
892
893     def test_0001(self):
894         """no timers, one CFLOW packet, 9 Flows inside"""
895         self.logger.info("FFP_TEST_START_0001")
896         self.pg_enable_capture(self.pg_interfaces)
897         self.pkts = []
898
899         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
900         ipfix.add_vpp_config()
901
902         ipfix_decoder = IPFIXDecoder()
903         # template packet should arrive immediately
904         templates = ipfix.verify_templates(ipfix_decoder)
905
906         self.create_stream(packets=9)
907         capture = self.send_packets()
908
909         # make sure the one packet we expect actually showed up
910         self.vapi.ipfix_flush()
911         cflow = self.wait_for_cflow_packet(self.collector, templates[1])
912         self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
913         self.collector.get_capture(4)
914
915         ipfix.remove_vpp_config()
916         self.logger.info("FFP_TEST_FINISH_0001")
917
918     def test_0002(self):
919         """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
920         self.logger.info("FFP_TEST_START_0002")
921         self.pg_enable_capture(self.pg_interfaces)
922         self.pkts = []
923
924         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
925         ipfix.add_vpp_config()
926
927         ipfix_decoder = IPFIXDecoder()
928         # template packet should arrive immediately
929         self.vapi.ipfix_flush()
930         templates = ipfix.verify_templates(ipfix_decoder)
931
932         self.create_stream(packets=6)
933         capture = self.send_packets()
934
935         # make sure the one packet we expect actually showed up
936         cflows = []
937         self.vapi.ipfix_flush()
938         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
939         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
940         self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
941         self.collector.get_capture(5)
942
943         ipfix.remove_vpp_config()
944         self.logger.info("FFP_TEST_FINISH_0002")
945
946
@tag_fixme_vpp_workers
class DatapathTx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""

    # Direction handed to VppCFLOW by the inherited tests.
    direction = "tx"

    # Interfaces the probe is attached to by the inherited tests.  intf3 is
    # the destination of the pg5 -> pg6 stream used by the IP6 tests; intf2 is
    # presumably the IP4 counterpart (its use is not visible from here).
    intf1, intf2, intf3 = "pg2", "pg4", "pg6"
955
956
@tag_fixme_vpp_workers
class DatapathRx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""

    # Direction handed to VppCFLOW by the inherited tests.
    direction = "rx"

    # Interfaces the probe is attached to by the inherited tests.  intf3 is
    # the source of the pg5 -> pg6 stream used by the IP6 tests; intf2 is
    # presumably the IP4 counterpart (its use is not visible from here).
    intf1, intf2, intf3 = "pg1", "pg3", "pg5"
965
966
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableIPFIX(MethodHolder):
    """Disable IPFIX"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_0001(self):
        """disable IPFIX after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # Probe with all VppCFLOW defaults.
        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # Templates are expected right after the probe is configured.
        tmpls = probe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # While the exporter is on, a flush must produce a data packet and
        # the collector must have captured four packets in total.
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1])
        self.collector.get_capture(4)

        # Turn the exporter off and re-arm the capture on the collector.
        probe.disable_exporter()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # With the exporter off nothing may reach the collector, even after
        # a flush and a one second grace period.
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1013
1014
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableIPFIX(MethodHolder):
    """Re-enable IPFIX"""

    @classmethod
    def setUpClass(cls):
        super(ReenableIPFIX, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(ReenableIPFIX, cls).tearDownClass()

    def test_0011(self):
        """disable IPFIX after first packets and re-enable after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # Probe with all VppCFLOW defaults.
        ipfix = VppCFLOW(test=self)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=5)
        self.send_packets()

        # make sure the one data packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1])
        self.collector.get_capture(4)

        # disable IPFIX
        ipfix.disable_exporter()
        self.vapi.ipfix_flush()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # make sure nothing reaches the collector while the exporter is off,
        # although the traffic itself is still forwarded to pg2
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()
        self.pg2.get_capture(5)

        # enable IPFIX
        ipfix.enable_exporter()

        # After re-enabling, four IPFIX packets are expected: three templates
        # and one data packet.
        capture = self.collector.get_capture(4)
        nr_templates = 0
        nr_data = 0
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                nr_templates += 1
            if p.haslayer(Data):
                nr_data += 1
        # assertEqual, not assertTrue: assertTrue(x, n) treats n as the
        # failure message and only checks that x is truthy.
        self.assertEqual(nr_templates, 3)
        self.assertEqual(nr_data, 1)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1080
1081
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableFP(MethodHolder):
    """Disable Flowprobe feature"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # Probe with all VppCFLOW defaults.
        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # Templates are expected right after the probe is configured.
        tmpls = probe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # While the feature is on, a flush must produce a data packet and
        # the collector must have captured four packets in total.
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpls[1])
        self.collector.get_capture(4)

        # Turn the flowprobe feature off (the exporter itself is left alone)
        # and re-arm the capture on the collector.
        probe.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # With the feature off nothing may reach the collector, even after
        # a flush and a one second grace period.
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1128
1129 @unittest.skipUnless(config.extended, "part of extended tests")
1130 class ReenableFP(MethodHolder):
1131     """Re-enable Flowprobe feature"""
1132
1133     @classmethod
1134     def setUpClass(cls):
1135         super(ReenableFP, cls).setUpClass()
1136
1137     @classmethod
1138     def tearDownClass(cls):
1139         super(ReenableFP, cls).tearDownClass()
1140
1141     def test_0001(self):
1142         """disable flowprobe feature after first packets and re-enable
1143         after few packets"""
1144         self.logger.info("FFP_TEST_START_0001")
1145         self.pg_enable_capture(self.pg_interfaces)
1146         self.pkts = []
1147
1148         ipfix = VppCFLOW(test=self)
1149         ipfix.add_vpp_config()
1150
1151         ipfix_decoder = IPFIXDecoder()
1152         # template packet should arrive immediately
1153         self.vapi.ipfix_flush()
1154         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1155
1156         self.create_stream()
1157         self.send_packets()
1158
1159         # make sure the one packet we expect actually showed up
1160         self.vapi.ipfix_flush()
1161         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1162         self.collector.get_capture(4)
1163
1164         # disable FPP feature
1165         ipfix.disable_flowprobe_feature()
1166         self.pg_enable_capture([self.collector])
1167
1168         self.send_packets()
1169
1170         # make sure no one packet arrived in active timer span
1171         self.vapi.ipfix_flush()
1172         self.sleep(5, "wait before verifying no packets sent")
1173         self.collector.assert_nothing_captured()
1174
1175         # enable FPP feature
1176         ipfix.enable_flowprobe_feature()
1177         self.vapi.ipfix_flush()
1178         templates = ipfix.verify_templates(ipfix_decoder, timeout=3)
1179
1180         self.send_packets()
1181
1182         # make sure the next packets (templates and data) we expect actually
1183         # showed up
1184         self.vapi.ipfix_flush()
1185         self.wait_for_cflow_packet(self.collector, templates[1], 5)
1186         self.collector.get_capture(4)
1187
1188         ipfix.remove_vpp_config()
1189         self.logger.info("FFP_TEST_FINISH_0001")
1190
1191
# Allow running this test module directly, using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)