classify: add complex encap packet tracing test
[vpp.git] / test / test_trace_filter.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner, running_extended_tests
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP
11 from scapy.layers.vxlan import VXLAN
12 from scapy.layers.geneve import GENEVE
13 from scapy.compat import raw
14
15
16 class TestTracefilter(VppTestCase):
17     """ Packet Tracer Filter Test """
18
19     @classmethod
20     def setUpClass(cls):
21         super(TestTracefilter, cls).setUpClass()
22
23     @classmethod
24     def tearDownClass(cls):
25         super(TestTracefilter, cls).tearDownClass()
26
27     def setUp(self):
28         super(TestTracefilter, self).setUp()
29         self.create_pg_interfaces(range(1))
30         for i in self.pg_interfaces:
31             i.admin_up()
32             i.config_ip4()
33
34     def tearDown(self):
35         super(TestTracefilter, self).tearDown()
36         for i in self.pg_interfaces:
37             i.unconfig()
38             i.admin_down()
39
40     def cli(self, cmd):
41         r = self.vapi.cli_return_response(cmd)
42         if r.retval != 0:
43             if hasattr(r, 'reply'):
44                 self.logger.info(cmd + " FAIL reply " + r.reply)
45             else:
46                 self.logger.info(cmd + " FAIL retval " + str(r.retval))
47         return r
48
49     # check number of hits for classifier
50     def assert_hits(self, n):
51         r = self.cli("show classify table verbose 2")
52         self.assertTrue(r.retval == 0)
53         self.assertTrue(hasattr(r, 'reply'))
54         self.assertTrue(r.reply.find("hits %i" % n) != -1)
55
56     def test_mactime_unitTest(self):
57         """ Packet Tracer Filter Test """
58         cmds = ["loopback create",
59                 "set int ip address loop0 192.168.1.1/24",
60                 "set int state loop0 up",
61                 "packet-generator new {\n"
62                 " name classifyme\n"
63                 " limit 100\n"
64                 " size 300-300\n"
65                 " interface loop0\n"
66                 " node ethernet-input\n"
67                 " data { \n"
68                 "      IP4: 1.2.3 -> 4.5.6\n"
69                 "      UDP: 192.168.1.10 - 192.168.1.20 -> 192.168.2.10\n"
70                 "      UDP: 1234 -> 2345\n"
71                 "      incrementing 286\n"
72                 "     }\n"
73                 "}\n",
74                 "classify filter trace mask l3 ip4 src\n"
75                 " match l3 ip4 src 192.168.1.15",
76                 "trace add pg-input 100 filter",
77                 "pa en classifyme"]
78
79         for cmd in cmds:
80             self.cli(cmd)
81
82         # Check for 9 classifier hits, which is the right answer
83         self.assert_hits(9)
84
85         # cleanup
86         self.cli("pa de classifyme")
87         self.cli("classify filter trace del mask l3 ip4 src "
88                  "match l3 ip4 src 192.168.1.15")
89
90     # install a classify rule, inject traffic and check for hits
91     def assert_classify(self, mask, match, packets, n=None):
92         r = self.cli(
93             "classify filter trace mask hex %s match hex %s" %
94             (mask, match))
95         self.assertTrue(r.retval == 0)
96         r = self.cli("trace add pg-input %i filter" % len(packets))
97         self.assertTrue(r.retval == 0)
98         self.pg0.add_stream(packets)
99         self.cli("pa en")
100         self.assert_hits(n if n is not None else len(packets))
101         self.cli("clear trace")
102         self.cli(
103             "classify filter trace del mask hex %s match hex %s" %
104             (mask, match))
105
106     def test_encap(self):
107         """ Packet Tracer Filter Test with encap """
108
109         # the packet we are trying to match
110         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
111              IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
112              UDP() /
113              VXLAN() /
114              Ether() /
115              IP() /
116              UDP() /
117              GENEVE(vni=1234) /
118              Ether() /
119              IP(src='192.168.4.167') /
120              UDP() /
121              Raw('\xa5' * 100))
122
123         #
124         # compute filter mask & value
125         # we compute it by XOR'ing a template packet with a modified packet
126         # we need to set checksums to 0 to make sure scapy will not recompute
127         # them
128         #
129         tmpl = (Ether() /
130                 IP(chksum=0) /
131                 UDP(chksum=0) /
132                 VXLAN() /
133                 Ether() /
134                 IP(chksum=0) /
135                 UDP(chksum=0) /
136                 GENEVE(vni=0) /
137                 Ether() /
138                 IP(src='0.0.0.0', chksum=0))
139         ori = raw(tmpl)
140
141         # the mask
142         tmpl[GENEVE].vni = 0xffffff
143         user = tmpl[GENEVE].payload
144         user[IP].src = '255.255.255.255'
145         new = raw(tmpl)
146         mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
147
148         # this does not match (wrong vni)
149         tmpl[GENEVE].vni = 1
150         user = tmpl[GENEVE].payload
151         user[IP].src = '192.168.4.167'
152         new = raw(tmpl)
153         match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
154         self.assert_classify(mask, match, [p] * 11, 0)
155
156         # this must match
157         tmpl[GENEVE].vni = 1234
158         new = raw(tmpl)
159         match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
160         self.assert_classify(mask, match, [p] * 17)
161
162
163 if __name__ == '__main__':
164     unittest.main(testRunner=VppTestRunner)