tracenode: filtering feature
[vpp.git] / test / test_trace_filter.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import secrets
5 import socket
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ipip_tun_interface import VppIpIpTunInterface
9 from vpp_papi import VppEnum
10 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdItfBinding, VppIpsecSpdEntry
11 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto
12
13 from scapy.contrib.geneve import GENEVE
14 from scapy.packet import Raw
15 from scapy.layers.l2 import Ether
16 from scapy.layers.inet import IP, UDP, TCP
17 from scapy.layers.vxlan import VXLAN
18 from scapy.layers.ipsec import ESP, SecurityAssociation
19 from scapy.compat import raw
20 from scapy.utils import rdpcap
21
22
class TemplateTraceFilter(VppTestCase):
    """Common fixture and helpers for the trace / pcap classify filter tests.

    Every test gets two packet-generator interfaces (pg0, pg1) that are
    brought up with IPv4 configured and ARP resolved; pg0 additionally gets
    11 generated remote hosts that tests use as packet sources.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Create and bring up pg0/pg1 and generate 11 remote hosts on pg0."""
        super().setUp()
        self.create_pg_interfaces(range(2))
        self.pg0.generate_remote_hosts(11)
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        """Unconfigure and shut down every pg interface created in setUp."""
        super().tearDown()
        for i in self.pg_interfaces:
            i.unconfig()
            i.admin_down()

    def cli(self, cmd):
        """Run a CLI command and return its response.

        :param cmd: CLI command string.
        :returns: the response object from ``vapi.cli_return_response``.
        :raises RuntimeError: if the response carries a non-zero ``retval``.
        """
        r = self.vapi.cli_return_response(cmd)
        if r.retval != 0:
            # Prefer the textual reply in the error when the response has one.
            s = (
                "reply '%s'" % r.reply
                if hasattr(r, "reply")
                else "retval '%s'" % r.retval
            )
            raise RuntimeError("cli command '%s' FAIL with %s" % (cmd, s))
        return r

    # check number of hits for classifier
    def assert_hits(self, n):
        """Assert that "show classify table verbose" reports exactly ``n`` hits.

        The counter is matched as a whole number: a plain substring search
        for "hits 9" would also accept "hits 90" or "hits 97".

        :param n: expected hit count (integer).
        """
        r = self.cli("show classify table verbose")
        self.assertRegex(r.reply, r"\bhits %d\b" % n)

    def clear(self):
        """Clear the packet trace buffer."""
        self.cli("clear trace")

    def add_trace_filter(self, mask, match):
        """Install a trace classify filter and arm filtered tracing.

        :param mask: text following ``mask`` in "classify filter trace".
        :param match: text following ``match`` in "classify filter trace".
        """
        self.cli("classify filter trace mask %s match %s" % (mask, match))
        self.clear()
        self.cli("trace add pg-input 1000 filter")

    def del_trace_filters(self):
        """Delete the trace classify filters and check none is left."""
        self.cli("classify filter trace del")
        r = self.cli("show classify filter")
        s = "packet tracer:                 first table none"
        self.assertIn(s, r.reply)

    def del_pcap_filters(self):
        """Delete the pcap classify filters and check none is left."""
        self.cli("classify filter pcap del")
        r = self.cli("show classify filter")
        s = "pcap rx/tx/drop:               first table none"
        self.assertIn(s, r.reply)

    # install a classify rule, inject traffic and check for hits
    def assert_classify(self, mask, match, packets, n=None):
        """Install a hex mask/match filter, send packets, and check the hits.

        :param mask: classifier mask as a hex string (without "hex" prefix).
        :param match: classifier match value as a hex string.
        :param packets: packets sent on pg0 and expected on pg1.
        :param n: expected hit count; defaults to ``len(packets)``.
        """
        self.add_trace_filter("hex %s" % mask, "hex %s" % match)
        self.send_and_expect(self.pg0, packets, self.pg1, trace=False)
        self.assert_hits(n if n is not None else len(packets))
        self.del_trace_filters()
89
90
class TestTracefilter(TemplateTraceFilter):
    """Packet Tracer Filter Test"""

    def _filter_test_packets(self):
        # Traffic shared by test_basic and test_pcap:
        #  - 100 UDP packets (sport 1234, dport 2345) whose source address
        #    cycles through pg0's remote hosts;
        #  - 17 UDP packets (sport 2345, dport 1234) from remote host 0.
        # Returns the packets as a single list, in that order.
        hosts = self.pg0.remote_hosts
        pkts = [
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=hosts[i % len(hosts)].ip4, dst=self.pg1.remote_ip4)
            / UDP(sport=1234, dport=2345)
            / Raw("\xa5" * 100)
            for i in range(100)
        ]
        pkts.extend(
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=hosts[0].ip4, dst=self.pg1.remote_ip4)
            / UDP(sport=2345, dport=1234)
            / Raw("\xa5" * 100)
            for _ in range(17)
        )
        return pkts

    @staticmethod
    def _xor_hex(ori, new):
        # Hex string of the byte-wise XOR of two raw packets; zip() stops at
        # the shorter of the two inputs.
        return "".join("{:02x}".format(o ^ n) for o, n in zip(ori, new))

    def test_basic(self):
        """Packet Tracer Filter Test"""
        self.add_trace_filter(
            "l3 ip4 src", "l3 ip4 src %s" % self.pg0.remote_hosts[5].ip4
        )
        self.add_trace_filter(
            "l3 ip4 proto l4 src_port", "l3 ip4 proto 17 l4 src_port 2345"
        )
        # the packets we are trying to match
        p = self._filter_test_packets()

        self.send_and_expect(self.pg0, p, self.pg1, trace=False)

        # Check for 9 and 17 classifier hits, which is the right answer:
        # 9 of the 100 round-robin packets come from remote host 5, and 17
        # packets are UDP with source port 2345.
        self.assert_hits(9)
        self.assert_hits(17)

        self.del_trace_filters()

    def test_encap(self):
        """Packet Tracer Filter Test with encap"""

        # the packet we are trying to match
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
            / UDP()
            / VXLAN()
            / Ether()
            / IP()
            / UDP()
            / GENEVE(vni=1234)
            / Ether()
            / IP(src="192.168.4.167")
            / UDP()
            / Raw("\xa5" * 100)
        )

        #
        # compute filter mask & value
        # we compute it by XOR'ing a template packet with a modified packet
        # we need to set checksums to 0 to make sure scapy will not recompute
        # them
        #
        tmpl = (
            Ether()
            / IP(chksum=0)
            / UDP(chksum=0)
            / VXLAN()
            / Ether()
            / IP(chksum=0)
            / UDP(chksum=0)
            / GENEVE(vni=0)
            / Ether()
            / IP(src="0.0.0.0", chksum=0)
        )
        ori = raw(tmpl)

        # the mask: all ones in the GENEVE vni and the innermost IP source
        tmpl[GENEVE].vni = 0xFFFFFF
        user = tmpl[GENEVE].payload
        user[IP].src = "255.255.255.255"
        mask = self._xor_hex(ori, raw(tmpl))

        # this does not match (wrong vni)
        tmpl[GENEVE].vni = 1
        user = tmpl[GENEVE].payload
        user[IP].src = "192.168.4.167"
        match = self._xor_hex(ori, raw(tmpl))
        self.assert_classify(mask, match, [p] * 11, 0)

        # this must match
        tmpl[GENEVE].vni = 1234
        match = self._xor_hex(ori, raw(tmpl))
        self.assert_classify(mask, match, [p] * 17)

    def test_pcap(self):
        """Packet Capture Filter Test"""
        self.cli(
            "classify filter pcap mask l3 ip4 src match l3 ip4 src %s"
            % self.pg0.remote_hosts[5].ip4
        )
        self.cli(
            "classify filter pcap "
            "mask l3 ip4 proto l4 src_port "
            "match l3 ip4 proto 17 l4 src_port 2345"
        )
        self.cli(
            "pcap trace rx tx max 1000 intfc pg0 "
            "file vpp_test_trace_filter_test_pcap.pcap filter"
        )
        # the packets we are trying to match
        p = self._filter_test_packets()

        self.send_and_expect(self.pg0, p, self.pg1, trace=False)

        # Check for 9 and 17 classifier hits, which is the right answer
        self.assert_hits(9)
        self.assert_hits(17)

        self.cli("pcap trace rx tx off")
        self.del_pcap_filters()

        # check captured pcap: compare the built bytes of each captured
        # packet with the packet that was sent
        pcap = rdpcap("/tmp/vpp_test_trace_filter_test_pcap.pcap")
        self.assertEqual(len(pcap), 9 + 17)
        # p[5] is the first packet sent from remote host 5
        expected = raw(p[5])
        for i in range(9):
            self.assertEqual(raw(pcap[i]), expected)
        # p[100] is the first of the 17 packets with source port 2345
        expected = raw(p[100])
        for i in range(9, 9 + 17):
            self.assertEqual(raw(pcap[i]), expected)

    def test_pcap_drop(self):
        """Drop Packet Capture Filter Test"""
        self.cli(
            "pcap trace drop max 1000 "
            "error {ip4-udp-lookup}.{no_listener} "
            "file vpp_test_trace_filter_test_pcap_drop.pcap"
        )
        # the packets we are trying to match
        p = list()
        for _ in range(17):
            # this packet should be forwarded
            p.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg1.remote_ip4)
                    / UDP(sport=2345, dport=1234)
                    / Raw("\xa5" * 100)
                )
            )
            # this packet should be captured (no listener)
            p.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg0.local_ip4)
                    / UDP(sport=2345, dport=1234)
                    / Raw("\xa5" * 100)
                )
            )
        # this packet will be blackholed but not captured
        p.append(
            (
                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                / IP(src=self.pg0.remote_hosts[0].ip4, dst="0.0.0.0")
                / UDP(sport=2345, dport=1234)
                / Raw("\xa5" * 100)
            )
        )

        # only the 17 forwarded packets are expected on pg1
        self.send_and_expect(self.pg0, p, self.pg1, n_rx=17, trace=False)

        self.cli("pcap trace drop off")

        # check captured pcap
        pcap = rdpcap("/tmp/vpp_test_trace_filter_test_pcap_drop.pcap")
        self.assertEqual(len(pcap), 17)
293
294
class TestTraceFilterInner(TemplateTraceFilter):
    """Packet Tracer Filter Inner Test"""

    # Load the tracenode plugin, which provides the tracenode_enable_disable
    # API used below.
    extra_vpp_plugin_config = [
        "plugin tracenode_plugin.so {enable}",
    ]

    def add_trace_filter(self, mask, match, tn_feature_intfc_index=None):
        # Same as the template helper, but when tn_feature_intfc_index is
        # given the tracenode feature is first enabled on that interface.
        if tn_feature_intfc_index is not None:
            self.logger.info(
                f"Enabling tracenode on sw_if_index {tn_feature_intfc_index}"
            )
            self.vapi.tracenode_enable_disable(sw_if_index=tn_feature_intfc_index)
        super().add_trace_filter(mask, match)

    def del_trace_filters(self, tn_feature_intfc_index=None):
        # Same as the template helper, but when tn_feature_intfc_index is
        # given the tracenode feature is first disabled on that interface.
        if tn_feature_intfc_index is not None:
            self.vapi.tracenode_enable_disable(
                sw_if_index=tn_feature_intfc_index, enable=False
            )
        super().del_trace_filters()

    def __add_sa(self, id_, tun_src, tun_dst):
        # Create a pair of UDP-encapsulated ESP tunnel SAs sharing freshly
        # generated random keys, each with a matching scapy
        # SecurityAssociation:
        #  - the "in" SA uses id/spi id_ and tun_src -> tun_dst;
        #  - the "out" SA uses id/spi id_ + 1 and tun_dst -> tun_src.
        # Both VPP SAs are configured in VPP before returning.
        # Returns (vpp_sa_in, scapy_sa_in, vpp_sa_out, scapy_sa_out).

        # AES-CTR-128 / SHA2-256
        crypto_key_length = 16
        salt_length = 4
        integ_key_length = 16
        crypto_key = secrets.token_bytes(crypto_key_length)
        salt = secrets.randbits(salt_length * 8)
        integ_key = secrets.token_bytes(integ_key_length)

        flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP

        # keyword arguments common to both VPP SAs
        vpp_sa_common = dict(
            test=self,
            integ_alg=VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128,
            integ_key=integ_key,
            crypto_alg=VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CTR_128,
            crypto_key=crypto_key,
            proto=VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP,
            flags=flags,
            salt=salt,
            udp_src=4500,
            udp_dst=4500,
        )
        # the scapy SA takes the salt appended to the encryption key
        scapy_crypt_key = crypto_key + salt.to_bytes(salt_length, "big")

        vpp_sa_in = VppIpsecSA(
            id=id_,
            spi=id_,
            tun_src=tun_src,
            tun_dst=tun_dst,
            **vpp_sa_common,
        )
        vpp_sa_in.add_vpp_config()

        scapy_sa_in = SecurityAssociation(
            ESP,
            spi=id_,
            crypt_algo="AES-CTR",
            crypt_key=scapy_crypt_key,
            auth_algo="SHA2-256-128",
            auth_key=integ_key,
            tunnel_header=IP(src=tun_src, dst=tun_dst),
            nat_t_header=UDP(sport=4500, dport=4500),
        )

        out_id = id_ + 1

        vpp_sa_out = VppIpsecSA(
            id=out_id,
            spi=out_id,
            tun_src=tun_dst,
            tun_dst=tun_src,
            **vpp_sa_common,
        )
        vpp_sa_out.add_vpp_config()

        scapy_sa_out = SecurityAssociation(
            ESP,
            spi=out_id,
            crypt_algo="AES-CTR",
            crypt_key=scapy_crypt_key,
            auth_algo="SHA2-256-128",
            auth_key=integ_key,
            tunnel_header=IP(src=tun_dst, dst=tun_src),
            nat_t_header=UDP(sport=4500, dport=4500),
        )

        return vpp_sa_in, scapy_sa_in, vpp_sa_out, scapy_sa_out

    def __gen_encrypt_pkt(self, scapy_sa, pkt):
        # Encrypt pkt with the scapy SA and prepend an Ethernet header.
        # NOTE(review): the MACs here are pg0 local -> remote, the reverse of
        # the cleartext packets built elsewhere in this file - confirm this
        # is intended.
        return Ether(
            src=self.pg0.local_mac, dst=self.pg0.remote_mac
        ) / scapy_sa.encrypt(pkt)

    def test_encrypted_encap(self):
        """Packet Tracer Filter Test with encrypted encap"""

        vpp_sa_in, scapy_sa_in, vpp_sa_out, _ = self.__add_sa(
            1, self.pg0.local_ip4, self.pg0.remote_ip4
        )

        spd = VppIpsecSpd(self, 1)
        spd.add_vpp_config()

        spd_binding = VppIpsecSpdItfBinding(self, spd, self.pg0)
        spd_binding.add_vpp_config()

        # Build the entry first and configure it separately, like the other
        # objects above, so spd_entry never depends on what add_vpp_config()
        # returns.
        spd_entry = VppIpsecSpdEntry(
            self,
            spd,
            1,
            self.pg0.local_ip4,
            self.pg0.local_ip4,
            self.pg0.remote_ip4,
            self.pg0.remote_ip4,
            socket.IPPROTO_ESP,
            policy=VppEnum.vl_api_ipsec_spd_action_t.IPSEC_API_SPD_ACTION_PROTECT,
            is_outbound=0,
        )
        spd_entry.add_vpp_config()

        # the inner packet we are trying to match
        inner_pkt = (
            IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
            / TCP(sport=1234, dport=4321)
            / Raw(b"\xa5" * 100)
        )
        pkt = self.__gen_encrypt_pkt(scapy_sa_in, inner_pkt)

        # the filter is expressed on the inner (decrypted) packet: no l2
        # header, inner source address, TCP, destination port 4321
        self.add_trace_filter(
            "l2 none l3 ip4 src proto l4 dst_port",
            f"l2 none l3 ip4 src {self.pg1.local_ip4} proto 6 l4 dst_port 4321",
            tn_feature_intfc_index=self.pg0.sw_if_index,
        )

        self.logger.info("Sending packet with matching inner")
        self.send_and_expect(self.pg0, pkt * 67, self.pg1, trace=False)
        self.assert_hits(67)
        self.clear()

        self.logger.info("Sending packet with wrong inner port")
        inner_pkt[TCP].dport = 1111
        pkt = self.__gen_encrypt_pkt(scapy_sa_in, inner_pkt)
        self.send_and_expect(self.pg0, pkt * 67, self.pg1, trace=False)
        # the classify session should still have the 67 previous hits.
        # In other words, the delta is 0
        self.assert_hits(67)
        self.clear()

        self.logger.info("Sending packet with wrong source address")
        inner_pkt[IP].src = "1.2.3.4"
        inner_pkt[TCP].dport = 4321
        pkt = self.__gen_encrypt_pkt(scapy_sa_in, inner_pkt)
        self.send_and_expect(self.pg0, pkt * 67, self.pg1, trace=False)
        # still only the 67 hits from the first batch
        self.assert_hits(67)
        self.clear()

        self.del_trace_filters(tn_feature_intfc_index=self.pg0.sw_if_index)

        # remove the IPsec configuration in reverse order of creation
        spd_entry.remove_vpp_config()
        spd_binding.remove_vpp_config()
        spd.remove_vpp_config()
        vpp_sa_in.remove_vpp_config()
        vpp_sa_out.remove_vpp_config()
464
465
# When executed directly, run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)