5 from framework import VppTestCase, VppTestRunner, running_extended_tests
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
8 from scapy.contrib.geneve import GENEVE
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.vxlan import VXLAN
13 from scapy.compat import raw
16 class TestTracefilter(VppTestCase):
17 """ Packet Tracer Filter Test """
21 super(TestTracefilter, cls).setUpClass()
24 def tearDownClass(cls):
25 super(TestTracefilter, cls).tearDownClass()
28 super(TestTracefilter, self).setUp()
29 self.create_pg_interfaces(range(1))
30 for i in self.pg_interfaces:
35 super(TestTracefilter, self).tearDown()
36 for i in self.pg_interfaces:
41 r = self.vapi.cli_return_response(cmd)
43 if hasattr(r, 'reply'):
44 self.logger.info(cmd + " FAIL reply " + r.reply)
46 self.logger.info(cmd + " FAIL retval " + str(r.retval))
49 # check number of hits for classifier
50 def assert_hits(self, n):
51 r = self.cli("show classify table verbose 2")
52 self.assertTrue(r.retval == 0)
53 self.assertTrue(hasattr(r, 'reply'))
54 self.assertTrue(r.reply.find("hits %i" % n) != -1)
56 def test_mactime_unitTest(self):
57 """ Packet Tracer Filter Test """
58 cmds = ["loopback create",
59 "set int ip address loop0 192.168.1.1/24",
60 "set int state loop0 up",
61 "packet-generator new {\n"
66 " node ethernet-input\n"
68 " IP4: 1.2.3 -> 4.5.6\n"
69 " UDP: 192.168.1.10 - 192.168.1.20 -> 192.168.2.10\n"
70 " UDP: 1234 -> 2345\n"
74 "classify filter trace mask l3 ip4 src\n"
75 " match l3 ip4 src 192.168.1.15",
76 "trace add pg-input 100 filter",
82 # Check for 9 classifier hits, which is the right answer
86 self.cli("pa de classifyme")
87 self.cli("classify filter trace del mask l3 ip4 src "
88 "match l3 ip4 src 192.168.1.15")
90 # install a classify rule, inject traffic and check for hits
91 def assert_classify(self, mask, match, packets, n=None):
93 "classify filter trace mask hex %s match hex %s" %
95 self.assertTrue(r.retval == 0)
96 r = self.cli("trace add pg-input %i filter" % len(packets))
97 self.assertTrue(r.retval == 0)
98 self.pg0.add_stream(packets)
100 self.assert_hits(n if n is not None else len(packets))
101 self.cli("clear trace")
103 "classify filter trace del mask hex %s match hex %s" %
106 def test_encap(self):
107 """ Packet Tracer Filter Test with encap """
109 # the packet we are trying to match
110 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
111 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
119 IP(src='192.168.4.167') /
124 # compute filter mask & value
125 # we compute it by XOR'ing a template packet with a modified packet
126 # we need to set checksums to 0 to make sure scapy will not recompute
138 IP(src='0.0.0.0', chksum=0))
142 tmpl[GENEVE].vni = 0xffffff
143 user = tmpl[GENEVE].payload
144 user[IP].src = '255.255.255.255'
146 mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
148 # this does not match (wrong vni)
150 user = tmpl[GENEVE].payload
151 user[IP].src = '192.168.4.167'
153 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
154 self.assert_classify(mask, match, [p] * 11, 0)
157 tmpl[GENEVE].vni = 1234
159 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
160 self.assert_classify(mask, match, [p] * 17)
163 if __name__ == '__main__':
164 unittest.main(testRunner=VppTestRunner)