classify: honor pcap interface filter also when classify filter is used
[vpp.git] / test / test_trace_filter.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner, running_extended_tests
6 from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
7
8 from scapy.contrib.geneve import GENEVE
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.vxlan import VXLAN
13 from scapy.compat import raw
14 from scapy.utils import rdpcap
15
16
17 class TestTracefilter(VppTestCase):
18     """ Packet Tracer Filter Test """
19
20     @classmethod
21     def setUpClass(cls):
22         super(TestTracefilter, cls).setUpClass()
23
24     @classmethod
25     def tearDownClass(cls):
26         super(TestTracefilter, cls).tearDownClass()
27
28     def setUp(self):
29         super(TestTracefilter, self).setUp()
30         self.create_pg_interfaces(range(2))
31         self.pg0.generate_remote_hosts(11)
32         for i in self.pg_interfaces:
33             i.admin_up()
34             i.config_ip4()
35             i.resolve_arp()
36
37     def tearDown(self):
38         super(TestTracefilter, self).tearDown()
39         for i in self.pg_interfaces:
40             i.unconfig()
41             i.admin_down()
42
43     def cli(self, cmd):
44         r = self.vapi.cli_return_response(cmd)
45         if r.retval != 0:
46             s = "reply '%s'" % r.reply if hasattr(
47                 r, "reply") else "retval '%s'" % r.retval
48             raise RuntimeError("cli command '%s' FAIL with %s" % (cmd, s))
49         return r
50
51     # check number of hits for classifier
52     def assert_hits(self, n):
53         r = self.cli("show classify table verbose")
54         self.assertTrue(r.reply.find("hits %i" % n) != -1)
55
56     def add_trace_filter(self, mask, match):
57         self.cli("classify filter trace mask %s match %s" % (mask, match))
58         self.cli("clear trace")
59         self.cli("trace add pg-input 1000 filter")
60
61     def del_trace_filters(self):
62         self.cli("classify filter trace del")
63         r = self.cli("show classify filter")
64         s = "packet tracer:                 first table none"
65         self.assertTrue(r.reply.find(s) != -1)
66
67     def del_pcap_filters(self):
68         self.cli("classify filter pcap del")
69         r = self.cli("show classify filter")
70         s = "pcap rx/tx/drop:               first table none"
71         self.assertTrue(r.reply.find(s) != -1)
72
73     def test_basic(self):
74         """ Packet Tracer Filter Test """
75         self.add_trace_filter(
76             "l3 ip4 src",
77             "l3 ip4 src %s" %
78             self.pg0.remote_hosts[5].ip4)
79         self.add_trace_filter(
80             "l3 ip4 proto l4 src_port",
81             "l3 ip4 proto 17 l4 src_port 2345")
82         # the packet we are trying to match
83         p = list()
84         for i in range(100):
85             src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
86             p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
87                       IP(src=src, dst=self.pg1.remote_ip4) /
88                       UDP(sport=1234, dport=2345) / Raw('\xa5' * 100)))
89         for i in range(17):
90             p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
91                       IP(src=self.pg0.remote_hosts[0].ip4,
92                          dst=self.pg1.remote_ip4) /
93                       UDP(sport=2345, dport=1234) / Raw('\xa5' * 100)))
94
95         self.send_and_expect(self.pg0, p, self.pg1, trace=False)
96
97         # Check for 9 and 17 classifier hits, which is the right answer
98         self.assert_hits(9)
99         self.assert_hits(17)
100
101         self.del_trace_filters()
102
103     # install a classify rule, inject traffic and check for hits
104     def assert_classify(self, mask, match, packets, n=None):
105         self.add_trace_filter("hex %s" % mask, "hex %s" % match)
106         self.send_and_expect(self.pg0, packets, self.pg1, trace=False)
107         self.assert_hits(n if n is not None else len(packets))
108         self.del_trace_filters()
109
110     def test_encap(self):
111         """ Packet Tracer Filter Test with encap """
112
113         # the packet we are trying to match
114         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
115              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
116              UDP() /
117              VXLAN() /
118              Ether() /
119              IP() /
120              UDP() /
121              GENEVE(vni=1234) /
122              Ether() /
123              IP(src='192.168.4.167') /
124              UDP() /
125              Raw('\xa5' * 100))
126
127         #
128         # compute filter mask & value
129         # we compute it by XOR'ing a template packet with a modified packet
130         # we need to set checksums to 0 to make sure scapy will not recompute
131         # them
132         #
133         tmpl = (Ether() /
134                 IP(chksum=0) /
135                 UDP(chksum=0) /
136                 VXLAN() /
137                 Ether() /
138                 IP(chksum=0) /
139                 UDP(chksum=0) /
140                 GENEVE(vni=0) /
141                 Ether() /
142                 IP(src='0.0.0.0', chksum=0))
143         ori = raw(tmpl)
144
145         # the mask
146         tmpl[GENEVE].vni = 0xffffff
147         user = tmpl[GENEVE].payload
148         user[IP].src = '255.255.255.255'
149         new = raw(tmpl)
150         mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
151
152         # this does not match (wrong vni)
153         tmpl[GENEVE].vni = 1
154         user = tmpl[GENEVE].payload
155         user[IP].src = '192.168.4.167'
156         new = raw(tmpl)
157         match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
158         self.assert_classify(mask, match, [p] * 11, 0)
159
160         # this must match
161         tmpl[GENEVE].vni = 1234
162         new = raw(tmpl)
163         match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
164         self.assert_classify(mask, match, [p] * 17)
165
166     def test_pcap(self):
167         """ Packet Capture Filter Test """
168         self.cli(
169             "classify filter pcap mask l3 ip4 src match l3 ip4 src %s" %
170             self.pg0.remote_hosts[5].ip4)
171         self.cli(
172             "classify filter pcap "
173             "mask l3 ip4 proto l4 src_port "
174             "match l3 ip4 proto 17 l4 src_port 2345")
175         self.cli(
176             "pcap trace rx tx max 1000 intfc pg0 "
177             "file vpp_test_trace_filter.pcap filter")
178         # the packet we are trying to match
179         p = list()
180         for i in range(100):
181             src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
182             p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
183                       IP(src=src, dst=self.pg1.remote_ip4) /
184                       UDP(sport=1234, dport=2345) / Raw('\xa5' * 100)))
185         for i in range(17):
186             p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
187                       IP(src=self.pg0.remote_hosts[0].ip4,
188                          dst=self.pg1.remote_ip4) /
189                       UDP(sport=2345, dport=1234) / Raw('\xa5' * 100)))
190
191         self.send_and_expect(self.pg0, p, self.pg1, trace=False)
192
193         # Check for 9 and 17 classifier hits, which is the right answer
194         self.assert_hits(9)
195         self.assert_hits(17)
196
197         self.cli("pcap trace rx tx off")
198         self.del_pcap_filters()
199
200         # check captured pcap
201         pcap = rdpcap("/tmp/vpp_test_trace_filter.pcap")
202         self.assertEqual(len(pcap), 9 + 17)
203         p_ = str(p[5])
204         for i in range(9):
205             self.assertEqual(str(pcap[i]), p_)
206         p_ = str(p[100])
207         for i in range(9, 9 + 17):
208             self.assertEqual(str(pcap[i]), p_)
209
210
211 if __name__ == '__main__':
212     unittest.main(testRunner=VppTestRunner)