1 from framework import VppTestCase
2 from asfframework import VppTestRunner
4 from config import config
5 from scapy.layers.l2 import Ether
6 from scapy.layers.inet import IP, UDP
7 from random import randint
11 "bpf_trace_filter" in config.excluded_plugins,
12 "Exclude BPF Trace Filter plugin tests",
14 class TestBpfTraceFilter(VppTestCase):
15 """BPF Trace filter test"""
19 super(TestBpfTraceFilter, cls).setUpClass()
21 cls.create_pg_interfaces(range(2))
22 for i in cls.pg_interfaces:
31 def tearDownClass(cls):
32 for i in cls.pg_interfaces:
35 super(TestBpfTraceFilter, cls).tearDownClass()
37 # reset trace filter before each test
39 super(TestBpfTraceFilter, self).setUp()
40 self.vapi.cli("set trace filter function vnet_is_packet_traced")
41 self.vapi.cli("clear trace")
43 def create_stream(self, src_if, dst_if, count):
45 for i in range(count):
46 info = self.create_packet_info(src_if, dst_if)
47 payload = self.info_to_payload(info)
49 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
50 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
51 / UDP(sport=randint(49152, 65535), dport=5678 + i)
57 def test_bpf_trace_filter_cli(self):
58 """BPF Trace filter test [CLI]"""
59 self.vapi.cli("set bpf trace filter {{tcp}}")
60 self.vapi.cli("set trace filter function bpf_trace_filter")
62 packets = self.create_stream(self.pg0, self.pg1, 3)
63 self.pg0.add_stream(packets)
64 self.pg_start(traceFilter=True)
66 # verify that bpf trace filter has been selected
67 reply = self.vapi.cli("show trace filter function")
69 "(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
72 # verify that trace is empty
73 reply = self.vapi.cli("show trace")
75 "No packets in trace buffer",
77 "Unexpected packets in the trace buffer",
80 reply = self.vapi.cli("show bpf trace filter")
81 self.assertIn("(000)", reply, "Unexpected bpf filter dump")
83 def test_bpf_trace_filter_vapi(self):
84 """BPF Trace filter test [VAPI]"""
85 self.vapi.bpf_trace_filter_set(filter="tcp")
86 self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")
88 packets = self.create_stream(self.pg0, self.pg1, 3)
89 self.pg0.add_stream(packets)
90 self.pg_start(traceFilter=True)
92 # verify that bpf trace filter has been selected
93 reply = self.vapi.cli("show trace filter function")
95 "(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
98 # verify that trace is empty
99 reply = self.vapi.cli("show trace")
101 "No packets in trace buffer",
103 "Unexpected packets in the trace buffer",
106 def test_bpf_trace_filter_vapi_v2(self):
107 """BPF Trace filter test [VAPI v2]"""
108 self.vapi.bpf_trace_filter_set_v2(filter="tcp or dst port 5678")
109 self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")
111 packets = self.create_stream(self.pg0, self.pg1, 3)
112 self.pg0.add_stream(packets)
113 self.pg_start(traceFilter=True)
115 # verify that bpf trace filter has been selected
116 reply = self.vapi.cli("show trace filter function")
118 "(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
121 # verify that trace is filtered
122 reply = self.vapi.cli("show trace")
126 "No expected packets in the trace buffer",
131 "Unexpected packets in the trace buffer",
135 if __name__ == "__main__":
136 unittest.main(testRunner=VppTestRunner)