tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_bpf_trace_filter.py
1 from framework import VppTestCase
2 from asfframework import VppTestRunner
3 import unittest
4 from config import config
5 from scapy.layers.l2 import Ether
6 from scapy.layers.inet import IP, UDP
7 from random import randint
8
9
@unittest.skipIf(
    "bpf_trace_filter" in config.excluded_plugins,
    "Exclude BPF Trace Filter plugin tests",
)
class TestBpfTraceFilter(VppTestCase):
    """BPF Trace filter test

    Exercises the BPF trace filter through the CLI and through both
    binary-API variants. Each test installs a BPF expression, selects
    ``bpf_trace_filter`` as the trace filter function, sends a small UDP
    stream from pg0 towards pg1 and inspects ``show trace``.
    """

    @classmethod
    def setUpClass(cls):
        super(TestBpfTraceFilter, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.resolve_arp()
                i.admin_up()
        except Exception:
            # Undo whatever was configured before propagating the failure.
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        for i in cls.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()
        super(TestBpfTraceFilter, cls).tearDownClass()

    # reset trace filter before each test
    def setUp(self):
        super(TestBpfTraceFilter, self).setUp()
        self.vapi.cli("set trace filter function vnet_is_packet_traced")
        self.vapi.cli("clear trace")

    def create_stream(self, src_if, dst_if, count):
        """Build ``count`` Ether/IP/UDP packets from ``src_if`` towards ``dst_if``.

        Packet ``i`` uses destination port ``5678 + i`` and a random source
        port in the 49152-65535 range. A copy of every packet is stored in
        the ``data`` attribute of its packet-info object.

        :param src_if: interface the packets are injected on.
        :param dst_if: interface whose remote IPv4 address is the destination.
        :param count: number of packets to build.
        :returns: list of the generated packets.
        """
        packets = []
        for i in range(count):
            info = self.create_packet_info(src_if, dst_if)
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
                / UDP(sport=randint(49152, 65535), dport=5678 + i)
            )
            info.data = p.copy()
            packets.append(p)
        return packets

    def _send_stream(self, count=3):
        """Send ``count`` UDP packets pg0 -> pg1 with trace filtering enabled."""
        packets = self.create_stream(self.pg0, self.pg1, count)
        self.pg0.add_stream(packets)
        self.pg_start(traceFilter=True)

    def _assert_bpf_filter_selected(self):
        """Check that ``bpf_trace_filter`` is the selected trace filter function."""
        reply = self.vapi.cli("show trace filter function")
        self.assertIn(
            "(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
        )

    def _assert_trace_empty(self):
        """Check that ``show trace`` reports an empty trace buffer."""
        reply = self.vapi.cli("show trace")
        self.assertIn(
            "No packets in trace buffer",
            reply,
            "Unexpected packets in the trace buffer",
        )

    def test_bpf_trace_filter_cli(self):
        """BPF Trace filter test [CLI]"""
        # NOTE(review): this is a plain (non-format) string, so the doubled
        # braces reach the CLI literally - presumably the CLI parser expects
        # this form; confirm before changing it.
        self.vapi.cli("set bpf trace filter {{tcp}}")
        self.vapi.cli("set trace filter function bpf_trace_filter")

        self._send_stream()

        # verify that bpf trace filter has been selected
        self._assert_bpf_filter_selected()

        # verify that trace is empty (the stream is UDP, the filter is "tcp")
        self._assert_trace_empty()

        reply = self.vapi.cli("show bpf trace filter")
        self.assertIn("(000)", reply, "Unexpected bpf filter dump")

    def test_bpf_trace_filter_vapi(self):
        """BPF Trace filter test [VAPI]"""
        self.vapi.bpf_trace_filter_set(filter="tcp")
        self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")

        self._send_stream()

        # verify that bpf trace filter has been selected
        self._assert_bpf_filter_selected()

        # verify that trace is empty (the stream is UDP, the filter is "tcp")
        self._assert_trace_empty()

    def test_bpf_trace_filter_vapi_v2(self):
        """BPF Trace filter test [VAPI v2]"""
        self.vapi.bpf_trace_filter_set_v2(filter="tcp or dst port 5678")
        self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")

        self._send_stream()

        # verify that bpf trace filter has been selected
        self._assert_bpf_filter_selected()

        # verify that trace is filtered: only the first generated packet has
        # destination port 5678, so exactly one traced packet is expected
        reply = self.vapi.cli("show trace")
        self.assertIn(
            "Packet 1\n",
            reply,
            "No expected packets in the trace buffer",
        )
        self.assertNotIn(
            "Packet 2\n",
            reply,
            "Unexpected packets in the trace buffer",
        )
133
134
if __name__ == "__main__":
    # Executed as a script: hand control to unittest using the VPP runner.
    unittest.main(testRunner=VppTestRunner)