5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
8 from scapy.contrib.geneve import GENEVE
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.vxlan import VXLAN
13 from scapy.compat import raw
14 from scapy.utils import rdpcap
17 class TestTracefilter(VppTestCase):
18 """ Packet Tracer Filter Test """
22 super(TestTracefilter, cls).setUpClass()
25 def tearDownClass(cls):
26 super(TestTracefilter, cls).tearDownClass()
29 super(TestTracefilter, self).setUp()
30 self.create_pg_interfaces(range(2))
31 self.pg0.generate_remote_hosts(11)
32 for i in self.pg_interfaces:
38 super(TestTracefilter, self).tearDown()
39 for i in self.pg_interfaces:
44 r = self.vapi.cli_return_response(cmd)
46 s = "reply '%s'" % r.reply if hasattr(
47 r, "reply") else "retval '%s'" % r.retval
48 raise RuntimeError("cli command '%s' FAIL with %s" % (cmd, s))
def assert_hits(self, n):
    """Assert that the classifier reports an entry with exactly ``n`` hits.

    Runs ``show classify table verbose`` through ``self.cli`` and looks for
    a ``hits <n>`` counter in the reply text.

    :param n: expected hit count (formatted with ``%i``).
    """
    import re  # function-scope import keeps this method self-contained

    r = self.cli("show classify table verbose")
    # Anchor the counter on word boundaries: a plain substring search for
    # "hits 1" is also satisfied by "hits 17" and would hide a wrong count.
    pattern = r"\bhits %i\b" % n
    self.assertTrue(
        re.search(pattern, r.reply) is not None,
        "no classifier entry with 'hits %i' in:\n%s" % (n, r.reply))
def add_trace_filter(self, mask, match):
    """Issue the CLI commands that set up a filtered packet trace.

    Three commands are sent through ``self.cli``, in order: add a
    ``classify filter trace`` rule built from ``mask`` and ``match``,
    ``clear trace``, and ``trace add pg-input 1000 filter``.

    :param mask: text placed after ``mask`` in the classify command.
    :param match: text placed after ``match`` in the classify command.
    """
    commands = (
        "classify filter trace mask %s match %s" % (mask, match),
        "clear trace",
        "trace add pg-input 1000 filter",
    )
    for command in commands:
        self.cli(command)
def del_trace_filters(self):
    """Delete the trace classify filter and assert that none is left.

    Sends ``classify filter trace del``, then checks that the reply of
    ``show classify filter`` contains the "first table none" line for the
    packet tracer.
    """
    self.cli("classify filter trace del")
    listing = self.cli("show classify filter").reply
    expected = "packet tracer: first table none"
    self.assertTrue(expected in listing)
def del_pcap_filters(self):
    """Delete the pcap classify filter and assert that none is left.

    Sends ``classify filter pcap del``, then checks that the reply of
    ``show classify filter`` contains the "first table none" line for the
    pcap rx/tx/drop capture.
    """
    self.cli("classify filter pcap del")
    listing = self.cli("show classify filter").reply
    expected = "pcap rx/tx/drop: first table none"
    self.assertTrue(expected in listing)
74 """ Packet Tracer Filter Test """
75 self.add_trace_filter(
78 self.pg0.remote_hosts[5].ip4)
79 self.add_trace_filter(
80 "l3 ip4 proto l4 src_port",
81 "l3 ip4 proto 17 l4 src_port 2345")
82 # the packet we are trying to match
85 src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
86 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
87 IP(src=src, dst=self.pg1.remote_ip4) /
88 UDP(sport=1234, dport=2345) / Raw('\xa5' * 100)))
90 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
91 IP(src=self.pg0.remote_hosts[0].ip4,
92 dst=self.pg1.remote_ip4) /
93 UDP(sport=2345, dport=1234) / Raw('\xa5' * 100)))
95 self.send_and_expect(self.pg0, p, self.pg1, trace=False)
97 # Check for 9 and 17 classifier hits, which is the right answer
101 self.del_trace_filters()
def assert_classify(self, mask, match, packets, n=None):
    """Install a hex classify rule, inject traffic and check the hit count.

    The rule is added via ``self.add_trace_filter``, ``packets`` are handed
    to ``self.send_and_expect`` together with ``self.pg0`` and ``self.pg1``,
    the hit counter is verified with ``self.assert_hits`` and the rule is
    finally removed with ``self.del_trace_filters``.

    :param mask: classifier mask; prefixed with ``hex `` before use.
    :param match: classifier match value; prefixed with ``hex `` before use.
    :param packets: packets to inject.
    :param n: expected number of hits; ``len(packets)`` when ``None``.
    """
    hex_mask = "hex %s" % mask
    hex_match = "hex %s" % match
    self.add_trace_filter(hex_mask, hex_match)
    self.send_and_expect(self.pg0, packets, self.pg1, trace=False)
    # Resolve the default only after the traffic has been sent.
    if n is None:
        n = len(packets)
    self.assert_hits(n)
    self.del_trace_filters()
110 def test_encap(self):
111 """ Packet Tracer Filter Test with encap """
113 # the packet we are trying to match
114 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
115 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
123 IP(src='192.168.4.167') /
128 # compute filter mask & value
129 # we compute it by XOR'ing a template packet with a modified packet
130 # we need to set checksums to 0 to make sure scapy will not recompute
142 IP(src='0.0.0.0', chksum=0))
146 tmpl[GENEVE].vni = 0xffffff
147 user = tmpl[GENEVE].payload
148 user[IP].src = '255.255.255.255'
150 mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
152 # this does not match (wrong vni)
154 user = tmpl[GENEVE].payload
155 user[IP].src = '192.168.4.167'
157 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
158 self.assert_classify(mask, match, [p] * 11, 0)
161 tmpl[GENEVE].vni = 1234
163 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
164 self.assert_classify(mask, match, [p] * 17)
167 """ Packet Capture Filter Test """
169 "classify filter pcap mask l3 ip4 src match l3 ip4 src %s" %
170 self.pg0.remote_hosts[5].ip4)
172 "classify filter pcap "
173 "mask l3 ip4 proto l4 src_port "
174 "match l3 ip4 proto 17 l4 src_port 2345")
176 "pcap trace rx tx max 1000 intfc pg0 "
177 "file vpp_test_trace_filter_test_pcap.pcap filter")
178 # the packet we are trying to match
181 src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
182 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
183 IP(src=src, dst=self.pg1.remote_ip4) /
184 UDP(sport=1234, dport=2345) / Raw('\xa5' * 100)))
186 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
187 IP(src=self.pg0.remote_hosts[0].ip4,
188 dst=self.pg1.remote_ip4) /
189 UDP(sport=2345, dport=1234) / Raw('\xa5' * 100)))
191 self.send_and_expect(self.pg0, p, self.pg1, trace=False)
193 # Check for 9 and 17 classifier hits, which is the right answer
197 self.cli("pcap trace rx tx off")
198 self.del_pcap_filters()
200 # check captured pcap
201 pcap = rdpcap("/tmp/vpp_test_trace_filter_test_pcap.pcap")
202 self.assertEqual(len(pcap), 9 + 17)
205 self.assertEqual(str(pcap[i]), p_)
207 for i in range(9, 9 + 17):
208 self.assertEqual(str(pcap[i]), p_)
210 def test_pcap_drop(self):
211 """ Drop Packet Capture Filter Test """
213 "pcap trace drop max 1000 "
214 "error {ip4-udp-lookup}.{no_listener} "
215 "file vpp_test_trace_filter_test_pcap_drop.pcap")
216 # the packet we are trying to match
219 # this packet should be forwarded
220 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
221 IP(src=self.pg0.remote_hosts[0].ip4,
222 dst=self.pg1.remote_ip4) /
223 UDP(sport=2345, dport=1234) / Raw('\xa5' * 100)))
224 # this packet should be captured (no listener)
225 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
226 IP(src=self.pg0.remote_hosts[0].ip4,
227 dst=self.pg0.local_ip4) /
228 UDP(sport=2345, dport=1234) / Raw('\xa5' * 100)))
229 # this packet will be blackholed but not captured
230 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
231 IP(src=self.pg0.remote_hosts[0].ip4, dst="0.0.0.0") /
232 UDP(sport=2345, dport=1234) / Raw('\xa5' * 100)))
234 self.send_and_expect(self.pg0, p, self.pg1, n_rx=17, trace=False)
236 self.cli("pcap trace drop off")
238 # check captured pcap
239 pcap = rdpcap("/tmp/vpp_test_trace_filter_test_pcap_drop.pcap")
240 self.assertEqual(len(pcap), 17)
if __name__ == "__main__":
    # Executed as a script: run the tests using VppTestRunner.
    unittest.main(testRunner=VppTestRunner)