5 from framework import VppTestCase, VppTestRunner, running_extended_tests
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
8 from scapy.contrib.geneve import GENEVE
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.vxlan import VXLAN
13 from scapy.compat import raw
14 from scapy.utils import rdpcap
17 class TestTracefilter(VppTestCase):
18 """ Packet Tracer Filter Test """
22 super(TestTracefilter, cls).setUpClass()
25 def tearDownClass(cls):
26 super(TestTracefilter, cls).tearDownClass()
29 super(TestTracefilter, self).setUp()
30 self.create_pg_interfaces(range(2))
31 self.pg0.generate_remote_hosts(11)
32 for i in self.pg_interfaces:
38 super(TestTracefilter, self).tearDown()
39 for i in self.pg_interfaces:
44 r = self.vapi.cli_return_response(cmd)
46 s = "reply '%s'" % r.reply if hasattr(
47 r, "reply") else "retval '%s'" % r.retval
48 raise RuntimeError("cli command '%s' FAIL with %s" % (cmd, s))
51 # check number of hits for classifier
52 def assert_hits(self, n):
53 r = self.cli("show classify table verbose")
54 self.assertTrue(r.reply.find("hits %i" % n) != -1)
56 def add_trace_filter(self, mask, match):
57 self.cli("classify filter trace mask %s match %s" % (mask, match))
58 self.cli("clear trace")
59 self.cli("trace add pg-input 1000 filter")
61 def del_trace_filters(self):
62 self.cli("classify filter trace del")
63 r = self.cli("show classify filter")
64 s = "packet tracer: first table none"
65 self.assertTrue(r.reply.find(s) != -1)
67 def del_pcap_filters(self):
68 self.cli("classify filter pcap del")
69 r = self.cli("show classify filter")
70 s = "pcap rx/tx/drop: first table none"
71 self.assertTrue(r.reply.find(s) != -1)
74 """ Packet Tracer Filter Test """
75 self.add_trace_filter(
78 self.pg0.remote_hosts[5].ip4)
79 self.add_trace_filter(
80 "l3 ip4 proto l4 src_port",
81 "l3 ip4 proto 17 l4 src_port 2345")
82 # the packet we are trying to match
85 src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
86 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
87 IP(src=src, dst=self.pg1.remote_ip4) /
88 UDP(sport=1234, dport=2345) / Raw('\xa5' * 100)))
90 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
91 IP(src=self.pg0.remote_hosts[0].ip4,
92 dst=self.pg1.remote_ip4) /
93 UDP(sport=2345, dport=1234) / Raw('\xa5' * 100)))
95 self.send_and_expect(self.pg0, p, self.pg1, trace=False)
97 # Check for 9 and 17 classifier hits, which is the right answer
101 self.del_trace_filters()
103 # install a classify rule, inject traffic and check for hits
104 def assert_classify(self, mask, match, packets, n=None):
105 self.add_trace_filter("hex %s" % mask, "hex %s" % match)
106 self.send_and_expect(self.pg0, packets, self.pg1, trace=False)
107 self.assert_hits(n if n is not None else len(packets))
108 self.del_trace_filters()
110 def test_encap(self):
111 """ Packet Tracer Filter Test with encap """
113 # the packet we are trying to match
114 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
115 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
123 IP(src='192.168.4.167') /
128 # compute filter mask & value
129 # we compute it by XOR'ing a template packet with a modified packet
130 # we need to set checksums to 0 to make sure scapy will not recompute
142 IP(src='0.0.0.0', chksum=0))
146 tmpl[GENEVE].vni = 0xffffff
147 user = tmpl[GENEVE].payload
148 user[IP].src = '255.255.255.255'
150 mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
152 # this does not match (wrong vni)
154 user = tmpl[GENEVE].payload
155 user[IP].src = '192.168.4.167'
157 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
158 self.assert_classify(mask, match, [p] * 11, 0)
161 tmpl[GENEVE].vni = 1234
163 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
164 self.assert_classify(mask, match, [p] * 17)
167 """ Packet Capture Filter Test """
169 "classify filter pcap mask l3 ip4 src match l3 ip4 src %s" %
170 self.pg0.remote_hosts[5].ip4)
172 "classify filter pcap "
173 "mask l3 ip4 proto l4 src_port "
174 "match l3 ip4 proto 17 l4 src_port 2345")
176 "pcap trace rx tx max 1000 intfc pg0 "
177 "file vpp_test_trace_filter.pcap filter")
178 # the packet we are trying to match
181 src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
182 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
183 IP(src=src, dst=self.pg1.remote_ip4) /
184 UDP(sport=1234, dport=2345) / Raw('\xa5' * 100)))
186 p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
187 IP(src=self.pg0.remote_hosts[0].ip4,
188 dst=self.pg1.remote_ip4) /
189 UDP(sport=2345, dport=1234) / Raw('\xa5' * 100)))
191 self.send_and_expect(self.pg0, p, self.pg1, trace=False)
193 # Check for 9 and 17 classifier hits, which is the right answer
197 self.cli("pcap trace rx tx off")
198 self.del_pcap_filters()
200 # check captured pcap
201 pcap = rdpcap("/tmp/vpp_test_trace_filter.pcap")
202 self.assertEqual(len(pcap), 9 + 17)
205 self.assertEqual(str(pcap[i]), p_)
207 for i in range(9, 9 + 17):
208 self.assertEqual(str(pcap[i]), p_)
211 if __name__ == '__main__':
212 unittest.main(testRunner=VppTestRunner)