7 from framework import VppTestCase
8 from asfframework import VppTestRunner
9 from vpp_papi import VppEnum
10 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdItfBinding, VppIpsecSpdEntry
12 from scapy.contrib.geneve import GENEVE
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether
15 from scapy.layers.inet import IP, UDP, TCP
16 from scapy.layers.vxlan import VXLAN
17 from scapy.layers.ipsec import ESP, SecurityAssociation
18 from scapy.compat import raw
19 from scapy.utils import rdpcap
22 class TemplateTraceFilter(VppTestCase):
28 def tearDownClass(cls):
29 super().tearDownClass()
33 self.create_pg_interfaces(range(2))
34 self.pg0.generate_remote_hosts(11)
35 for i in self.pg_interfaces:
42 for i in self.pg_interfaces:
47 r = self.vapi.cli_return_response(cmd)
50 "reply '%s'" % r.reply
51 if hasattr(r, "reply")
52 else "retval '%s'" % r.retval
54 raise RuntimeError("cli command '%s' FAIL with %s" % (cmd, s))
57 # check number of hits for classifier
def assert_hits(self, n):
    """Assert that the classify table dump contains a ``hits <n>`` counter.

    Runs ``show classify table verbose`` through ``self.cli`` and searches
    the reply text.

    :param n: expected hit count (integer)
    :raises AssertionError: if no ``hits <n>`` counter is found in the reply
    """
    r = self.cli("show classify table verbose")
    # Anchor the number with a word boundary: a plain substring search for
    # "hits 1" would also be satisfied by "hits 17". assertRegex also prints
    # the reply on failure, which a bare assertTrue does not.
    self.assertRegex(r.reply, r"hits %i\b" % n)
63 self.cli("clear trace")
65 def add_trace_filter(self, mask, match):
66 self.cli("classify filter trace mask %s match %s" % (mask, match))
68 self.cli("trace add pg-input 1000 filter")
def del_trace_filters(self):
    """Delete the packet tracer classify filter and check that it is gone.

    Issues ``classify filter trace del`` and then verifies that
    ``show classify filter`` reports no first table for the packet tracer.

    :raises AssertionError: if the expected line is missing from the reply
    """
    self.cli("classify filter trace del")
    reply = self.cli("show classify filter").reply
    # assertIn reports the actual CLI reply on failure, unlike
    # assertTrue(reply.find(...) != -1), which only says "False is not true"
    self.assertIn("packet tracer: first table none", reply)
def del_pcap_filters(self):
    """Delete the pcap classify filter and check that it is gone.

    Issues ``classify filter pcap del`` and then verifies that
    ``show classify filter`` reports no first table for pcap rx/tx/drop.

    :raises AssertionError: if the expected line is missing from the reply
    """
    self.cli("classify filter pcap del")
    reply = self.cli("show classify filter").reply
    # assertIn reports the actual CLI reply on failure, unlike
    # assertTrue(reply.find(...) != -1), which only says "False is not true"
    self.assertIn("pcap rx/tx/drop: first table none", reply)
82 # install a classify rule, inject traffic and check for hits
def assert_classify(self, mask, match, packets, n=None):
    """Install a hex classify rule, send ``packets`` and check the hit count.

    The mask and match are prefixed with ``hex`` and handed to
    add_trace_filter(); the packets are sent on pg0 and expected on pg1;
    the hit counter is checked and the trace filters are then deleted.

    :param mask: hex string of the classify mask
    :param match: hex string of the classify match value
    :param packets: packets to inject
    :param n: expected number of hits; defaults to len(packets) when None
    """
    hex_mask = "hex %s" % mask
    hex_match = "hex %s" % match
    self.add_trace_filter(hex_mask, hex_match)

    self.send_and_expect(self.pg0, packets, self.pg1, trace=False)

    # by default every injected packet is expected to hit the rule
    if n is None:
        n = len(packets)
    self.assert_hits(n)

    self.del_trace_filters()
90 class TestTracefilter(TemplateTraceFilter):
91 """Packet Tracer Filter Test"""
94 """Packet Tracer Filter Test"""
95 self.add_trace_filter(
96 "l3 ip4 src", "l3 ip4 src %s" % self.pg0.remote_hosts[5].ip4
98 self.add_trace_filter(
99 "l3 ip4 proto l4 src_port", "l3 ip4 proto 17 l4 src_port 2345"
101 # the packet we are trying to match
104 src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
107 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
108 / IP(src=src, dst=self.pg1.remote_ip4)
109 / UDP(sport=1234, dport=2345)
116 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
117 / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg1.remote_ip4)
118 / UDP(sport=2345, dport=1234)
123 self.send_and_expect(self.pg0, p, self.pg1, trace=False)
125 # Check for 9 and 17 classifier hits, which is the right answer
129 self.del_trace_filters()
131 def test_encap(self):
132 """Packet Tracer Filter Test with encap"""
134 # the packet we are trying to match
136 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
137 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
145 / IP(src="192.168.4.167")
151 # compute filter mask & value
152 # we compute it by XOR'ing a template packet with a modified packet
153 # we need to set checksums to 0 to make sure scapy will not recompute
166 / IP(src="0.0.0.0", chksum=0)
171 tmpl[GENEVE].vni = 0xFFFFFF
172 user = tmpl[GENEVE].payload
173 user[IP].src = "255.255.255.255"
175 mask = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
177 # this does not match (wrong vni)
179 user = tmpl[GENEVE].payload
180 user[IP].src = "192.168.4.167"
182 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
183 self.assert_classify(mask, match, [p] * 11, 0)
186 tmpl[GENEVE].vni = 1234
188 match = "".join(("{:02x}".format(o ^ n) for o, n in zip(ori, new)))
189 self.assert_classify(mask, match, [p] * 17)
192 """Packet Capture Filter Test"""
194 "classify filter pcap mask l3 ip4 src match l3 ip4 src %s"
195 % self.pg0.remote_hosts[5].ip4
198 "classify filter pcap "
199 "mask l3 ip4 proto l4 src_port "
200 "match l3 ip4 proto 17 l4 src_port 2345"
203 "pcap trace rx tx max 1000 intfc pg0 "
204 "file vpp_test_trace_filter_test_pcap.pcap filter"
206 # the packet we are trying to match
209 src = self.pg0.remote_hosts[i % len(self.pg0.remote_hosts)].ip4
212 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
213 / IP(src=src, dst=self.pg1.remote_ip4)
214 / UDP(sport=1234, dport=2345)
221 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
222 / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg1.remote_ip4)
223 / UDP(sport=2345, dport=1234)
228 self.send_and_expect(self.pg0, p, self.pg1, trace=False)
230 # Check for 9 and 17 classifier hits, which is the right answer
234 self.cli("pcap trace rx tx off")
235 self.del_pcap_filters()
237 # check captured pcap
238 pcap = rdpcap("/tmp/vpp_test_trace_filter_test_pcap.pcap")
239 self.assertEqual(len(pcap), 9 + 17)
242 self.assertEqual(str(pcap[i]), p_)
244 for i in range(9, 9 + 17):
245 self.assertEqual(str(pcap[i]), p_)
247 def test_pcap_drop(self):
248 """Drop Packet Capture Filter Test"""
250 "pcap trace drop max 1000 "
251 "error {ip4-udp-lookup}.{no_listener} "
252 "file vpp_test_trace_filter_test_pcap_drop.pcap"
254 # the packet we are trying to match
257 # this packet should be forwarded
260 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
261 / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg1.remote_ip4)
262 / UDP(sport=2345, dport=1234)
266 # this packet should be captured (no listener)
269 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
270 / IP(src=self.pg0.remote_hosts[0].ip4, dst=self.pg0.local_ip4)
271 / UDP(sport=2345, dport=1234)
275 # this packet will be blackholed but not captured
278 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
279 / IP(src=self.pg0.remote_hosts[0].ip4, dst="0.0.0.0")
280 / UDP(sport=2345, dport=1234)
285 self.send_and_expect(self.pg0, p, self.pg1, n_rx=17, trace=False)
287 self.cli("pcap trace drop off")
289 # check captured pcap
290 pcap = rdpcap("/tmp/vpp_test_trace_filter_test_pcap_drop.pcap")
291 self.assertEqual(len(pcap), 17)
294 class TestTraceFilterInner(TemplateTraceFilter):
295 """Packet Tracer Filter Inner Test"""
297 extra_vpp_plugin_config = [
298 "plugin tracenode_plugin.so {enable}",
def add_trace_filter(self, mask, match, tn_feature_intfc_index=None):
    """Install a trace filter, optionally switching on tracenode first.

    :param mask: classify mask, forwarded to the parent implementation
    :param match: classify match, forwarded to the parent implementation
    :param tn_feature_intfc_index: when not None, the sw_if_index passed to
        ``tracenode_enable_disable`` before the filter is installed
    """
    if tn_feature_intfc_index is not None:
        # descriptive message instead of the leftover "fffff" debug marker
        self.logger.info(
            f"Enabling tracenode on sw_if_index {tn_feature_intfc_index}"
        )
        self.vapi.tracenode_enable_disable(sw_if_index=tn_feature_intfc_index)
    super().add_trace_filter(mask, match)
307 def del_trace_filters(self, tn_feature_intfc_index=None):
308 if tn_feature_intfc_index is not None:
309 self.vapi.tracenode_enable_disable(
310 sw_if_index=tn_feature_intfc_index, enable=False
312 super().del_trace_filters()
314 def __add_sa(self, id_, tun_src, tun_dst):
315 # AES-CTR-128 / SHA2-256
316 crypto_key_length = 16
318 integ_key_lenght = 16
319 crypto_key = secrets.token_bytes(crypto_key_length)
320 salt = secrets.randbits(salt_length * 8)
321 integ_key = secrets.token_bytes(integ_key_lenght)
323 flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
325 vpp_sa_in = VppIpsecSA(
329 integ_alg=VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128,
331 crypto_alg=VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CTR_128,
332 crypto_key=crypto_key,
333 proto=VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP,
341 vpp_sa_in.add_vpp_config()
343 scapy_sa_in = SecurityAssociation(
346 crypt_algo="AES-CTR",
347 crypt_key=crypto_key + salt.to_bytes(salt_length, "big"),
348 auth_algo="SHA2-256-128",
350 tunnel_header=IP(src=tun_src, dst=tun_dst),
351 nat_t_header=UDP(sport=4500, dport=4500),
356 vpp_sa_out = VppIpsecSA(
360 integ_alg=VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128,
362 crypto_alg=VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CTR_128,
363 crypto_key=crypto_key,
364 proto=VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP,
372 vpp_sa_out.add_vpp_config()
374 scapy_sa_out = SecurityAssociation(
377 crypt_algo="AES-CTR",
378 crypt_key=crypto_key + salt.to_bytes(salt_length, "big"),
379 auth_algo="SHA2-256-128",
381 tunnel_header=IP(src=tun_dst, dst=tun_src),
382 nat_t_header=UDP(sport=4500, dport=4500),
385 return vpp_sa_in, scapy_sa_in, vpp_sa_out, scapy_sa_out
387 def __gen_encrypt_pkt(self, scapy_sa, pkt):
389 src=self.pg0.local_mac, dst=self.pg0.remote_mac
390 ) / scapy_sa.encrypt(pkt)
392 def test_encrypted_encap(self):
393 """Packet Tracer Filter Test with encrypted encap"""
395 vpp_sa_in, scapy_sa_in, vpp_sa_out, _ = self.__add_sa(
396 1, self.pg0.local_ip4, self.pg0.remote_ip4
399 spd = VppIpsecSpd(self, 1)
402 spd_binding = VppIpsecSpdItfBinding(self, spd, self.pg0)
403 spd_binding.add_vpp_config()
405 spd_entry = VppIpsecSpdEntry(
414 policy=VppEnum.vl_api_ipsec_spd_action_t.IPSEC_API_SPD_ACTION_PROTECT,
418 # the inner packet we are trying to match
420 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
421 / TCP(sport=1234, dport=4321)
424 pkt = self.__gen_encrypt_pkt(scapy_sa_in, inner_pkt)
426 # self.add_trace_filter("l3 ip4 src", f"l3 ip4 src {self.pg0.local_ip4}")
428 self.add_trace_filter(
429 "l2 none l3 ip4 src proto l4 dst_port",
430 f"l2 none l3 ip4 src {self.pg1.local_ip4} proto 6 l4 dst_port 4321",
431 tn_feature_intfc_index=self.pg0.sw_if_index,
434 self.logger.info("Sending packet with matching inner")
435 self.send_and_expect(self.pg0, pkt * 67, self.pg1, trace=False)
439 self.logger.info("Sending packet with wrong inner port")
440 inner_pkt[TCP].dport = 1111
441 pkt = self.__gen_encrypt_pkt(scapy_sa_in, inner_pkt)
442 self.send_and_expect(self.pg0, pkt * 67, self.pg1, trace=False)
443 # the classify session should still have the 67 previous hits.
444 # In other words, the delta is 0
448 self.logger.info("Sending packet with wrong source address")
449 inner_pkt[IP].src = "1.2.3.4"
450 inner_pkt[TCP].dport = 4321
451 pkt = self.__gen_encrypt_pkt(scapy_sa_in, inner_pkt)
452 self.send_and_expect(self.pg0, pkt * 67, self.pg1, trace=False)
456 self.del_trace_filters(tn_feature_intfc_index=self.pg0.sw_if_index)
458 spd_entry.remove_vpp_config()
459 spd_binding.remove_vpp_config()
460 spd.remove_vpp_config()
461 vpp_sa_in.remove_vpp_config()
462 vpp_sa_out.remove_vpp_config()
# When this module is executed directly (rather than imported), run its
# tests through unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)