7 from framework import VppTestCase, VppTestRunner
8 from vpp_ipip_tun_interface import VppIpIpTunInterface
9 from vpp_papi import VppEnum
10 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdItfBinding, VppIpsecSpdEntry
11 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto
13 from scapy.contrib.geneve import GENEVE
14 from scapy.packet import Raw
15 from scapy.layers.l2 import Ether
16 from scapy.layers.inet import IP, UDP, TCP
17 from scapy.layers.vxlan import VXLAN
18 from scapy.layers.ipsec import ESP, SecurityAssociation
19 from scapy.compat import raw
20 from scapy.utils import rdpcap
23 class TemplateTraceFilter(VppTestCase):
29 def tearDownClass(cls):
30 super().tearDownClass()
34 self.create_pg_interfaces(range(2))
35 self.pg0.generate_remote_hosts(11)
36 for i in self.pg_interfaces:
43 for i in self.pg_interfaces:
48 r = self.vapi.cli_return_response(cmd)
51 "reply '%s'" % r.reply
52 if hasattr(r, "reply")
53 else "retval '%s'" % r.retval
55 raise RuntimeError("cli command '%s' FAIL with %s" % (cmd, s))
# check number of hits for classifier
def assert_hits(self, n):
    """Assert that the classify table output reports ``n`` hits.

    Runs ``show classify table verbose`` through ``self.cli`` and checks
    that the reply text contains ``hits <n>``.

    :param n: hit count expected to appear in the CLI reply.
    """
    r = self.cli("show classify table verbose")
    # assertIn shows both the needle and the reply when the check fails,
    # which assertTrue(find() != -1) does not.
    self.assertIn("hits %i" % n, r.reply)
64 self.cli("clear trace")
66 def add_trace_filter(self, mask, match):
67 self.cli("classify filter trace mask %s match %s" % (mask, match))
69 self.cli("trace add pg-input 1000 filter")
def del_trace_filters(self):
    """Delete the trace classify filters and verify none are left.

    Issues ``classify filter trace del`` and then checks that the reply
    of ``show classify filter`` contains
    ``packet tracer: first table none``.
    """
    self.cli("classify filter trace del")
    r = self.cli("show classify filter")
    # assertIn prints the actual reply on failure, easing diagnosis.
    self.assertIn("packet tracer: first table none", r.reply)
def del_pcap_filters(self):
    """Delete the pcap classify filters and verify none are left.

    Issues ``classify filter pcap del`` and then checks that the reply
    of ``show classify filter`` contains
    ``pcap rx/tx/drop: first table none``.
    """
    self.cli("classify filter pcap del")
    r = self.cli("show classify filter")
    # assertIn prints the actual reply on failure, easing diagnosis.
    self.assertIn("pcap rx/tx/drop: first table none", r.reply)
# install a classify rule, inject traffic and check for hits
def assert_classify(self, mask, match, packets, n=None):
    """Install a hex classify filter, send traffic and check the hit count.

    The filter is added with ``hex <mask>`` / ``hex <match>``, the packets
    are sent from ``self.pg0`` and expected on ``self.pg1``, the hit count
    is asserted, and the trace filters are removed again.

    :param mask: hex string used as the classifier mask.
    :param match: hex string used as the classifier match value.
    :param packets: packets to inject on ``self.pg0``.
    :param n: expected number of hits; defaults to ``len(packets)``.
        An explicit ``0`` is honoured (only ``None`` triggers the default).
    """
    self.add_trace_filter("hex %s" % mask, "hex %s" % match)
    self.send_and_expect(self.pg0, packets, self.pg1, trace=False)
    expected_hits = len(packets) if n is None else n
    self.assert_hits(expected_hits)
    self.del_trace_filters()
91 class TestTracefilter(TemplateTraceFilter):
92 """Packet Tracer Filter Test"""
95 """Packet Tracer Filter Test"""
96 self.add_trace_filter(
97 "l3 ip4 src", "l3 ip4 src %s" % self.pg0.remote_hosts[5].ip4
99 self.add_trace_filter(
100 "l3 ip4 proto l4 src_port", "l3 ip4 proto 17 l4 src_port 2345"
102 # the packet we are trying to match
105 src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
108 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
109 / IP(src=src, dst=self.pg1.remote_ip4)
110 / UDP(sport=1234, dport=2345)
117 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
118 / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg1.remote_ip4)
119 / UDP(sport=2345, dport=1234)
124 self.send_and_expect(self.pg0, p, self.pg1, trace=False)
126 # Check for 9 and 17 classifier hits, which is the right answer
130 self.del_trace_filters()
132 def test_encap(self):
133 """Packet Tracer Filter Test with encap"""
135 # the packet we are trying to match
137 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
138 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
146 / IP(src="192.168.4.167")
152 # compute filter mask & value
153 # we compute it by XOR'ing a template packet with a modified packet
154 # we need to set checksums to 0 to make sure scapy will not recompute
167 / IP(src="0.0.0.0", chksum=0)
172 tmpl[GENEVE].vni = 0xFFFFFF
173 user = tmpl[GENEVE].payload
174 user[IP].src = "255.255.255.255"
176 mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
178 # this does not match (wrong vni)
180 user = tmpl[GENEVE].payload
181 user[IP].src = "192.168.4.167"
183 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
184 self.assert_classify(mask, match, [p] * 11, 0)
187 tmpl[GENEVE].vni = 1234
189 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
190 self.assert_classify(mask, match, [p] * 17)
193 """Packet Capture Filter Test"""
195 "classify filter pcap mask l3 ip4 src match l3 ip4 src %s"
196 % self.pg0.remote_hosts[5].ip4
199 "classify filter pcap "
200 "mask l3 ip4 proto l4 src_port "
201 "match l3 ip4 proto 17 l4 src_port 2345"
204 "pcap trace rx tx max 1000 intfc pg0 "
205 "file vpp_test_trace_filter_test_pcap.pcap filter"
207 # the packet we are trying to match
210 src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
213 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
214 / IP(src=src, dst=self.pg1.remote_ip4)
215 / UDP(sport=1234, dport=2345)
222 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
223 / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg1.remote_ip4)
224 / UDP(sport=2345, dport=1234)
229 self.send_and_expect(self.pg0, p, self.pg1, trace=False)
231 # Check for 9 and 17 classifier hits, which is the right answer
235 self.cli("pcap trace rx tx off")
236 self.del_pcap_filters()
238 # check captured pcap
239 pcap = rdpcap("/tmp/vpp_test_trace_filter_test_pcap.pcap")
240 self.assertEqual(len(pcap), 9 + 17)
243 self.assertEqual(str(pcap[i]), p_)
245 for i in range(9, 9 + 17):
246 self.assertEqual(str(pcap[i]), p_)
248 def test_pcap_drop(self):
249 """Drop Packet Capture Filter Test"""
251 "pcap trace drop max 1000 "
252 "error {ip4-udp-lookup}.{no_listener} "
253 "file vpp_test_trace_filter_test_pcap_drop.pcap"
255 # the packet we are trying to match
258 # this packet should be forwarded
261 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
262 / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg1.remote_ip4)
263 / UDP(sport=2345, dport=1234)
267 # this packet should be captured (no listener)
270 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
271 / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg0.local_ip4)
272 / UDP(sport=2345, dport=1234)
276 # this packet will be blackholed but not captured
279 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
280 / IP(src=self.pg0.remote_hosts[0].ip4, dst="0.0.0.0")
281 / UDP(sport=2345, dport=1234)
286 self.send_and_expect(self.pg0, p, self.pg1, n_rx=17, trace=False)
288 self.cli("pcap trace drop off")
290 # check captured pcap
291 pcap = rdpcap("/tmp/vpp_test_trace_filter_test_pcap_drop.pcap")
292 self.assertEqual(len(pcap), 17)
295 class TestTraceFilterInner(TemplateTraceFilter):
296 """Packet Tracer Filter Inner Test"""
298 extra_vpp_plugin_config = [
299 "plugin tracenode_plugin.so {enable}",
def add_trace_filter(self, mask, match, tn_feature_intfc_index=None):
    """Add a trace filter, optionally enabling tracenode on an interface.

    :param mask: classifier mask, forwarded to the parent implementation.
    :param match: classifier match, forwarded to the parent implementation.
    :param tn_feature_intfc_index: when not ``None``, the sw_if_index on
        which ``tracenode_enable_disable`` is called before the filter
        is installed.
    """
    if tn_feature_intfc_index is not None:
        # Replaces a leftover placeholder debug message ("fffff").
        self.logger.info(
            "Enabling tracenode feature on sw_if_index %s" % tn_feature_intfc_index
        )
        self.vapi.tracenode_enable_disable(sw_if_index=tn_feature_intfc_index)
    super().add_trace_filter(mask, match)
def del_trace_filters(self, tn_feature_intfc_index=None):
    """Remove the trace filters, optionally disabling tracenode first.

    :param tn_feature_intfc_index: when not ``None``, the sw_if_index on
        which ``tracenode_enable_disable`` is called with ``enable=False``
        before the parent implementation removes the filters.
    """
    if tn_feature_intfc_index is not None:
        self.vapi.tracenode_enable_disable(
            sw_if_index=tn_feature_intfc_index, enable=False
        )
    super().del_trace_filters()
315 def __add_sa(self, id_, tun_src, tun_dst):
316 # AES-CTR-128 / SHA2-256
317 crypto_key_length = 16
319 integ_key_lenght = 16
320 crypto_key = secrets.token_bytes(crypto_key_length)
321 salt = secrets.randbits(salt_length * 8)
322 integ_key = secrets.token_bytes(integ_key_lenght)
324 flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
326 vpp_sa_in = VppIpsecSA(
330 integ_alg=VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128,
332 crypto_alg=VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CTR_128,
333 crypto_key=crypto_key,
334 proto=VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP,
342 vpp_sa_in.add_vpp_config()
344 scapy_sa_in = SecurityAssociation(
347 crypt_algo="AES-CTR",
348 crypt_key=crypto_key + salt.to_bytes(salt_length, "big"),
349 auth_algo="SHA2-256-128",
351 tunnel_header=IP(src=tun_src, dst=tun_dst),
352 nat_t_header=UDP(sport=4500, dport=4500),
357 vpp_sa_out = VppIpsecSA(
361 integ_alg=VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128,
363 crypto_alg=VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CTR_128,
364 crypto_key=crypto_key,
365 proto=VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP,
373 vpp_sa_out.add_vpp_config()
375 scapy_sa_out = SecurityAssociation(
378 crypt_algo="AES-CTR",
379 crypt_key=crypto_key + salt.to_bytes(salt_length, "big"),
380 auth_algo="SHA2-256-128",
382 tunnel_header=IP(src=tun_dst, dst=tun_src),
383 nat_t_header=UDP(sport=4500, dport=4500),
386 return vpp_sa_in, scapy_sa_in, vpp_sa_out, scapy_sa_out
388 def __gen_encrypt_pkt(self, scapy_sa, pkt):
390 src=self.pg0.local_mac, dst=self.pg0.remote_mac
391 ) / scapy_sa.encrypt(pkt)
393 def test_encrypted_encap(self):
394 """Packet Tracer Filter Test with encrypted encap"""
396 vpp_sa_in, scapy_sa_in, vpp_sa_out, _ = self.__add_sa(
397 1, self.pg0.local_ip4, self.pg0.remote_ip4
400 spd = VppIpsecSpd(self, 1)
403 spd_binding = VppIpsecSpdItfBinding(self, spd, self.pg0)
404 spd_binding.add_vpp_config()
406 spd_entry = VppIpsecSpdEntry(
415 policy=VppEnum.vl_api_ipsec_spd_action_t.IPSEC_API_SPD_ACTION_PROTECT,
419 # the inner packet we are trying to match
421 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
422 / TCP(sport=1234, dport=4321)
425 pkt = self.__gen_encrypt_pkt(scapy_sa_in, inner_pkt)
427 # self.add_trace_filter("l3 ip4 src", f"l3 ip4 src {self.pg0.local_ip4}")
429 self.add_trace_filter(
430 "l2 none l3 ip4 src proto l4 dst_port",
431 f"l2 none l3 ip4 src {self.pg1.local_ip4} proto 6 l4 dst_port 4321",
432 tn_feature_intfc_index=self.pg0.sw_if_index,
435 self.logger.info("Sending packet with matching inner")
436 self.send_and_expect(self.pg0, pkt * 67, self.pg1, trace=False)
440 self.logger.info("Sending packet with wrong inner port")
441 inner_pkt[TCP].dport = 1111
442 pkt = self.__gen_encrypt_pkt(scapy_sa_in, inner_pkt)
443 self.send_and_expect(self.pg0, pkt * 67, self.pg1, trace=False)
444 # the classify session should still have the 67 previous hits.
# In other words, the delta is 0
449 self.logger.info("Sending packet with wrong source address")
450 inner_pkt[IP].src = "1.2.3.4"
451 inner_pkt[TCP].dport = 4321
452 pkt = self.__gen_encrypt_pkt(scapy_sa_in, inner_pkt)
453 self.send_and_expect(self.pg0, pkt * 67, self.pg1, trace=False)
457 self.del_trace_filters(tn_feature_intfc_index=self.pg0.sw_if_index)
459 spd_entry.remove_vpp_config()
460 spd_binding.remove_vpp_config()
461 spd.remove_vpp_config()
462 vpp_sa_in.remove_vpp_config()
463 vpp_sa_out.remove_vpp_config()
# Run the tests in this module with the VPP test runner when the file is
# executed directly rather than imported.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)