5 from framework import VppTestCase, VppTestRunner, running_extended_tests
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
8 from scapy.contrib.geneve import GENEVE
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.vxlan import VXLAN
13 from scapy.compat import raw
16 class TestTracefilter(VppTestCase):
17 """ Packet Tracer Filter Test """
21 super(TestTracefilter, cls).setUpClass()
24 def tearDownClass(cls):
25 super(TestTracefilter, cls).tearDownClass()
28 super(TestTracefilter, self).setUp()
29 self.create_pg_interfaces(range(1))
30 for i in self.pg_interfaces:
35 super(TestTracefilter, self).tearDown()
36 for i in self.pg_interfaces:
41 r = self.vapi.cli_return_response(cmd)
43 if hasattr(r, 'reply'):
44 self.logger.info(cmd + " FAIL reply " + r.reply)
46 self.logger.info(cmd + " FAIL retval " + str(r.retval))
49 # check number of hits for classifier
50 def assert_hits(self, n):
51 r = self.cli("show classify table verbose 2")
52 self.assertTrue(r.retval == 0)
53 self.assertTrue(hasattr(r, 'reply'))
54 self.assertTrue(r.reply.find("hits %i" % n) != -1)
56 def test_mactime_unitTest(self):
57 """ Packet Tracer Filter Test """
58 cmds = ["loopback create",
59 "set int ip address loop0 192.168.1.1/24",
60 "set int state loop0 up",
61 "packet-generator new {\n"
66 " node ethernet-input\n"
68 " IP4: 1.2.3 -> 4.5.6\n"
69 " UDP: 192.168.1.10 - 192.168.1.20 -> 192.168.2.10\n"
70 " UDP: 1234 -> 2345\n"
74 "classify filter trace mask l3 ip4 src"
75 " match l3 ip4 src 192.168.1.15",
76 "trace add pg-input 100 filter",
82 # Check for 9 classifier hits, which is the right answer
86 self.cli("pa de classifyme")
87 self.cli("classify filter trace del mask l3 ip4 src")
89 # install a classify rule, inject traffic and check for hits
90 def assert_classify(self, mask, match, packets, n=None):
92 "classify filter trace mask hex %s match hex %s" %
94 self.assertTrue(r.retval == 0)
95 r = self.cli("trace add pg-input %i filter" % len(packets))
96 self.assertTrue(r.retval == 0)
97 self.pg0.add_stream(packets)
99 self.assert_hits(n if n is not None else len(packets))
100 self.cli("clear trace")
102 "classify filter trace del mask hex %s" %
105 def test_encap(self):
106 """ Packet Tracer Filter Test with encap """
108 # the packet we are trying to match
109 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
110 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
118 IP(src='192.168.4.167') /
123 # compute filter mask & value
124 # we compute it by XOR'ing a template packet with a modified packet
125 # we need to set checksums to 0 to make sure scapy will not recompute
137 IP(src='0.0.0.0', chksum=0))
141 tmpl[GENEVE].vni = 0xffffff
142 user = tmpl[GENEVE].payload
143 user[IP].src = '255.255.255.255'
145 mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
147 # this does not match (wrong vni)
149 user = tmpl[GENEVE].payload
150 user[IP].src = '192.168.4.167'
152 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
153 self.assert_classify(mask, match, [p] * 11, 0)
156 tmpl[GENEVE].vni = 1234
158 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
159 self.assert_classify(mask, match, [p] * 17)
162 if __name__ == '__main__':
163 unittest.main(testRunner=VppTestRunner)