p = (
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
/ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
- / UDP(sport=randint(49152, 65535), dport=5678)
+ / UDP(sport=randint(49152, 65535), dport=5678 + i)
)
info.data = p.copy()
packets.append(p)
"Unexpected packets in the trace buffer",
)
+ reply = self.vapi.cli("show bpf trace filter")
+ self.assertIn("(000)", reply, "Unexpected bpf filter dump")
+
def test_bpf_trace_filter_vapi(self):
"""BPF Trace filter test [VAPI]"""
self.vapi.bpf_trace_filter_set(filter="tcp")
"Unexpected packets in the trace buffer",
)
+ def test_bpf_trace_filter_vapi_v2(self):
+ """BPF Trace filter test [VAPI v2]"""
+ self.vapi.bpf_trace_filter_set_v2(filter="tcp or dst port 5678")
+ self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")
+
+ packets = self.create_stream(self.pg0, self.pg1, 3)
+ self.pg0.add_stream(packets)
+ self.pg_start(traceFilter=True)
+
+ # verify that bpf trace filter has been selected
+ reply = self.vapi.cli("show trace filter function")
+ self.assertIn(
+ "(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
+ )
+
+ # verify that trace is filtered
+ reply = self.vapi.cli("show trace")
+ self.assertIn(
+ "Packet 1\n",
+ reply,
+ "No expected packets in the trace buffer",
+ )
+ self.assertNotIn(
+ "Packet 2\n",
+ reply,
+ "Unexpected packets in the trace buffer",
+ )
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)