tls: propagate transport closed notifications
[vpp.git] / test / test_flowprobe.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, TCP, UDP
12 from scapy.layers.inet6 import IPv6
13 from scapy.contrib.lacp import SlowProtocol, LACP
14
15 from config import config
16 from framework import VppTestCase
17 from asfframework import (
18     tag_fixme_vpp_workers,
19     tag_fixme_ubuntu2204,
20     tag_fixme_debian11,
21     tag_run_solo,
22     is_distro_ubuntu2204,
23     is_distro_debian11,
24     VppTestRunner,
25 )
26 from vpp_object import VppObject
27 from util import ppp
28 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_papi.macaddress import mac_ntop
31 from socket import inet_ntop
32 from vpp_papi import VppEnum
33
34
# Number of fields expected in the flowprobe IPFIX templates, split by the
# record layer that contributes them.  The tests add these up to get the
# field count of a complete template.
TMPL_COMMON_FIELD_COUNT = 6
TMPL_L2_FIELD_COUNT = 3
TMPL_L3_FIELD_COUNT = 4
TMPL_L4_FIELD_COUNT = 3

# IPFIX field IDs used as keys into a decoded data record.
IPFIX_TCP_FLAGS_ID = 6
IPFIX_SRC_TRANS_PORT_ID = 7
IPFIX_DST_TRANS_PORT_ID = 11

# Bit masks of the individual flags in the TCP header flags byte.
TCP_F_FIN = 0x01
TCP_F_SYN = 0x02
TCP_F_RST = 0x04
TCP_F_PSH = 0x08
TCP_F_ACK = 0x10
TCP_F_URG = 0x20
TCP_F_ECE = 0x40
TCP_F_CWR = 0x80
52
53
54 class VppCFLOW(VppObject):
55     """CFLOW object for IPFIX exporter and Flowprobe feature"""
56
57     def __init__(
58         self,
59         test,
60         intf="pg2",
61         active=0,
62         passive=0,
63         timeout=100,
64         mtu=1024,
65         datapath="l2",
66         layer="l2 l3 l4",
67         direction="tx",
68     ):
69         self._test = test
70         self._intf = intf
71         self._intf_obj = getattr(self._test, intf)
72         self._active = active
73         if passive == 0 or passive < active:
74             self._passive = active + 1
75         else:
76             self._passive = passive
77         self._datapath = datapath  # l2 ip4 ip6
78         self._collect = layer  # l2 l3 l4
79         self._direction = direction  # rx tx both
80         self._timeout = timeout
81         self._mtu = mtu
82         self._configured = False
83
84     def add_vpp_config(self):
85         self.enable_exporter()
86         l2_flag = 0
87         l3_flag = 0
88         l4_flag = 0
89         if "l2" in self._collect.lower():
90             l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
91         if "l3" in self._collect.lower():
92             l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
93         if "l4" in self._collect.lower():
94             l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
95         self._test.vapi.flowprobe_set_params(
96             record_flags=(l2_flag | l3_flag | l4_flag),
97             active_timer=self._active,
98             passive_timer=self._passive,
99         )
100         self.enable_flowprobe_feature()
101         self._test.vapi.cli("ipfix flush")
102         self._configured = True
103
    def remove_vpp_config(self):
        """Disable the exporter and the flowprobe feature, then flush IPFIX.

        Marks the object as no longer configured.
        """
        self.disable_exporter()
        self.disable_flowprobe_feature()
        self._test.vapi.cli("ipfix flush")
        self._configured = False
109
    def enable_exporter(self):
        """Configure the IPFIX exporter to send from pg0 to pg0's remote peer.

        Uses the MTU and template interval stored at construction time.
        """
        self._test.vapi.set_ipfix_exporter(
            collector_address=self._test.pg0.remote_ip4,
            src_address=self._test.pg0.local_ip4,
            path_mtu=self._mtu,
            template_interval=self._timeout,
        )
117
118     def _enable_disable_flowprobe_feature(self, is_add):
119         which_map = {
120             "l2": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2,
121             "ip4": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4,
122             "ip6": VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6,
123         }
124         direction_map = {
125             "rx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
126             "tx": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
127             "both": VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
128         }
129         self._test.vapi.flowprobe_interface_add_del(
130             is_add=is_add,
131             which=which_map[self._datapath],
132             direction=direction_map[self._direction],
133             sw_if_index=self._intf_obj.sw_if_index,
134         )
135
    def enable_flowprobe_feature(self):
        """Add the flowprobe feature on the configured interface."""
        self._enable_disable_flowprobe_feature(is_add=True)
138
    def disable_exporter(self):
        """Set the IPFIX exporter collector address to 0.0.0.0 via the CLI."""
        self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
141
    def disable_flowprobe_feature(self):
        """Remove the flowprobe feature from the configured interface."""
        self._enable_disable_flowprobe_feature(is_add=False)
144
145     def object_id(self):
146         return "ipfix-collector-%s-%s" % (self._src, self.dst)
147
    def query_vpp_config(self):
        """Return the locally tracked configured flag (VPP is not queried)."""
        return self._configured
150
151     def verify_templates(self, decoder=None, timeout=1, count=3, field_count_in=None):
152         templates = []
153         self._test.assertIn(count, (1, 2, 3))
154         for _ in range(count):
155             p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
156             self._test.assertTrue(p.haslayer(IPFIX))
157             self._test.assertTrue(p.haslayer(Template))
158             if decoder is not None:
159                 templates.append(p[Template].templateID)
160                 decoder.add_template(p.getlayer(Template))
161             if field_count_in is not None:
162                 self._test.assertIn(p[Template].fieldCount, field_count_in)
163         return templates
164
165
166 class MethodHolder(VppTestCase):
167     """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
168
169     # Test variables
170     debug_print = False
171     max_number_of_packets = 10
172     pkts = []
173
    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.
        """
        super(MethodHolder, cls).setUpClass()
        # On the tagged distros, stop before configuring anything when the
        # class has no "vpp" attribute.
        # NOTE(review): presumably this means VPP was never started because
        # the test case is skipped there -- confirm against the framework.
        if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
            cls, "vpp"
        ):
            return
        try:
            # Create pg interfaces
            cls.create_pg_interfaces(range(9))

            # Packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            # Create BD with MAC learning, flooding, unknown unicast flooding
            # and forwarding enabled, and put pg1 and pg2 into this BD
            cls.vapi.bridge_domain_add_del_v2(
                bd_id=1, uu_flood=1, learn=1, flood=1, forward=1, is_add=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
            )

            # Set up all interfaces
            for i in cls.pg_interfaces:
                i.admin_up()

            # pg0 receives the exported IPFIX packets
            cls.pg0.config_ip4()
            cls.pg0.configure_ipv4_neighbors()
            cls.collector = cls.pg0

            # IPv4 interfaces
            cls.pg1.config_ip4()
            cls.pg1.resolve_arp()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()
            cls.pg3.config_ip4()
            cls.pg3.resolve_arp()
            cls.pg4.config_ip4()
            cls.pg4.resolve_arp()
            cls.pg7.config_ip4()
            cls.pg8.config_ip4()
            cls.pg8.configure_ipv4_neighbors()

            # IPv6 interfaces
            cls.pg5.config_ip6()
            cls.pg5.resolve_ndp()
            cls.pg5.disable_ipv6_ra()
            cls.pg6.config_ip6()
            cls.pg6.resolve_ndp()
            cls.pg6.disable_ipv6_ra()
        except Exception:
            # Run the parent class teardown before propagating the failure.
            super(MethodHolder, cls).tearDownClass()
            raise
234
    @classmethod
    def tearDownClass(cls):
        """Run the parent class teardown; nothing extra is cleaned up here."""
        super(MethodHolder, cls).tearDownClass()
238
239     def create_stream(
240         self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
241     ):
242         """Create a packet stream to tickle the plugin
243
244         :param VppInterface src_if: Source interface for packet stream
245         :param VppInterface src_if: Dst interface for packet stream
246         """
247         if src_if is None:
248             src_if = self.pg1
249         if dst_if is None:
250             dst_if = self.pg2
251         self.pkts = []
252         if packets is None:
253             packets = random.randint(1, self.max_number_of_packets)
254         pkt_size = size
255         for p in range(0, packets):
256             if size is None:
257                 pkt_size = random.choice(self.pg_if_packet_sizes)
258             info = self.create_packet_info(src_if, dst_if)
259             payload = self.info_to_payload(info)
260             p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
261             if ip_ver == "v4":
262                 p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
263             else:
264                 p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
265             p /= UDP(sport=1234, dport=4321)
266             p /= Raw(payload)
267             info.data = p.copy()
268             self.extend_packet(p, pkt_size)
269             self.pkts.append(p)
270
271     def verify_cflow_data(self, decoder, capture, cflow):
272         octets = 0
273         packets = 0
274         for p in capture:
275             octets += p[IP].len
276             packets += 1
277         if cflow.haslayer(Data):
278             data = decoder.decode_data_set(cflow.getlayer(Set))
279             for record in data:
280                 self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
281                 self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
282
283     def send_packets(self, src_if=None, dst_if=None):
284         if src_if is None:
285             src_if = self.pg1
286         if dst_if is None:
287             dst_if = self.pg2
288         self.pg_enable_capture([dst_if])
289         src_if.add_stream(self.pkts)
290         self.pg_start()
291         return dst_if.get_capture(len(self.pkts))
292
293     def verify_cflow_data_detail(
294         self,
295         decoder,
296         capture,
297         cflow,
298         data_set={1: "octets", 2: "packets"},
299         ip_ver="v4",
300         field_count=None,
301     ):
302         if self.debug_print:
303             print(capture[0].show())
304         if cflow.haslayer(Data):
305             data = decoder.decode_data_set(cflow.getlayer(Set))
306             if self.debug_print:
307                 print(data)
308             if ip_ver == "v4":
309                 ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
310             else:
311                 ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
312             if data_set is not None:
313                 for record in data:
314                     # skip flow if ingress/egress interface is 0
315                     if int(binascii.hexlify(record[10]), 16) == 0:
316                         continue
317                     if int(binascii.hexlify(record[14]), 16) == 0:
318                         continue
319
320                     for field in data_set:
321                         value = data_set[field]
322                         if value == "octets":
323                             value = ip_layer.len
324                             if ip_ver == "v6":
325                                 value += 40  # ??? is this correct
326                         elif value == "packets":
327                             value = 1
328                         elif value == "src_ip":
329                             if ip_ver == "v4":
330                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
331                             else:
332                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
333                             value = int(binascii.hexlify(ip), 16)
334                         elif value == "dst_ip":
335                             if ip_ver == "v4":
336                                 ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
337                             else:
338                                 ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
339                             value = int(binascii.hexlify(ip), 16)
340                         elif value == "sport":
341                             value = int(capture[0][UDP].sport)
342                         elif value == "dport":
343                             value = int(capture[0][UDP].dport)
344                         self.assertEqual(
345                             int(binascii.hexlify(record[field]), 16), value
346                         )
347             if field_count is not None:
348                 for record in data:
349                     self.assertEqual(len(record), field_count)
350
351     def verify_cflow_data_notimer(self, decoder, capture, cflows):
352         idx = 0
353         for cflow in cflows:
354             if cflow.haslayer(Data):
355                 data = decoder.decode_data_set(cflow.getlayer(Set))
356             else:
357                 raise Exception("No CFLOW data")
358
359             for rec in data:
360                 p = capture[idx]
361                 idx += 1
362                 self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
363                 self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
364         self.assertEqual(len(capture), idx)
365
366     def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
367         """wait for CFLOW packet and verify its correctness
368
369         :param timeout: how long to wait
370
371         """
372         self.logger.info("IPFIX: Waiting for CFLOW packet")
373         # self.logger.debug(self.vapi.ppcli("show flow table"))
374         p = collector_intf.wait_for_packet(timeout=timeout)
375         self.assertEqual(p[Set].setID, set_id)
376         # self.logger.debug(self.vapi.ppcli("show flow table"))
377         self.logger.debug(ppp("IPFIX: Got packet:", p))
378         return p
379
380
381 @tag_run_solo
382 @tag_fixme_vpp_workers
383 @tag_fixme_ubuntu2204
384 @tag_fixme_debian11
385 class Flowprobe(MethodHolder):
386     """Template verification, timer tests"""
387
    @classmethod
    def setUpClass(cls):
        # Nothing class specific: all setup is done by MethodHolder.
        super(Flowprobe, cls).setUpClass()
391
    @classmethod
    def tearDownClass(cls):
        # Nothing class specific: teardown is done by MethodHolder.
        super(Flowprobe, cls).tearDownClass()
395
    def test_0001(self):
        """timer less than template timeout"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # active timer 2, template interval left at the VppCFLOW default (100)
        ipfix = VppCFLOW(test=self, active=2)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        # one packet pg1 -> pg2 (the create_stream/send_packets defaults)
        self.create_stream(packets=1)
        self.send_packets()
        capture = self.pg2.get_capture(1)

        # make sure the one packet we expect actually showed up;
        # the data set is expected to use the second received template
        cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
        self.verify_cflow_data(ipfix_decoder, capture, cflow)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
419
    def test_0002(self):
        """timer greater than template timeout"""
        self.logger.info("FFP_TEST_START_0002")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # template interval 3, active timer 4
        ipfix = VppCFLOW(test=self, timeout=3, active=4)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        # (no decoder is passed, so these templates are only checked)
        ipfix.verify_templates()

        self.create_stream(packets=2)
        self.send_packets()
        capture = self.pg2.get_capture(2)

        # the exporter was configured with template_interval=3, so the next
        # set of templates is awaited with timeout=5 and fed to the decoder
        templates = ipfix.verify_templates(ipfix_decoder, timeout=5)

        # make sure the data set for the two packets actually showed up
        cflow = self.wait_for_cflow_packet(self.collector, templates[1], 15)
        self.verify_cflow_data(ipfix_decoder, capture, cflow)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")
447
448     def test_cflow_packet(self):
449         """verify cflow packet fields"""
450         self.logger.info("FFP_TEST_START_0000")
451         self.pg_enable_capture(self.pg_interfaces)
452         self.pkts = []
453
454         ipfix = VppCFLOW(
455             test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
456         )
457         ipfix.add_vpp_config()
458
459         route_9001 = VppIpRoute(
460             self,
461             "9.0.0.0",
462             24,
463             [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
464         )
465         route_9001.add_vpp_config()
466
467         ipfix_decoder = IPFIXDecoder()
468         templates = ipfix.verify_templates(ipfix_decoder, count=1)
469
470         self.pkts = [
471             (
472                 Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
473                 / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
474                 / TCP(sport=1234, dport=4321, flags=80)
475                 / Raw(b"\xa5" * 100)
476             )
477         ]
478
479         nowUTC = int(time.time())
480         nowUNIX = nowUTC + 2208988800
481         self.send_packets(src_if=self.pg7, dst_if=self.pg8)
482
483         cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
484         self.collector.get_capture(2)
485
486         if cflow[0].haslayer(IPFIX):
487             self.assertEqual(cflow[IPFIX].version, 10)
488             self.assertEqual(cflow[IPFIX].observationDomainID, 1)
489             self.assertEqual(cflow[IPFIX].sequenceNumber, 0)
490             self.assertAlmostEqual(cflow[IPFIX].exportTime, nowUTC, delta=5)
491         if cflow.haslayer(Data):
492             record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
493             # ingress interface
494             self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
495             # egress interface
496             self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
497             # direction
498             self.assertEqual(int(binascii.hexlify(record[61]), 16), 1)
499             # packets
500             self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
501             # src mac
502             self.assertEqual(mac_ntop(record[56]), self.pg8.local_mac)
503             # dst mac
504             self.assertEqual(mac_ntop(record[80]), self.pg8.remote_mac)
505             flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
506             # flow start timestamp
507             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
508             flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
509             # flow end timestamp
510             self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
511             # ethernet type
512             self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
513             # src ip
514             self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
515             # dst ip
516             self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
517             # protocol (TCP)
518             self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
519             # src port
520             self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
521             # dst port
522             self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
523             # tcp flags
524             self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
525
526         ipfix.remove_vpp_config()
527         self.logger.info("FFP_TEST_FINISH_0000")
528
    def test_flow_entry_reuse(self):
        """Verify flow entry reuse doesn't accumulate meta info"""
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # enable ip4 datapath for an interface
        # set active and passive timers
        ipfix = VppCFLOW(
            test=self,
            active=2,
            passive=3,
            intf="pg3",
            layer="l3 l4",
            datapath="ip4",
            direction="rx",
            mtu=100,
        )
        ipfix.add_vpp_config()

        # template packet should arrive immediately
        ipfix_decoder = IPFIXDecoder()
        templates = ipfix.verify_templates(ipfix_decoder, count=1)

        # make a tcp packet
        self.pkts = [
            (
                Ether(src=self.pg3.remote_mac, dst=self.pg4.local_mac)
                / IP(src=self.pg3.remote_ip4, dst=self.pg4.remote_ip4)
                / TCP(sport=1234, dport=4321)
                / Raw(b"\xa5" * 50)
            )
        ]

        # send the tcp packet two times, each time with new set of flags
        tcp_flags = (
            TCP_F_SYN | TCP_F_ACK,
            TCP_F_RST | TCP_F_PSH,
        )
        for f in tcp_flags:
            # the same packet object is reused; only its TCP flags change
            self.pkts[0][TCP].flags = f
            capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)

            # verify meta info - packet/octet delta and tcp flags
            # (each record must report only the flags of the current packet)
            cflow = self.wait_for_cflow_packet(self.collector, templates[0], timeout=6)
            self.verify_cflow_data(ipfix_decoder, capture, cflow)
            self.verify_cflow_data_detail(
                ipfix_decoder,
                capture,
                cflow,
                {
                    IPFIX_TCP_FLAGS_ID: f,
                    IPFIX_SRC_TRANS_PORT_ID: 1234,
                    IPFIX_DST_TRANS_PORT_ID: 4321,
                },
            )

        # expect 3 packets in total on the collector
        self.collector.get_capture(3)

        # cleanup
        ipfix.remove_vpp_config()
589
590     def test_interface_dump(self):
591         """Dump interfaces with IPFIX flow record generation enabled"""
592         self.logger.info("FFP_TEST_START_0003")
593
594         # Enable feature for 3 interfaces
595         ipfix1 = VppCFLOW(test=self, intf="pg1", datapath="l2", direction="rx")
596         ipfix1.add_vpp_config()
597
598         ipfix2 = VppCFLOW(test=self, intf="pg2", datapath="ip4", direction="tx")
599         ipfix2.enable_flowprobe_feature()
600
601         ipfix3 = VppCFLOW(test=self, intf="pg3", datapath="ip6", direction="both")
602         ipfix3.enable_flowprobe_feature()
603
604         # When request "all", dump should contain all enabled interfaces
605         dump = self.vapi.flowprobe_interface_dump()
606         self.assertEqual(len(dump), 3)
607
608         # Verify 1st interface
609         self.assertEqual(dump[0].sw_if_index, self.pg1.sw_if_index)
610         self.assertEqual(
611             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_L2
612         )
613         self.assertEqual(
614             dump[0].direction,
615             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_RX,
616         )
617
618         # Verify 2nd interface
619         self.assertEqual(dump[1].sw_if_index, self.pg2.sw_if_index)
620         self.assertEqual(
621             dump[1].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
622         )
623         self.assertEqual(
624             dump[1].direction,
625             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
626         )
627
628         # Verify 3rd interface
629         self.assertEqual(dump[2].sw_if_index, self.pg3.sw_if_index)
630         self.assertEqual(
631             dump[2].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP6
632         )
633         self.assertEqual(
634             dump[2].direction,
635             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_BOTH,
636         )
637
638         # When request 2nd interface, dump should contain only the specified interface
639         dump = self.vapi.flowprobe_interface_dump(sw_if_index=self.pg2.sw_if_index)
640         self.assertEqual(len(dump), 1)
641
642         # Verify 2nd interface
643         self.assertEqual(dump[0].sw_if_index, self.pg2.sw_if_index)
644         self.assertEqual(
645             dump[0].which, VppEnum.vl_api_flowprobe_which_t.FLOWPROBE_WHICH_IP4
646         )
647         self.assertEqual(
648             dump[0].direction,
649             VppEnum.vl_api_flowprobe_direction_t.FLOWPROBE_DIRECTION_TX,
650         )
651
652         # When request 99th interface, dump should be empty
653         dump = self.vapi.flowprobe_interface_dump(sw_if_index=99)
654         self.assertEqual(len(dump), 0)
655
656         ipfix1.remove_vpp_config()
657         ipfix2.remove_vpp_config()
658         ipfix3.remove_vpp_config()
659         self.logger.info("FFP_TEST_FINISH_0003")
660
661     def test_get_params(self):
662         """Get IPFIX flow record generation parameters"""
663         self.logger.info("FFP_TEST_START_0004")
664
665         # Enable feature for an interface with custom parameters
666         ipfix = VppCFLOW(test=self, active=20, passive=40, layer="l2 l3 l4")
667         ipfix.add_vpp_config()
668
669         # Get and verify parameters
670         params = self.vapi.flowprobe_get_params()
671         self.assertEqual(params.active_timer, 20)
672         self.assertEqual(params.passive_timer, 40)
673         record_flags = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
674         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
675         record_flags |= VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
676         self.assertEqual(params.record_flags, record_flags)
677
678         ipfix.remove_vpp_config()
679         self.logger.info("FFP_TEST_FINISH_0004")
680
681
682 class DatapathTestsHolder(object):
683     """collect information on Ethernet, IP4 and IP6 datapath (no timers)"""
684
    @classmethod
    def setUpClass(cls):
        # This class derives from object, so the call continues along the
        # MRO of the concrete test class that mixes it in.
        super(DatapathTestsHolder, cls).setUpClass()
688
    @classmethod
    def tearDownClass(cls):
        # This class derives from object, so the call continues along the
        # MRO of the concrete test class that mixes it in.
        super(DatapathTestsHolder, cls).tearDownClass()
692
    def test_templatesL2(self):
        """verify template on L2 datapath"""
        self.logger.info("FFP_TEST_START_0000")
        self.pg_enable_capture(self.pg_interfaces)

        # self.intf1 and self.direction are not defined in this class
        # NOTE(review): presumably provided by the concrete subclass -- confirm
        ipfix = VppCFLOW(
            test=self, intf=self.intf1, layer="l2", direction=self.direction
        )
        ipfix.add_vpp_config()

        # template packet should arrive immediately
        self.vapi.ipfix_flush()
        ipfix.verify_templates(timeout=3, count=1)
        # expect one packet on the collector
        self.collector.get_capture(1)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0000")
710
    def test_L2onL2(self):
        """L2 data on L2 datapath"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(
            test=self, intf=self.intf1, layer="l2", direction=self.direction
        )
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder, count=1)

        self.create_stream(packets=1)
        capture = self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        cflow = self.wait_for_cflow_packet(self.collector, templates[0])
        # field 2: packet count, field 256: ethernet type (8 == 0x0008, the
        # IPv4 ethertype 0x0800 with its bytes swapped), field 61: direction
        # (True compares equal to 1 for "tx", False to 0 otherwise)
        self.verify_cflow_data_detail(
            ipfix_decoder,
            capture,
            cflow,
            {2: "packets", 256: 8, 61: (self.direction == "tx")},
        )
        # expect 2 packets in total on the collector
        self.collector.get_capture(2)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
742
    def test_L3onL2(self):
        """L3 data on L2 datapath"""
        self.logger.info("FFP_TEST_START_0002")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(
            test=self, intf=self.intf1, layer="l3", direction=self.direction
        )
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder, count=2)

        self.create_stream(packets=1)
        capture = self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        cflow = self.wait_for_cflow_packet(self.collector, templates[0])
        # field 2: packet count, field 4: protocol (17 == UDP), fields 8/12:
        # IPv4 source/destination address, field 61: direction
        self.verify_cflow_data_detail(
            ipfix_decoder,
            capture,
            cflow,
            {
                2: "packets",
                4: 17,
                8: "src_ip",
                12: "dst_ip",
                61: (self.direction == "tx"),
            },
        )

        # expect 3 packets in total on the collector
        self.collector.get_capture(3)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")
781
    def test_L234onL2(self):
        """L2/3/4 data on L2 datapath"""
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(
            test=self, intf=self.intf1, layer="l2 l3 l4", direction=self.direction
        )
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        # a template must carry either the common + L2 fields only, or the
        # common + L2 + L3 + L4 fields
        tmpl_l2_field_count = TMPL_COMMON_FIELD_COUNT + TMPL_L2_FIELD_COUNT
        tmpl_ip_field_count = (
            TMPL_COMMON_FIELD_COUNT
            + TMPL_L2_FIELD_COUNT
            + TMPL_L3_FIELD_COUNT
            + TMPL_L4_FIELD_COUNT
        )
        templates = ipfix.verify_templates(
            ipfix_decoder,
            count=3,
            field_count_in=(tmpl_l2_field_count, tmpl_ip_field_count),
        )

        # verify IPv4 and IPv6 flows
        for ip_ver in ("v4", "v6"):
            self.create_stream(packets=1, ip_ver=ip_ver)
            capture = self.send_packets()

            # make sure the one packet we expect actually showed up
            self.vapi.ipfix_flush()
            # templates[1] is used for the IPv4 flow, templates[2] for IPv6
            cflow = self.wait_for_cflow_packet(
                self.collector, templates[1 if ip_ver == "v4" else 2]
            )
            # the addresses live in fields 8/12 for IPv4 and 27/28 for IPv6
            src_ip_id = 8 if ip_ver == "v4" else 27
            dst_ip_id = 12 if ip_ver == "v4" else 28
            # field 256 is the ethernet type: 8 == 0x0008 and
            # 56710 == 0xDD86, i.e. the ethertypes 0x0800 (IPv4) and
            # 0x86DD (IPv6) with their bytes swapped
            self.verify_cflow_data_detail(
                ipfix_decoder,
                capture,
                cflow,
                {
                    2: "packets",
                    256: 8 if ip_ver == "v4" else 56710,
                    4: 17,
                    7: "sport",
                    11: "dport",
                    src_ip_id: "src_ip",
                    dst_ip_id: "dst_ip",
                    61: (self.direction == "tx"),
                },
                ip_ver=ip_ver,
                field_count=tmpl_ip_field_count,
            )

        # verify non-IP flow
        self.pkts = [
            (
                Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
                / SlowProtocol()
                / LACP()
            )
        ]
        capture = self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        # templates[0] is used for the non-IP flow; 2440 == 0x0988, i.e. the
        # slow protocols ethertype 0x8809 with its bytes swapped
        cflow = self.wait_for_cflow_packet(self.collector, templates[0])
        self.verify_cflow_data_detail(
            ipfix_decoder,
            capture,
            cflow,
            {2: "packets", 256: 2440, 61: (self.direction == "tx")},
            field_count=tmpl_l2_field_count,
        )

        # expect 6 packets in total on the collector
        self.collector.get_capture(6)

        ipfix.remove_vpp_config()
861
862     def test_L4onL2(self):
863         """L4 data on L2 datapath"""
864         self.logger.info("FFP_TEST_START_0003")
865         self.pg_enable_capture(self.pg_interfaces)
866         self.pkts = []
867
868         ipfix = VppCFLOW(
869             test=self, intf=self.intf1, layer="l4", direction=self.direction
870         )
871         ipfix.add_vpp_config()
872
873         ipfix_decoder = IPFIXDecoder()
874         # template packet should arrive immediately
875         templates = ipfix.verify_templates(ipfix_decoder, count=2)
876
877         self.create_stream(packets=1)
878         capture = self.send_packets()
879
880         # make sure the one packet we expect actually showed up
881         self.vapi.ipfix_flush()
882         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
883         self.verify_cflow_data_detail(
884             ipfix_decoder,
885             capture,
886             cflow,
887             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
888         )
889
890         self.collector.get_capture(3)
891
892         ipfix.remove_vpp_config()
893         self.logger.info("FFP_TEST_FINISH_0003")
894
895     def test_templatesIp4(self):
896         """verify templates on IP4 datapath"""
897         self.logger.info("FFP_TEST_START_0000")
898
899         self.pg_enable_capture(self.pg_interfaces)
900
901         ipfix = VppCFLOW(
902             test=self, intf=self.intf1, datapath="ip4", direction=self.direction
903         )
904         ipfix.add_vpp_config()
905
906         # template packet should arrive immediately
907         self.vapi.ipfix_flush()
908         ipfix.verify_templates(timeout=3, count=1)
909         self.collector.get_capture(1)
910
911         ipfix.remove_vpp_config()
912
913         self.logger.info("FFP_TEST_FINISH_0000")
914
915     def test_L2onIP4(self):
916         """L2 data on IP4 datapath"""
917         self.logger.info("FFP_TEST_START_0001")
918         self.pg_enable_capture(self.pg_interfaces)
919         self.pkts = []
920
921         ipfix = VppCFLOW(
922             test=self,
923             intf=self.intf2,
924             layer="l2",
925             datapath="ip4",
926             direction=self.direction,
927         )
928         ipfix.add_vpp_config()
929
930         ipfix_decoder = IPFIXDecoder()
931         # template packet should arrive immediately
932         templates = ipfix.verify_templates(ipfix_decoder, count=1)
933
934         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
935         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
936
937         # make sure the one packet we expect actually showed up
938         self.vapi.ipfix_flush()
939         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
940         self.verify_cflow_data_detail(
941             ipfix_decoder,
942             capture,
943             cflow,
944             {2: "packets", 256: 8, 61: (self.direction == "tx")},
945         )
946
947         # expected two templates and one cflow packet
948         self.collector.get_capture(2)
949
950         ipfix.remove_vpp_config()
951         self.logger.info("FFP_TEST_FINISH_0001")
952
953     def test_L3onIP4(self):
954         """L3 data on IP4 datapath"""
955         self.logger.info("FFP_TEST_START_0002")
956         self.pg_enable_capture(self.pg_interfaces)
957         self.pkts = []
958
959         ipfix = VppCFLOW(
960             test=self,
961             intf=self.intf2,
962             layer="l3",
963             datapath="ip4",
964             direction=self.direction,
965         )
966         ipfix.add_vpp_config()
967
968         ipfix_decoder = IPFIXDecoder()
969         # template packet should arrive immediately
970         templates = ipfix.verify_templates(ipfix_decoder, count=1)
971
972         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
973         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
974
975         # make sure the one packet we expect actually showed up
976         self.vapi.ipfix_flush()
977         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
978         self.verify_cflow_data_detail(
979             ipfix_decoder,
980             capture,
981             cflow,
982             {
983                 1: "octets",
984                 2: "packets",
985                 8: "src_ip",
986                 12: "dst_ip",
987                 61: (self.direction == "tx"),
988             },
989         )
990
991         # expected two templates and one cflow packet
992         self.collector.get_capture(2)
993
994         ipfix.remove_vpp_config()
995         self.logger.info("FFP_TEST_FINISH_0002")
996
997     def test_L4onIP4(self):
998         """L4 data on IP4 datapath"""
999         self.logger.info("FFP_TEST_START_0003")
1000         self.pg_enable_capture(self.pg_interfaces)
1001         self.pkts = []
1002
1003         ipfix = VppCFLOW(
1004             test=self,
1005             intf=self.intf2,
1006             layer="l4",
1007             datapath="ip4",
1008             direction=self.direction,
1009         )
1010         ipfix.add_vpp_config()
1011
1012         ipfix_decoder = IPFIXDecoder()
1013         # template packet should arrive immediately
1014         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1015
1016         self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1)
1017         capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4)
1018
1019         # make sure the one packet we expect actually showed up
1020         self.vapi.ipfix_flush()
1021         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1022         self.verify_cflow_data_detail(
1023             ipfix_decoder,
1024             capture,
1025             cflow,
1026             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1027         )
1028
1029         # expected two templates and one cflow packet
1030         self.collector.get_capture(2)
1031
1032         ipfix.remove_vpp_config()
1033         self.logger.info("FFP_TEST_FINISH_0003")
1034
1035     def test_templatesIP6(self):
1036         """verify templates on IP6 datapath"""
1037         self.logger.info("FFP_TEST_START_0000")
1038         self.pg_enable_capture(self.pg_interfaces)
1039
1040         ipfix = VppCFLOW(
1041             test=self, intf=self.intf1, datapath="ip6", direction=self.direction
1042         )
1043         ipfix.add_vpp_config()
1044
1045         # template packet should arrive immediately
1046         ipfix.verify_templates(count=1)
1047         self.collector.get_capture(1)
1048
1049         ipfix.remove_vpp_config()
1050
1051         self.logger.info("FFP_TEST_FINISH_0000")
1052
1053     def test_L2onIP6(self):
1054         """L2 data on IP6 datapath"""
1055         self.logger.info("FFP_TEST_START_0001")
1056         self.pg_enable_capture(self.pg_interfaces)
1057         self.pkts = []
1058
1059         ipfix = VppCFLOW(
1060             test=self,
1061             intf=self.intf3,
1062             layer="l2",
1063             datapath="ip6",
1064             direction=self.direction,
1065         )
1066         ipfix.add_vpp_config()
1067
1068         ipfix_decoder = IPFIXDecoder()
1069         # template packet should arrive immediately
1070         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1071
1072         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1073         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1074
1075         # make sure the one packet we expect actually showed up
1076         self.vapi.ipfix_flush()
1077         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1078         self.verify_cflow_data_detail(
1079             ipfix_decoder,
1080             capture,
1081             cflow,
1082             {2: "packets", 256: 56710, 61: (self.direction == "tx")},
1083             ip_ver="v6",
1084         )
1085
1086         # expected two templates and one cflow packet
1087         self.collector.get_capture(2)
1088
1089         ipfix.remove_vpp_config()
1090         self.logger.info("FFP_TEST_FINISH_0001")
1091
1092     def test_L3onIP6(self):
1093         """L3 data on IP6 datapath"""
1094         self.logger.info("FFP_TEST_START_0002")
1095         self.pg_enable_capture(self.pg_interfaces)
1096         self.pkts = []
1097
1098         ipfix = VppCFLOW(
1099             test=self,
1100             intf=self.intf3,
1101             layer="l3",
1102             datapath="ip6",
1103             direction=self.direction,
1104         )
1105         ipfix.add_vpp_config()
1106
1107         ipfix_decoder = IPFIXDecoder()
1108         # template packet should arrive immediately
1109         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1110
1111         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1112         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1113
1114         # make sure the one packet we expect actually showed up
1115         self.vapi.ipfix_flush()
1116         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1117         self.verify_cflow_data_detail(
1118             ipfix_decoder,
1119             capture,
1120             cflow,
1121             {2: "packets", 27: "src_ip", 28: "dst_ip", 61: (self.direction == "tx")},
1122             ip_ver="v6",
1123         )
1124
1125         # expected two templates and one cflow packet
1126         self.collector.get_capture(2)
1127
1128         ipfix.remove_vpp_config()
1129         self.logger.info("FFP_TEST_FINISH_0002")
1130
1131     def test_L4onIP6(self):
1132         """L4 data on IP6 datapath"""
1133         self.logger.info("FFP_TEST_START_0003")
1134         self.pg_enable_capture(self.pg_interfaces)
1135         self.pkts = []
1136
1137         ipfix = VppCFLOW(
1138             test=self,
1139             intf=self.intf3,
1140             layer="l4",
1141             datapath="ip6",
1142             direction=self.direction,
1143         )
1144         ipfix.add_vpp_config()
1145
1146         ipfix_decoder = IPFIXDecoder()
1147         # template packet should arrive immediately
1148         templates = ipfix.verify_templates(ipfix_decoder, count=1)
1149
1150         self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
1151         capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
1152
1153         # make sure the one packet we expect actually showed up
1154         self.vapi.ipfix_flush()
1155         cflow = self.wait_for_cflow_packet(self.collector, templates[0])
1156         self.verify_cflow_data_detail(
1157             ipfix_decoder,
1158             capture,
1159             cflow,
1160             {2: "packets", 7: "sport", 11: "dport", 61: (self.direction == "tx")},
1161             ip_ver="v6",
1162         )
1163
1164         # expected two templates and one cflow packet
1165         self.collector.get_capture(2)
1166
1167         ipfix.remove_vpp_config()
1168         self.logger.info("FFP_TEST_FINISH_0003")
1169
1170     def test_0001(self):
1171         """no timers, one CFLOW packet, 9 Flows inside"""
1172         self.logger.info("FFP_TEST_START_0001")
1173         self.pg_enable_capture(self.pg_interfaces)
1174         self.pkts = []
1175
1176         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction)
1177         ipfix.add_vpp_config()
1178
1179         ipfix_decoder = IPFIXDecoder()
1180         # template packet should arrive immediately
1181         templates = ipfix.verify_templates(ipfix_decoder)
1182
1183         self.create_stream(packets=9)
1184         capture = self.send_packets()
1185
1186         # make sure the one packet we expect actually showed up
1187         self.vapi.ipfix_flush()
1188         cflow = self.wait_for_cflow_packet(self.collector, templates[1])
1189         self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow])
1190         self.collector.get_capture(4)
1191
1192         ipfix.remove_vpp_config()
1193         self.logger.info("FFP_TEST_FINISH_0001")
1194
1195     def test_0002(self):
1196         """no timers, two CFLOW packets (mtu=260), 3 Flows in each"""
1197         self.logger.info("FFP_TEST_START_0002")
1198         self.pg_enable_capture(self.pg_interfaces)
1199         self.pkts = []
1200
1201         ipfix = VppCFLOW(test=self, intf=self.intf1, direction=self.direction, mtu=260)
1202         ipfix.add_vpp_config()
1203
1204         ipfix_decoder = IPFIXDecoder()
1205         # template packet should arrive immediately
1206         self.vapi.ipfix_flush()
1207         templates = ipfix.verify_templates(ipfix_decoder)
1208
1209         self.create_stream(packets=6)
1210         capture = self.send_packets()
1211
1212         # make sure the one packet we expect actually showed up
1213         cflows = []
1214         self.vapi.ipfix_flush()
1215         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1216         cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
1217         self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
1218         self.collector.get_capture(5)
1219
1220         ipfix.remove_vpp_config()
1221         self.logger.info("FFP_TEST_FINISH_0002")
1222
1223
@tag_fixme_vpp_workers
class DatapathTx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)

    Runs the tests inherited from DatapathTestsHolder with flowprobe
    recording in the "tx" direction.
    """

    # Direction passed to every VppCFLOW created by the inherited tests.
    direction = "tx"

    # Names of the interfaces flowprobe is enabled on. The inherited IP4 and
    # IP6 tests send pg3 -> pg4 and pg5 -> pg6, so intf2/intf3 are the
    # receiving ends of those streams.
    intf1, intf2, intf3 = "pg2", "pg4", "pg6"
1232
1233
@tag_fixme_vpp_workers
class DatapathRx(MethodHolder, DatapathTestsHolder):
    """Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)

    Runs the tests inherited from DatapathTestsHolder with flowprobe
    recording in the "rx" direction.
    """

    # Direction passed to every VppCFLOW created by the inherited tests.
    direction = "rx"

    # Names of the interfaces flowprobe is enabled on. The inherited IP4 and
    # IP6 tests send pg3 -> pg4 and pg5 -> pg6, so intf2/intf3 are the
    # sending ends of those streams.
    intf1, intf2, intf3 = "pg1", "pg3", "pg5"
1242
1243
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableIPFIX(MethodHolder):
    """Disable IPFIX

    Checks that nothing reaches the collector once the exporter has been
    disabled.
    """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent class."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super().tearDownClass()

    def test_0001(self):
        """disable IPFIX after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # template packets should arrive immediately
        tmpl_ids = probe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # flush, then wait for the one data packet we expect
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpl_ids[1])
        self.collector.get_capture(4)

        # turn the exporter off and restart capture on the collector only
        probe.disable_exporter()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # nothing may reach the collector within the 1 second wait
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1290
1291
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableIPFIX(MethodHolder):
    """Re-enable IPFIX

    Checks that export stops while the exporter is disabled and resumes,
    with templates and data, once it is enabled again.
    """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent class."""
        super(ReenableIPFIX, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(ReenableIPFIX, cls).tearDownClass()

    def test_0011(self):
        """disable IPFIX after first packets and re-enable after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        ipfix = VppCFLOW(test=self)
        ipfix.add_vpp_config()

        ipfix_decoder = IPFIXDecoder()
        # template packet should arrive immediately
        templates = ipfix.verify_templates(ipfix_decoder)

        self.create_stream(packets=5)
        self.send_packets()

        # make sure the one packet we expect actually showed up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, templates[1])
        self.collector.get_capture(4)

        # disable IPFIX
        ipfix.disable_exporter()
        self.vapi.ipfix_flush()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # make sure no packet reaches the collector while export is disabled
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()
        # the five data-plane packets must still have been forwarded
        self.pg2.get_capture(5)

        # enable IPFIX
        ipfix.enable_exporter()

        capture = self.collector.get_capture(4)
        nr_templates = 0
        nr_data = 0
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                nr_templates += 1
            if p.haslayer(Data):
                nr_data += 1
        # assertTrue(value, 3) would treat 3 as the failure message and only
        # check truthiness; compare the counters explicitly instead.
        self.assertEqual(nr_templates, 3)
        self.assertEqual(nr_data, 1)

        ipfix.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1357
1358
@unittest.skipUnless(config.extended, "part of extended tests")
class DisableFP(MethodHolder):
    """Disable Flowprobe feature

    Checks that no records are exported, and no stored flows leak out,
    after the flowprobe feature has been disabled on an interface.
    """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent class."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super().tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []
        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # template packets should arrive immediately
        tmpl_ids = probe.verify_templates(decoder)

        self.create_stream()
        self.send_packets()

        # flush, then wait for the one data packet we expect
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpl_ids[1])
        self.collector.get_capture(4)

        # turn the flowprobe feature off and restart capture on the
        # collector only
        probe.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # nothing may reach the collector during the wait
        self.vapi.ipfix_flush()
        self.sleep(1, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        # Turn the feature back on so that remove_vpp_config() does not fail
        # because the feature is missing on the interface.
        probe.enable_flowprobe_feature()

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_no_leftover_flows_after_disabling(self):
        """disable flowprobe feature and expect no leftover flows"""
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        # ip4 datapath on pg3 (rx), with active and passive timers set
        probe = VppCFLOW(
            test=self,
            intf="pg3",
            datapath="ip4",
            layer="l3",
            direction="rx",
            active=3,
            passive=4,
            mtu=100,
        )
        probe.add_vpp_config()

        # template packet should arrive immediately
        probe.verify_templates(count=1)

        # first batch of ip4 traffic
        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
        self.send_packets(src_if=self.pg3, dst_if=self.pg4)

        # Disabling the feature on the interface is expected to drop the
        # ip4 flows stored so far.
        probe.disable_flowprobe_feature()

        # 12 seconds covers three passive intervals (passive=4); no leftover
        # ip4 flow may be exported in that time
        self.pg_enable_capture([self.collector])
        self.sleep(12, "wait for leftover ip4 flows during three passive intervals")
        self.collector.assert_nothing_captured()

        # turn the feature back on for the interface
        probe.enable_flowprobe_feature()

        # flush; the template packet should then arrive immediately
        decoder = IPFIXDecoder()
        self.vapi.ipfix_flush()
        tmpl_ids = probe.verify_templates(decoder, count=1)

        # second batch of ip4 traffic
        self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=5)
        sent = self.send_packets(src_if=self.pg3, dst_if=self.pg4)

        # verify meta info - packet/octet delta
        self.vapi.ipfix_flush()
        report = self.wait_for_cflow_packet(self.collector, tmpl_ids[0], timeout=8)
        self.verify_cflow_data(decoder, sent, report)

        self.collector.get_capture(2)

        # cleanup
        probe.remove_vpp_config()
1465
1466
@unittest.skipUnless(config.extended, "part of extended tests")
class ReenableFP(MethodHolder):
    """Re-enable Flowprobe feature

    Checks that export stops while the flowprobe feature is disabled and
    resumes, with templates and data, once it is enabled again.
    """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent class."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super().tearDownClass()

    def test_0001(self):
        """disable flowprobe feature after first packets and re-enable
        after few packets"""
        self.logger.info("FFP_TEST_START_0001")
        self.pg_enable_capture(self.pg_interfaces)
        self.pkts = []

        probe = VppCFLOW(test=self)
        probe.add_vpp_config()

        decoder = IPFIXDecoder()
        # flush first; the template packets should then arrive immediately
        self.vapi.ipfix_flush()
        tmpl_ids = probe.verify_templates(decoder, timeout=3)

        self.create_stream()
        self.send_packets()

        # flush, then wait for the one data packet we expect
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpl_ids[1], 5)
        self.collector.get_capture(4)

        # turn the flowprobe feature off and restart capture on the
        # collector only
        probe.disable_flowprobe_feature()
        self.pg_enable_capture([self.collector])

        self.send_packets()

        # nothing may reach the collector during the 5 second wait
        self.vapi.ipfix_flush()
        self.sleep(5, "wait before verifying no packets sent")
        self.collector.assert_nothing_captured()

        # turn the flowprobe feature back on; templates are verified again
        probe.enable_flowprobe_feature()
        self.vapi.ipfix_flush()
        tmpl_ids = probe.verify_templates(decoder, timeout=3)

        self.send_packets()

        # the next packets (templates and data) we expect must show up
        self.vapi.ipfix_flush()
        self.wait_for_cflow_packet(self.collector, tmpl_ids[1], 5)
        self.collector.get_capture(4)

        probe.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0001")
1528
1529
if __name__ == "__main__":
    # When this file is executed directly, run its tests through unittest
    # using the VppTestRunner imported from asfframework.
    unittest.main(testRunner=VppTestRunner)