5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import (
18 from framework import VppTestCase
19 from asfframework import VppTestRunner
20 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
21 from vpp_papi import VppEnum
23 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
24 from ipaddress import ip_address
# Default parameter set used by the IPv4 IPsec tests.
# Visible values: the address family and broadcast address, the remote tunnel
# host addresses, SA ids and SPIs for the scapy-side and VPP-side "tun" and
# "tra" security associations, hop limits and flow labels, the anti-replay
# window size, and the default integrity (HMAC-SHA1-96) and cipher
# (AES-CBC-128) algorithms together with their keys.
# NOTE(review): in this view the `self.` assignments have no enclosing method
# header and the parenthesised assignments are never closed; lines appear to
# be missing from the extract -- confirm against the full file.
29 class IPsecIPv4Params:
30 addr_type = socket.AF_INET
32 addr_bcast = "255.255.255.255"
37 self.remote_tun_if_host = "1.1.1.1"
38 self.remote_tun_if_host6 = "1111::1"
40 self.scapy_tun_sa_id = 100
41 self.scapy_tun_spi = 1000
42 self.vpp_tun_sa_id = 200
43 self.vpp_tun_spi = 2000
45 self.scapy_tra_sa_id = 300
46 self.scapy_tra_spi = 3000
47 self.vpp_tra_sa_id = 400
48 self.vpp_tra_spi = 4000
50 self.outer_hop_limit = 64
51 self.inner_hop_limit = 255
52 self.outer_flow_label = 0
53 self.inner_flow_label = 0x12345
55 self.anti_replay_window_size = 64
57 self.auth_algo_vpp_id = (
58 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
60 self.auth_algo = "HMAC-SHA1-96" # scapy name
61 self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
63 self.crypt_algo_vpp_id = (
64 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
66 self.crypt_algo = "AES-CBC" # scapy name
67 self.crypt_key = b"JPjyOWBeVEQiMe7h"
70 self.nat_header = None
72 VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
75 self.async_mode = False
# Default parameter set used by the IPv6 IPsec tests.
# Mirrors the IPv4 parameter class field for field, with an IPv6 address
# family, its own SA ids (500-800) and SPIs; algorithms and keys are the same
# HMAC-SHA1-96 / AES-CBC-128 defaults.
# Unlike the IPv4 set, the scapy-side and VPP-side SPIs here differ by one
# (3001 vs 3000 for "tun", 4001 vs 4000 for "tra").
# NOTE(review): in this view the `self.` assignments have no enclosing method
# header and the parenthesised assignments are never closed; lines appear to
# be missing from the extract -- confirm against the full file.
78 class IPsecIPv6Params:
79 addr_type = socket.AF_INET6
81 addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
86 self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
87 self.remote_tun_if_host4 = "1.1.1.1"
89 self.scapy_tun_sa_id = 500
90 self.scapy_tun_spi = 3001
91 self.vpp_tun_sa_id = 600
92 self.vpp_tun_spi = 3000
94 self.scapy_tra_sa_id = 700
95 self.scapy_tra_spi = 4001
96 self.vpp_tra_sa_id = 800
97 self.vpp_tra_spi = 4000
99 self.outer_hop_limit = 64
100 self.inner_hop_limit = 255
101 self.outer_flow_label = 0
102 self.inner_flow_label = 0x12345
104 self.anti_replay_window_size = 64
106 self.auth_algo_vpp_id = (
107 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
109 self.auth_algo = "HMAC-SHA1-96" # scapy name
110 self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
112 self.crypt_algo_vpp_id = (
113 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
115 self.crypt_algo = "AES-CBC" # scapy name
116 self.crypt_key = b"JPjyOWBeVEQiMe7h"
119 self.nat_header = None
121 VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
124 self.async_mode = False
def mk_scapy_crypt_key(p):
    """Return the cipher key to hand to scapy for ``p.crypt_algo``.

    For the salted algorithms (AES-GCM, AES-CTR, AES-NULL-GMAC) the key is
    ``p.crypt_key`` followed by ``p.salt`` packed as a 4-byte big-endian
    unsigned integer.  For every other algorithm ``p.crypt_key`` is returned
    unchanged.

    :param p: parameter object providing ``crypt_algo`` and ``crypt_key``
        (and ``salt`` when a salted algorithm is selected).
    :returns: the key material as ``bytes``.
    """
    if p.crypt_algo in ("AES-GCM", "AES-CTR", "AES-NULL-GMAC"):
        return p.crypt_key + struct.pack("!I", p.salt)
    # Callers slice the result and pass it on as key material, so never fall
    # through to an implicit None for the non-salted algorithms.
    return p.crypt_key
# Build the two scapy SecurityAssociation objects used for tunnel-mode tests
# and store them on `p` as `p.scapy_tun_sa` and `p.vpp_tun_sa`.
# The tunnel endpoints are taken from `tun_if` for the address family in
# `p.addr_type` and recorded as `p.tun_dst` (remote) and `p.tun_src` (local).
# The outer header class (IP or IPv6) is chosen from the same address family.
# Note the two SAs use mirrored tunnel headers: `scapy_tun_sa` goes from
# tun_dst to tun_src, `vpp_tun_sa` from tun_src to tun_dst.
# NOTE(review): several constructor arguments and the closing parentheses are
# not visible in this extract -- confirm against the full file.
134 def config_tun_params(p, encryption_type, tun_if):
135 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
137 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
139 p.tun_dst = tun_if.remote_addr[p.addr_type]
140 p.tun_src = tun_if.local_addr[p.addr_type]
141 crypt_key = mk_scapy_crypt_key(p)
142 p.scapy_tun_sa = SecurityAssociation(
145 crypt_algo=p.crypt_algo,
147 auth_algo=p.auth_algo,
149 tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
150 nat_t_header=p.nat_header,
153 p.vpp_tun_sa = SecurityAssociation(
156 crypt_algo=p.crypt_algo,
158 auth_algo=p.auth_algo,
160 tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
161 nat_t_header=p.nat_header,
# Build the two scapy SecurityAssociation objects used for transport-mode
# tests and store them on `p` as `p.scapy_tra_sa` and `p.vpp_tra_sa`.
# Both take their cipher/integrity algorithm names and NAT-T header from `p`;
# no tunnel header is passed in the visible arguments.
# NOTE(review): several constructor arguments and the closing parentheses are
# not visible in this extract -- confirm against the full file.
166 def config_tra_params(p, encryption_type):
168 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
170 crypt_key = mk_scapy_crypt_key(p)
171 p.scapy_tra_sa = SecurityAssociation(
174 crypt_algo=p.crypt_algo,
176 auth_algo=p.auth_algo,
178 nat_t_header=p.nat_header,
181 p.vpp_tra_sa = SecurityAssociation(
184 crypt_algo=p.crypt_algo,
186 auth_algo=p.auth_algo,
188 nat_t_header=p.nat_header,
# Base test case for the IPsec tests: sets up parameter objects and packet
# generator interfaces and offers helpers that build plain and encrypted
# ICMP / ICMPv6 test packets.
# NOTE(review): this extract is missing lines (docstring delimiters, the
# setUpClass/setUp/tearDown headers, loop bodies and list brackets); the
# comments below describe only what is visible.
193 class TemplateIpsec(VppTestCase):
198 |tra_if| <-------> |VPP|
203 ------ encrypt --- plain ---
204 |tun_if| <------- |VPP| <------ |pg1|
207 ------ decrypt --- plain ---
208 |tun_if| -------> |VPP| ------> |pg1|
# Hook with no behaviour of its own; it is invoked right after
# config_interfaces() further down.
215 def ipsec_select_backend(self):
216 """empty method to be overloaded when necessary"""
221 super(TemplateIpsec, cls).setUpClass()
224 def tearDownClass(cls):
225 super(TemplateIpsec, cls).tearDownClass()
# Create default IPv4/IPv6 parameter objects unless the instance already
# carries them; the two entries that follow key each object by its addr_type.
227 def setup_params(self):
228 if not hasattr(self, "ipv4_params"):
229 self.ipv4_params = IPsecIPv4Params()
230 if not hasattr(self, "ipv6_params"):
231 self.ipv6_params = IPsecIPv6Params()
233 self.ipv4_params.addr_type: self.ipv4_params,
234 self.ipv6_params.addr_type: self.ipv6_params,
# Create three pg interfaces and keep a copy of the list in self.interfaces.
# The per-interface loop body is not visible in this extract.
237 def config_interfaces(self):
238 self.create_pg_interfaces(range(3))
239 self.interfaces = list(self.pg_interfaces)
240 for i in self.interfaces:
248 super(TemplateIpsec, self).setUp()
252 self.vpp_esp_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP
253 self.vpp_ah_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_AH
255 self.config_interfaces()
257 self.ipsec_select_backend()
# Iterates the interfaces recorded by config_interfaces(); the loop body is
# not visible in this extract.
259 def unconfig_interfaces(self):
260 for i in self.interfaces:
266 super(TemplateIpsec, self).tearDown()
268 self.unconfig_interfaces()
# Log the output of the "show hardware" CLI command.
270 def show_commands_at_teardown(self):
271 self.logger.info(self.vapi.cli("show hardware"))
# Per-packet expression: an Ethernet frame from sw_intf's remote MAC to its
# local MAC carrying sa.encrypt() of IPv4 / ICMP / payload_size bytes of "X";
# the expression is evaluated `count` times.
273 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
275 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
276 / sa.encrypt(IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size))
277 for i in range(count)
# IPv6 counterpart: the inner packet is an ICMPv6 echo request whose hop
# limit and flow label come from p.inner_hop_limit / p.inner_flow_label.
280 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
282 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
284 IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
285 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
287 for i in range(count)
# Unencrypted IPv4 packets with a payload of payload_size bytes of "X".
290 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
292 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
293 / IP(src=src, dst=dst)
295 / Raw(b"X" * payload_size)
296 for i in range(count)
# Unencrypted IPv6 ICMPv6 echo requests using p's inner hop limit and flow
# label.
299 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
301 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
302 / IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
303 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
304 for i in range(count)
# Helper class providing a TCP checksum check over the IPv4 tunnel.
# verify_tcp_checksum() starts VPP's "http cli server", sends one encrypted
# TCP SYN to port 80 addressed to the tunnel interface's local IPv4 address,
# decrypts the reply with p.vpp_tun_sa and validates its checksums.
# NOTE(review): the opening of the `send = ...` expression and some closing
# parentheses are not visible in this extract -- confirm against the full file.
308 class IpsecTcp(object):
309 def verify_tcp_checksum(self):
310 # start http cli server listener on http://0.0.0.0:80
311 self.vapi.cli("http cli server")
312 p = self.params[socket.AF_INET]
314 src=self.tun_if.remote_mac, dst=self.tun_if.local_mac
315 ) / p.scapy_tun_sa.encrypt(
316 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
317 / TCP(flags="S", dport=80)
319 self.logger.debug(ppp("Sending packet:", send))
320 recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
322 decrypted = p.vpp_tun_sa.decrypt(recv[IP])
323 self.assert_packet_checksums_valid(decrypted)
class IpsecTcpTests(IpsecTcp):
    """Expose the inherited TCP checksum verification as a test method."""

    def test_tcp_checksum(self):
        """verify checksum correctness for vpp generated packets"""
        self.verify_tcp_checksum()
332 class IpsecTra4(object):
333 """verify methods for Transport v4"""
# Accumulate the "replay" error counter for the IPv4 transport decrypt path:
# first from decrypt node index 0, then from the node indexed by p.async_mode.
# NOTE(review): any guard around the second read and the return statement are
# not visible in this extract -- confirm against the full file.
335 def get_replay_counts(self, p):
336 replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
337 count = self.statistics.get_err_counter(replay_node_name)
340 replay_post_node_name = (
341 "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
343 count += self.statistics.get_err_counter(replay_post_node_name)
# Read the error counter that represents an authentication failure on the
# decrypt node selected by p.async_mode.
# For ESP with AES-GCM / AES-NULL-GMAC the counter name is
# "decryption_failed"; the second assignment uses "integ_error".
# The "crypto-dispatch/bad-hmac" counter is added on top.
# NOTE(review): the branch keyword between the two assignments, the condition
# guarding the bad-hmac addition and the return statement are not visible in
# this extract -- confirm against the full file.
347 def get_hash_failed_counts(self, p):
348 if ESP == self.encryption_type and p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
349 hash_failed_node_name = (
350 "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
353 hash_failed_node_name = (
354 "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
356 count = self.statistics.get_err_counter(hash_failed_node_name)
359 count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
# Exercise inbound sequence-number handling on the IPv4 transport SA.
# Batches of ICMP packets encrypted with p.scapy_tra_sa are sent through
# self.tra_if and the number of replies is checked; several expectations
# depend on whether anti-replay is enabled in p.flags (ar_on).
# The later part moves scapy's TX sequence number past 2^32 to exercise
# extended sequence number wraps.
# NOTE(review): many lines (list openers, seq_num arguments, if/else guards)
# are not visible in this extract, so the comments here describe only what
# can be read from the visible lines.
363 def verify_hi_seq_num(self):
364 p = self.params[socket.AF_INET]
365 saf = VppEnum.vl_api_ipsec_sad_flags_t
366 esn_on = p.vpp_tra_sa.esn_en
367 ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
369 seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
370 replay_count = self.get_replay_counts(p)
371 hash_failed_count = self.get_hash_failed_counts(p)
372 seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
374 # a few packets so we get the rx seq number above the window size and
375 # thus can simulate a wrap with an out of window packet
378 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
379 / p.scapy_tra_sa.encrypt(
380 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
384 for seq in range(63, 80)
386 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
388 # these 4 packets will all choose seq-num 0 to decrypt since none
389 # are out of window when first checked. however, once #200 has
390 # decrypted it will move the window to 200 and so #81 is out of
391 # window. this packet should be dropped.
394 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
395 / p.scapy_tra_sa.encrypt(
396 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
401 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
402 / p.scapy_tra_sa.encrypt(
403 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
408 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
409 / p.scapy_tra_sa.encrypt(
410 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
415 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
416 / p.scapy_tra_sa.encrypt(
417 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
423 # if anti-replay is off then we won't drop #81
424 n_rx = 3 if ar_on else 4
425 self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
426 # this packet is one before the wrap
429 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
430 / p.scapy_tra_sa.encrypt(
431 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
436 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
438 # a replayed packet, then an out of window, then a legit
439 # tests that an early failure on the batch doesn't affect subsequent packets.
442 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
443 / p.scapy_tra_sa.encrypt(
444 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
449 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
450 / p.scapy_tra_sa.encrypt(
451 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
456 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
457 / p.scapy_tra_sa.encrypt(
458 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
463 n_rx = 1 if ar_on else 3
464 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
466 # move the window over half way to a wrap
469 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
470 / p.scapy_tra_sa.encrypt(
471 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
476 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
478 # anti-replay will drop old packets, no anti-replay will not
481 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
482 / p.scapy_tra_sa.encrypt(
483 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
490 self.send_and_assert_no_replies(self.tra_if, pkts)
492 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
496 # validate wrapping the ESN
499 # wrap scapy's TX SA SN
500 p.scapy_tra_sa.seq_num = 0x100000005
502 # send a packet that wraps the window for both AR and no AR
505 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
506 / p.scapy_tra_sa.encrypt(
507 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
514 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
516 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
518 # move the window forward to half way to the next wrap
521 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
522 / p.scapy_tra_sa.encrypt(
523 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
530 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
532 # a packet less than 2^30 from the current position is:
533 # - AR: out of window and dropped
537 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
538 / p.scapy_tra_sa.encrypt(
539 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
547 self.send_and_assert_no_replies(self.tra_if, pkts)
549 self.send_and_expect(self.tra_if, pkts, self.tra_if)
551 # a packet more than 2^30 from the current position is:
552 # - AR: out of window and dropped
553 # - non-AR: considered a wrap, but since it's not a wrap
554 # it won't decrypt and so will be dropped
557 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
558 / p.scapy_tra_sa.encrypt(
559 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
566 self.send_and_assert_no_replies(self.tra_if, pkts)
568 # a packet less than 2^30 from the current position and is a
569 # wrap; (the seq is currently at 0x180000005).
570 # - AR: out of window so considered a wrap, so accepted
571 # - non-AR: not considered a wrap, so won't decrypt
572 p.scapy_tra_sa.seq_num = 0x260000005
575 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
576 / p.scapy_tra_sa.encrypt(
577 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
584 self.send_and_expect(self.tra_if, pkts, self.tra_if)
586 self.send_and_assert_no_replies(self.tra_if, pkts)
589 # window positions are different now for AR/non-AR
590 # move non-AR forward
593 # a packet more than 2^30 from the current position and is a
594 # wrap; (the seq is currently at 0x180000005).
596 # - non-AR: not considered a wrap, so won't decrypt
600 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
601 / p.scapy_tra_sa.encrypt(
602 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
608 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
609 / p.scapy_tra_sa.encrypt(
610 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
616 self.send_and_expect(self.tra_if, pkts, self.tra_if)
620 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
621 / p.scapy_tra_sa.encrypt(
622 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
628 self.send_and_expect(self.tra_if, pkts, self.tra_if)
# Anti-replay checks on the IPv4 transport SA.
# The method snapshots the node-level error counters (replay, hash failure,
# seq_cycled, and for ESP also runt) together with the offsets between them
# and the per-SA counters from p.tra_sa_in, then sends replayed,
# undecryptable, runt and out-of-window packets and asserts that no replies
# come back and that both sets of counters advance by the expected amounts.
# It finishes by moving sequence numbers to 351 (0x15f) on both scapy SAs
# and, through the CLI, on the SA with id p.scapy_tra_sa_id.
# NOTE(review): many lines (packet openers, seq_num arguments, if/else guards
# on esn_en and async mode) are not visible in this extract.
630 def verify_tra_anti_replay(self):
631 p = self.params[socket.AF_INET]
632 esn_en = p.vpp_tra_sa.esn_en
633 anti_replay_window_size = p.anti_replay_window_size
635 seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
636 replay_count = self.get_replay_counts(p)
637 initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay")
638 hash_failed_count = self.get_hash_failed_counts(p)
639 seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
640 initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err(
643 hash_err = "integ_error"
645 if ESP == self.encryption_type:
646 undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
647 undersize_count = self.statistics.get_err_counter(undersize_node_name)
648 initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err(
651 # For AES-GCM an error in the hash is reported as a decryption failure
652 if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
653 hash_err = "decryption_failed"
654 # In async mode, we don't report errors in the hash.
658 initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err(
663 # send packets with seq numbers 1->34
664 # this means the window size is still in Case B (see RFC4303
667 # for reasons i haven't investigated Scapy won't create a packet with
672 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
673 / p.scapy_tra_sa.encrypt(
674 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
678 for seq in range(1, 34)
680 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
682 # replayed packets are dropped
683 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
684 replay_count += len(pkts)
685 self.assertEqual(self.get_replay_counts(p), replay_count)
686 err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
687 self.assertEqual(err, replay_count)
690 # now send a batch of packets all with the same sequence number
691 # the first packet in the batch is legitimate, the rest bogus
693 self.vapi.cli("clear error")
694 self.vapi.cli("clear node counters")
696 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
697 ) / p.scapy_tra_sa.encrypt(
698 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
701 recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
703 self.assertEqual(self.get_replay_counts(p), replay_count)
704 err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
705 self.assertEqual(err, replay_count)
708 # now move the window over to anti_replay_window_size + 100 and into Case A
710 self.vapi.cli("clear error")
712 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
713 ) / p.scapy_tra_sa.encrypt(
714 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
715 seq_num=anti_replay_window_size + 100,
717 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
719 self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
721 # replayed packets are dropped
722 self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
724 self.assertEqual(self.get_replay_counts(p), replay_count)
725 err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
726 self.assertEqual(err, replay_count)
728 # the window size is anti_replay_window_size packets
729 # in window are still accepted
731 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
732 ) / p.scapy_tra_sa.encrypt(
733 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
737 # a packet that does not decrypt does not move the window forward
738 bogus_sa = SecurityAssociation(
739 self.encryption_type,
741 crypt_algo=p.crypt_algo,
742 crypt_key=mk_scapy_crypt_key(p)[::-1],
743 auth_algo=p.auth_algo,
744 auth_key=p.auth_key[::-1],
747 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
748 ) / bogus_sa.encrypt(
749 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
750 seq_num=anti_replay_window_size + 200,
752 self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
754 hash_failed_count += 17
755 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
757 err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
758 self.assertEqual(err, hash_failed_count)
760 # a malformed 'runt' packet
761 # created by a mis-constructed SA
762 if ESP == self.encryption_type and p.crypt_algo != "NULL":
763 bogus_sa = SecurityAssociation(self.encryption_type, p.scapy_tra_spi)
765 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
766 ) / bogus_sa.encrypt(
767 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
768 seq_num=anti_replay_window_size + 200,
770 self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
772 undersize_count += 17
773 self.assert_error_counter_equal(undersize_node_name, undersize_count)
774 err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff
775 self.assertEqual(err, undersize_count)
777 # which we can determine since this packet is still in the window
779 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
780 ) / p.scapy_tra_sa.encrypt(
781 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
784 self.send_and_expect(self.tra_if, [pkt], self.tra_if)
787 # out of window are dropped
788 # this is Case B. So VPP will consider this to be a high seq num wrap
789 # and so the decrypt attempt will fail
792 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
793 ) / p.scapy_tra_sa.encrypt(
794 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
797 self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
800 # an out of window error with ESN looks like a high sequence
801 # wrap. but since it isn't then the verify will fail.
802 hash_failed_count += 17
803 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
805 err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
806 self.assertEqual(err, hash_failed_count)
810 self.assertEqual(self.get_replay_counts(p), replay_count)
811 err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
812 self.assertEqual(err, replay_count)
814 # valid packet moves the window over to anti_replay_window_size + 258
816 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
817 ) / p.scapy_tra_sa.encrypt(
818 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
819 seq_num=anti_replay_window_size + 258,
821 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
822 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
825 # move VPP's SA TX seq-num to just before the seq-number wrap.
826 # then fire in a packet that VPP should drop on TX because it
827 # causes the TX seq number to wrap; unless we're using extended sequence
830 self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.vpp_tra_sa_id)
831 self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
832 self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
836 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
837 / p.scapy_tra_sa.encrypt(
838 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
842 for seq in range(259, 280)
846 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
849 # in order for scapy to decrypt its SA's high order number needs
852 p.vpp_tra_sa.seq_num = 0x100000000
854 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
857 # wrap scapy's TX high sequence number. VPP is in case B, so it
858 # will consider this a high seq wrap also.
859 # The low seq num we set it to will place VPP's RX window in Case A
861 p.scapy_tra_sa.seq_num = 0x100000005
863 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
864 ) / p.scapy_tra_sa.encrypt(
865 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
868 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
870 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
873 # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within
876 p.scapy_tra_sa.seq_num = 0xFFFFFFFD
878 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
879 ) / p.scapy_tra_sa.encrypt(
880 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
883 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
884 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
887 # While in case A we cannot wrap the high sequence number again
888 # because VPP will consider this packet to be one that moves the
892 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
893 ) / p.scapy_tra_sa.encrypt(
894 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
# NOTE(review): unlike every other send_and_assert_no_replies call in this
# method, this one passes a third positional argument (self.tra_if) after the
# packet list -- check it against the helper's signature.
897 self.send_and_assert_no_replies(
898 self.tra_if, [pkt], self.tra_if, timeout=0.2
901 hash_failed_count += 1
902 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
904 err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
905 self.assertEqual(err, hash_failed_count)
908 # but if we move the window forward to case B, then we can wrap
911 p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555
913 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
914 ) / p.scapy_tra_sa.encrypt(
915 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
916 seq_num=p.scapy_tra_sa.seq_num,
918 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
919 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
921 p.scapy_tra_sa.seq_num = 0x200000444
923 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
924 ) / p.scapy_tra_sa.encrypt(
925 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
928 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
929 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
933 # without ESN TX sequence numbers can't wrap and packets are
934 # dropped from here on out.
936 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
937 seq_cycle_count += len(pkts)
938 self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
939 err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff
940 self.assertEqual(err, seq_cycle_count)
942 # move the security-associations seq number on to the last we used
943 self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
944 p.scapy_tra_sa.seq_num = 351
945 p.vpp_tra_sa.seq_num = 351
# Check the inbound transport SA's "lost" error counter while packets are
# sent with gaps in the sequence space.
# The counter is asserted to be 0 after the first two batches, then 1, then
# 51, and finally 151 after the batch that starts at 400.
# NOTE(review): list openers and the seq_num arguments of the encrypt calls
# are not visible in this extract.
947 def verify_tra_lost(self):
948 p = self.params[socket.AF_INET]
949 esn_en = p.vpp_tra_sa.esn_en
# NOTE(review): the loop below iterates range(1, 3), not 1->34; the comment
# looks copied from verify_tra_anti_replay -- confirm and update.
952 # send packets with seq numbers 1->34
953 # this means the window size is still in Case B (see RFC4303
956 # for reasons i haven't investigated Scapy won't create a packet with
961 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
962 / p.scapy_tra_sa.encrypt(
963 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
967 for seq in range(1, 3)
969 self.send_and_expect(self.tra_if, pkts, self.tra_if)
971 self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
973 # skip a sequence number
976 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
977 / p.scapy_tra_sa.encrypt(
978 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
982 for seq in range(4, 6)
984 self.send_and_expect(self.tra_if, pkts, self.tra_if)
986 self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
988 # the lost packets are counted until we get up past the first
989 # sizeof(replay_window) packets
992 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
993 / p.scapy_tra_sa.encrypt(
994 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
998 for seq in range(6, 100)
1000 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1002 self.assertEqual(p.tra_sa_in.get_err("lost"), 1)
1004 # lots of holes in the sequence
1007 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1008 / p.scapy_tra_sa.encrypt(
1009 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1013 for seq in range(100, 200, 2)
1015 self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
1019 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1020 / p.scapy_tra_sa.encrypt(
1021 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1025 for seq in range(200, 300)
1027 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1029 self.assertEqual(p.tra_sa_in.get_err("lost"), 51)
1031 # a big hole in the seq number space
1034 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1035 / p.scapy_tra_sa.encrypt(
1036 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1040 for seq in range(400, 500)
1042 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1044 self.assertEqual(p.tra_sa_in.get_err("lost"), 151)
# Basic IPv4 transport round trip.
# After clearing errors and SA state via the CLI, encrypted packets built by
# gen_encrypt_pkts() are sent through self.tra_if.  Each reply is checked for
# a consistent IP length and valid checksums, then decrypted with
# p.vpp_tra_sa and checked again.  Finally the inbound/outbound SA packet
# counters, the "lost" counters (expected 0) and the encrypt/decrypt node
# packet counters are compared against `count`.
# NOTE(review): the try/except/finally structure, several call arguments and
# the assert openers are not visible in this extract.
1046 def verify_tra_basic4(self, count=1, payload_size=54):
1047 """ipsec v4 transport basic test"""
1048 self.vapi.cli("clear errors")
1049 self.vapi.cli("clear ipsec sa")
1051 p = self.params[socket.AF_INET]
1052 send_pkts = self.gen_encrypt_pkts(
1056 src=self.tra_if.remote_ip4,
1057 dst=self.tra_if.local_ip4,
1059 payload_size=payload_size,
1061 recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1062 for rx in recv_pkts:
1063 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
1064 self.assert_packet_checksums_valid(rx)
1066 decrypted = p.vpp_tra_sa.decrypt(rx[IP])
1067 self.assert_packet_checksums_valid(decrypted)
1069 self.logger.debug(ppp("Unexpected packet:", rx))
1072 self.logger.info(self.vapi.ppcli("show error"))
1073 self.logger.info(self.vapi.ppcli("show ipsec all"))
1075 pkts = p.tra_sa_in.get_stats()["packets"]
1077 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1079 pkts = p.tra_sa_out.get_stats()["packets"]
1081 pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1083 self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
1084 self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
1086 self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
1087 self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
1089 def _verify_tra_anti_replay_algorithm_esn(self):
1090 def seq_num(seqh, seql):
1091 return (seqh << 32) | (seql & 0xFFFF_FFFF)
1093 p = self.params[socket.AF_INET]
1094 anti_replay_window_size = p.anti_replay_window_size
1096 seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
1097 replay_count = self.get_replay_counts(p)
1098 hash_failed_count = self.get_hash_failed_counts(p)
1099 seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
1101 if ESP == self.encryption_type:
1102 undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
1103 undersize_count = self.statistics.get_err_counter(undersize_node_name)
1105 # reset the TX SA to avoid conflict with left configuration
1106 self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
1109 RFC 4303 Appendix A2. Case A
1112 a-i: possible seq num received
1114 [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1)
1117 --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|--
1118 ========= ========= =========
1119 Bl- Tl- Bl Tl Bl+ Tl+
1121 Case A implies Tl >= W - 1
1125 Tl = anti_replay_window_size + 40
1126 Bl = Tl - anti_replay_window_size + 1
1128 # move VPP's RX AR window to Case A
1129 self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1130 p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1134 - pre-crypto check: algorithm predicts that the packet wrap the window
1136 - integrity check: should fail
1137 - post-crypto check: ...
1141 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1142 / p.scapy_tra_sa.encrypt(
1143 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1147 for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5))
1150 # out-of-window packets fail integrity check
1151 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1152 hash_failed_count += len(pkts)
1153 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1156 case b: Bl <= Seql <= Tl
1157 - pre-crypto check: algorithm predicts that the packet is in the window
1159 -> check for a replayed packet with Seql
1160 - integrity check: should fail
1161 - post-crypto check: ...
1165 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1166 / p.scapy_tra_sa.encrypt(
1167 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1171 for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5))
1173 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1175 p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
1178 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1179 / p.scapy_tra_sa.encrypt(
1180 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1184 for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5))
1187 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1189 # some packets are rejected by the pre-crypto check
1191 self.assertEqual(self.get_replay_counts(p), replay_count)
1193 # out-of-window packets fail integrity check
1194 hash_failed_count += len(pkts) - 5
1195 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1199 - pre-crypto check: algorithm predicts that the packet does not wrap the window
1201 - integrity check: should fail
1202 - post-crypto check: ...
1206 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1207 / p.scapy_tra_sa.encrypt(
1208 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1212 for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20))
1215 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1217 # out-of-window packets fail integrity check
1218 hash_failed_count += len(pkts)
1219 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1223 - pre-crypto check: algorithm predicts that the packet wrap the window
1225 - integrity check: should fail
1226 - post-crypto check: ...
1228 p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1231 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1232 / p.scapy_tra_sa.encrypt(
1233 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1237 for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5))
1240 # out-of-window packets fail integrity check
1241 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1242 hash_failed_count += len(pkts)
1243 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1246 case e: Bl <= Seql <= Tl
1247 - pre-crypto check: algorithm predicts that the packet is in the window
1249 -> check for a replayed packet with Seql
1250 - integrity check: should pass
1251 - post-crypto check: should pass
1252 -> Seql is marked in the AR window
1256 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1257 / p.scapy_tra_sa.encrypt(
1258 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1262 for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30))
1265 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1269 - pre-crypto check: algorithm predicts that the packet does not wrap the window
1271 - integrity check: should pass
1272 - post-crypto check: should pass
1273 -> AR window shift (the window stays Case A)
1274 -> Seql is marked in the AR window
1278 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1279 / p.scapy_tra_sa.encrypt(
1280 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1284 for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
1287 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1291 - pre-crypto check: algorithm predicts that the packet wrap the window
1293 - integrity check: should pass
1294 - post-crypto check: should pass
1295 -> AR window shift (may set the window in Case B)
1296 -> Seql is marked in the AR window
1298 p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1301 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1302 / p.scapy_tra_sa.encrypt(
1303 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1307 # set the window in Case B (the minimum window size is 64
1308 # so we are sure to overlap)
1309 for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
1312 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1314 # reset the VPP's RX AR window to Case A
1316 Tl = 2 * anti_replay_window_size + 40
1317 Bl = Tl - anti_replay_window_size + 1
1319 self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1321 p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1324 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1325 / p.scapy_tra_sa.encrypt(
1326 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1330 # the AR will stay in Case A
1332 seq_num(Th + 1, anti_replay_window_size + 10),
1333 seq_num(Th + 1, anti_replay_window_size + 20),
1337 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1340 case h: Bl <= Seql <= Tl
1341 - pre-crypto check: algorithm predicts that the packet is in the window
1343 -> check for a replayed packet with Seql
1344 - integrity check: the wrap is not detected, should fail
1345 - post-crypto check: ...
1348 Tl = anti_replay_window_size + 20
1349 Bl = Tl - anti_replay_window_size + 1
1351 p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1355 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1356 / p.scapy_tra_sa.encrypt(
1357 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1361 for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
1364 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1366 # some packets are rejected by the pre-crypto check
1368 self.assertEqual(self.get_replay_counts(p), replay_count)
1370 # out-of-window packets fail integrity check
1371 hash_failed_count += len(pkts) - 5
1372 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1376 - pre-crypto check: algorithm predicts that the packet does not wrap the window
            - integrity check: the wrap is not detected, should fail
1379 - post-crypto check: ...
1383 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1384 / p.scapy_tra_sa.encrypt(
1385 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1389 for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
1392 # out-of-window packets fail integrity check
1393 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1394 hash_failed_count += len(pkts)
1395 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1398 RFC 4303 Appendix A2. Case B
1401 ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
1402 ========= =========== ===========
1405 Case B implies Tl < W - 1
1408 # reset the VPP's RX AR window to Case B
1410 Tl = 30 # minimum window size of 64, we are sure to overlap
1411 Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
1413 self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1414 p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1417 case a: Seql <= Tl < Bl
1418 - pre-crypto check: algorithm predicts that the packet is in the window
1420 -> check for replayed packet
1421 - integrity check: should fail
1422 - post-crypto check: ...
1426 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1427 / p.scapy_tra_sa.encrypt(
1428 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1432 for seq in range(seq_num(Th, 5), seq_num(Th, 10))
1435 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1437 p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
1440 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1441 / p.scapy_tra_sa.encrypt(
1442 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1446 for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
1449 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1451 # some packets are rejected by the pre-crypto check
1453 self.assertEqual(self.get_replay_counts(p), replay_count)
1455 # out-of-window packets fail integrity check
1456 hash_failed_count += len(pkts) - 5
1457 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1460 case b: Tl < Seql < Bl
1461 - pre-crypto check: algorithm predicts that the packet will shift the window
1463 - integrity check: should fail
1464 - post-crypto check: ...
1468 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1469 / p.scapy_tra_sa.encrypt(
1470 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1474 for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
1477 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1479 # out-of-window packets fail integrity check
1480 hash_failed_count += len(pkts)
1481 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1484 case c: Tl < Bl <= Seql
1485 - pre-crypto check: algorithm predicts that the packet is in the window
1487 -> check for a replayed packet with Seql
1488 - integrity check: should pass
1489 - post-crypto check: should pass
1490 -> Seql is marked in the AR window
1494 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1495 / p.scapy_tra_sa.encrypt(
1496 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1500 for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
1503 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1506 case d: Seql <= Tl < Bl
1507 - pre-crypto check: algorithm predicts that the packet is the window
1509 -> check for replayed packet
1510 - integrity check: should pass
1511 - post-crypto check: should pass
1512 -> Seql is marked in the AR window
1514 p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1517 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1518 / p.scapy_tra_sa.encrypt(
1519 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1523 for seq in range(seq_num(Th, 15), seq_num(Th, 25))
1526 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1529 case e: Tl < Seql < Bl
1530 - pre-crypto check: algorithm predicts that the packet is in the window
1532 -> check for a replayed packet with Seql
1533 - integrity check: should pass
1534 - post-crypto check: should pass
1535 -> AR window shift (may set the window in Case A)
1536 -> Seql is marked in the AR window
1540 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1541 / p.scapy_tra_sa.encrypt(
1542 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1546 for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
1549 # the window stays in Case B
1550 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1554 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1555 / p.scapy_tra_sa.encrypt(
1556 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1561 seq_num(Th, Tl + anti_replay_window_size + 5),
1562 seq_num(Th, Tl + anti_replay_window_size + 15),
1566 # the window moves to Case A
1567 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1569 # reset the VPP's RX AR window to Case B
1571 Tl = 30 # minimum window size of 64, we are sure to overlap
1572 Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
1574 self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1575 p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1578 case f: Tl < Bl <= Seql
1579 - pre-crypto check: algorithm predicts that the packet is in the previous window
1581 -> check for a replayed packet with Seql
1582 - integrity check: should fail
1583 - post-crypto check: ...
1587 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1588 / p.scapy_tra_sa.encrypt(
1589 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1593 for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
1596 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1598 # out-of-window packets fail integrity check
1599 hash_failed_count += len(pkts)
1600 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1603 case g: Seql <= Tl < Bl
1604 - pre-crypto check: algorithm predicts that the packet is the window
1606 -> check for replayed packet
1607 - integrity check: should fail
1608 - post-crypto check: ...
1612 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1613 / p.scapy_tra_sa.encrypt(
1614 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1618 for seq in range(seq_num(Th, 10), seq_num(Th, 15))
1621 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1623 p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1626 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1627 / p.scapy_tra_sa.encrypt(
1628 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1632 for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15))
1635 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1637 # some packets are rejected by the pre-crypto check
1639 self.assertEqual(self.get_replay_counts(p), replay_count)
1641 # out-of-window packets fail integrity check
1642 hash_failed_count += len(pkts) - 5
1643 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1646 case h: Tl < Seql < Bl
1647 - pre-crypto check: algorithm predicts that the packet will shift the window
1649 - integrity check: should fail
1650 - post-crypto check: ...
1654 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1655 / p.scapy_tra_sa.encrypt(
1656 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1660 for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20))
1663 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1665 # out-of-window packets fail integrity check
1666 hash_failed_count += len(pkts)
1667 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1669 def _verify_tra_anti_replay_algorithm_no_esn(self):
1671 return seql & 0xFFFF_FFFF
1673 p = self.params[socket.AF_INET]
1674 anti_replay_window_size = p.anti_replay_window_size
1676 seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
1677 replay_count = self.get_replay_counts(p)
1678 hash_failed_count = self.get_hash_failed_counts(p)
1679 seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
1681 if ESP == self.encryption_type:
1682 undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
1683 undersize_count = self.statistics.get_err_counter(undersize_node_name)
1685 # reset the TX SA to avoid conflict with left configuration
1686 self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
1689 RFC 4303 Appendix A2. Case A
1691 a-c: possible seq num received
1694 |--a--+---b---+-c--|
1698 No ESN implies Th = 0
1699 Case A implies Tl >= W - 1
1702 Tl = anti_replay_window_size + 40
1703 Bl = Tl - anti_replay_window_size + 1
1705 # move VPP's RX AR window to Case A
1706 self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
1707 p.scapy_tra_sa.seq_num = seq_num(Tl)
1711 - pre-crypto check: algorithm predicts that the packet is out of window
1712 -> packet should be dropped
1713 - integrity check: ...
1714 - post-crypto check: ...
1718 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1719 / p.scapy_tra_sa.encrypt(
1720 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1724 for seq in range(seq_num(Bl - 20), seq_num(Bl - 5))
1727 # out-of-window packets
1728 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1729 replay_count += len(pkts)
1730 self.assertEqual(self.get_replay_counts(p), replay_count)
1733 case b: Bl <= Seql <= Tl
1734 - pre-crypto check: algorithm predicts that the packet is in the window
1735 -> check for a replayed packet with Seql
1736 - integrity check: should pass
1737 - post-crypto check:
1738 -> check for a replayed packet with Seql
1742 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1743 / p.scapy_tra_sa.encrypt(
1744 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1748 for seq in range(seq_num(Tl - 50), seq_num(Tl - 30))
1750 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1754 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1755 / p.scapy_tra_sa.encrypt(
1756 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1760 for seq in range(seq_num(Tl - 35), seq_num(Tl - 30))
1763 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1767 self.assertEqual(self.get_replay_counts(p), replay_count)
1771 - pre-crypto check: algorithm predicts that the packet will shift the window
1772 - integrity check: should pass
1773 - post-crypto check: should pass
1774 -> AR window is shifted
1778 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1779 / p.scapy_tra_sa.encrypt(
1780 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1784 for seq in range(seq_num(Tl + 5), seq_num(Tl + 20))
1787 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1790 RFC 4303 Appendix A2. Case B
1792 |-a-----+------b-----|
1796 Case B implies Tl < W - 1
1799 # reset the VPP's RX AR window to Case B
1800 Tl = 30 # minimum window size of 64, we are sure to overlap
1801 Bl = seq_num(Tl - anti_replay_window_size + 1)
1803 self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
1806 case a: Seql <= Tl < Bl
1807 - pre-crypto check: algorithm predicts that the packet is in the window
1808 -> check for replayed packet
1809 - integrity check: should fail
1810 - post-crypto check: ...
1814 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1815 / p.scapy_tra_sa.encrypt(
1816 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1820 for seq in range(seq_num(5), seq_num(10))
1823 self.send_and_expect(self.tra_if, pkts, self.tra_if)
1826 case b: Tl < Seql < Bl
1827 - pre-crypto check: algorithm predicts that the packet will shift the window
1828 - integrity check: should pass
1829 - post-crypto check: should pass
1830 -> AR window is shifted
1834 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1835 / p.scapy_tra_sa.encrypt(
1836 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1840 for seq in range(seq_num(-50), seq_num(-20))
1843 self.send_and_expect(self.tra_if, pkts, self.tra_if)
def verify_tra_anti_replay_algorithm(self):
    """Run the anti-replay algorithm checks that match the IPv4 transport SA.

    Dispatches to the ESN variant when the VPP transport SA has ``esn_en``
    set, and to the non-ESN variant otherwise.
    """
    esn_enabled = self.params[socket.AF_INET].vpp_tra_sa.esn_en
    if not esn_enabled:
        self._verify_tra_anti_replay_algorithm_no_esn()
        return
    self._verify_tra_anti_replay_algorithm_esn()
class IpsecTra4Tests(IpsecTra4):
    """UT test methods for Transport v4"""

    # Each test is a thin entry point delegating to a verify_* helper
    # inherited from IpsecTra4.

    def test_tra_anti_replay(self):
        """ipsec v4 transport anti-replay test"""
        self.verify_tra_anti_replay()

    def test_tra_anti_replay_algorithm(self):
        """ipsec v4 transport anti-replay algorithm test"""
        self.verify_tra_anti_replay_algorithm()

    def test_tra_lost(self):
        """ipsec v4 transport lost packet test"""
        self.verify_tra_lost()

    def test_tra_basic(self, count=1):
        """ipsec v4 transport basic test"""
        # Forward the caller-supplied count; it used to be ignored and a
        # literal 1 was always passed.  The default keeps the behaviour of a
        # plain test-runner invocation unchanged.
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ipsec v4 transport burst test"""
        self.verify_tra_basic4(count=257)
1876 class IpsecTra6(object):
1877 """verify methods for Transport v6"""
def verify_tra_basic6(self, count=1, payload_size=54):
    """Send encrypted IPv6 transport packets on ``tra_if`` and check replies.

    Every reply must have an IPv6 payload length consistent with its frame
    length and must decrypt, using the VPP transport SA, to a packet with
    valid checksums.  Afterwards the in/out SA packet counters and the
    transport encrypt/decrypt node counters must all equal ``count``.

    :param count: number of packets to generate and expect.
    :param payload_size: payload size forwarded to ``gen_encrypt_pkts6``.
    """
    for command in ("clear errors", "clear ipsec sa"):
        self.vapi.cli(command)
    try:
        # `p` is bound inside the try so that the diagnostics in `finally`
        # are still emitted if the parameter lookup itself fails.
        p = self.params[socket.AF_INET6]
        tx_pkts = self.gen_encrypt_pkts6(
            p,
            p.scapy_tra_sa,
            self.tra_if,
            src=self.tra_if.remote_ip6,
            dst=self.tra_if.local_ip6,
            count=count,
            payload_size=payload_size,
        )
        for rx in self.send_and_expect(self.tra_if, tx_pkts, self.tra_if):
            ip6_payload_len = len(rx) - len(Ether()) - len(IPv6())
            self.assertEqual(ip6_payload_len, rx[IPv6].plen)
            try:
                plain = p.vpp_tra_sa.decrypt(rx[IPv6])
                self.assert_packet_checksums_valid(plain)
            except:
                # dump the offending frame, then let the failure propagate
                self.logger.debug(ppp("Unexpected packet:", rx))
                raise
    finally:
        for command in ("show error", "show ipsec all"):
            self.logger.info(self.vapi.ppcli(command))

    sa_in_pkts = p.tra_sa_in.get_stats()["packets"]
    self.assertEqual(
        sa_in_pkts,
        count,
        "incorrect SA in counts: expected %d != %d" % (count, sa_in_pkts),
    )
    sa_out_pkts = p.tra_sa_out.get_stats()["packets"]
    self.assertEqual(
        sa_out_pkts,
        count,
        "incorrect SA out counts: expected %d != %d" % (count, sa_out_pkts),
    )
    self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
    self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
def gen_encrypt_pkts_ext_hdrs6(
    self, sa, sw_intf, src, dst, count=1, payload_size=54
):
    """Build ``count`` Ethernet frames carrying ``sa``-protected ICMPv6 echoes.

    Each frame is addressed from ``sw_intf.remote_mac`` to
    ``sw_intf.local_mac`` and wraps ``sa.encrypt()`` of an IPv6 echo request
    whose data is ``payload_size`` "X" characters.

    :returns: list of scapy packets (empty when ``count`` is not positive).
    """
    frames = []
    for _ in range(count):
        l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
        inner = IPv6(src=src, dst=dst) / ICMPv6EchoRequest(
            id=0, seq=1, data="X" * payload_size
        )
        frames.append(l2 / sa.encrypt(inner))
    return frames
def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
    """Build ``count`` clear-text IPv6 frames with two extension headers.

    Each frame is Ether / IPv6 / hop-by-hop / fragment(id=2, offset=200)
    followed by 200 bytes of 0xff.

    NOTE: ``payload_size`` is accepted but not used; the raw payload is
    always 200 bytes.
    """

    def build_one():
        return (
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
        )

    return [build_one() for _ in range(count)]
def verify_tra_encrypted6(self, p, sa, rxs):
    """Decrypt transport-mode IPv6 packets emitted by VPP and validate them.

    Every packet in ``rxs`` must have valid checksums and must decrypt, with
    ``p.vpp_tra_sa``, to a packet going from ``tra_if.local_ip6`` to
    ``tra_if.remote_ip6``.

    :param p: parameter object providing ``vpp_tra_sa``.
    :param sa: unused; kept for signature compatibility with callers.
    :param rxs: iterable of received scapy packets.
    :returns: list of the decrypted packets, in receive order.
    :raises: whatever the failing check or decrypt raised, after logging.
    """
    decrypted = []
    for rx in rxs:
        self.assert_packet_checksums_valid(rx)
        # Reset per packet: otherwise a decrypt failure on a later packet
        # would log the *previous* packet's plaintext as "Decrypted packet".
        decrypt_pkt = None
        try:
            decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
            decrypted.append(decrypt_pkt)
            self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
            self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
        except BaseException:
            # always re-raised below; this handler only adds diagnostics
            self.logger.debug(ppp("Unexpected packet:", rx))
            if decrypt_pkt is not None:
                try:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                except Exception:
                    # best-effort dump; never mask the original failure
                    pass
            raise
    return decrypted
1957 def verify_tra_66_ext_hdrs(self, p):
1961 # check we can decrypt with options
1963 tx = self.gen_encrypt_pkts_ext_hdrs6(
1966 src=self.tra_if.remote_ip6,
1967 dst=self.tra_if.local_ip6,
1970 self.send_and_expect(self.tra_if, tx, self.tra_if)
        # injecting a packet from ourselves to be routed off box is a hack,
        # but it matches an outbound policy, so no regrets
1977 # one extension before ESP
1979 Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1980 / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1981 / IPv6ExtHdrFragment(id=2, offset=200)
1982 / Raw(b"\xff" * 200)
1985 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1986 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
            # for reasons I'm not going to investigate, scapy does not
            # create the correct headers after decrypt, but reparsing
            # the IPv6 packet fixes it
1992 dc = IPv6(raw(dc[IPv6]))
1993 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1995 # two extensions before ESP
1997 Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1998 / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1999 / IPv6ExtHdrHopByHop()
2000 / IPv6ExtHdrFragment(id=2, offset=200)
2001 / Raw(b"\xff" * 200)
2004 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
2005 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
2008 dc = IPv6(raw(dc[IPv6]))
2009 self.assertTrue(dc[IPv6ExtHdrHopByHop])
2010 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
2012 # two extensions before ESP, one after
2014 Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
2015 / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
2016 / IPv6ExtHdrHopByHop()
2017 / IPv6ExtHdrFragment(id=2, offset=200)
2018 / IPv6ExtHdrDestOpt()
2019 / Raw(b"\xff" * 200)
2022 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
2023 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
2026 dc = IPv6(raw(dc[IPv6]))
2027 self.assertTrue(dc[IPv6ExtHdrDestOpt])
2028 self.assertTrue(dc[IPv6ExtHdrHopByHop])
2029 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
class IpsecTra6Tests(IpsecTra6):
    """UT test methods for Transport v6"""

    def test_tra_basic6(self):
        """ipsec v6 transport basic test"""
        # single packet round trip
        n_pkts = 1
        self.verify_tra_basic6(count=n_pkts)

    def test_tra_burst6(self):
        """ipsec v6 transport burst test"""
        # same check as the basic test, with a burst of 257 packets
        n_pkts = 257
        self.verify_tra_basic6(count=n_pkts)
class IpsecTra6ExtTests(IpsecTra6):
    # UT test methods for Transport v6 with IPv6 extension headers

    def test_tra_ext_hdrs_66(self):
        """ipsec 6o6 tra extension headers test"""
        params6 = self.params[socket.AF_INET6]
        self.verify_tra_66_ext_hdrs(params6)
# Aggregates the IPv4 and IPv6 transport test methods into a single mixin;
# it adds no tests of its own.
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """UT test methods for Transport v6 and v4"""
2056 class IpsecTun4(object):
2057 """verify methods for Tunnel v4"""
def verify_counters4(self, p, count, n_frags=None, worker=None):
    """Check the SPD/SA statistics and IPv4 tunnel node counters.

    :param p: parameter object; ``spd_policy_in_any``, ``tun_sa_in`` and
        ``tun_sa_out`` are only checked when present.
    :param count: expected inbound packet count.
    :param n_frags: expected outbound packet count; a falsy value (None or
        0) means "same as ``count``".
    :param worker: passed through to each ``get_stats`` call.
    """
    if not n_frags:
        n_frags = count

    if hasattr(p, "spd_policy_in_any"):
        pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
        self.assertEqual(
            pkts,
            count,
            "incorrect SPD any policy: expected %d != %d" % (count, pkts),
        )

    if hasattr(p, "tun_sa_in"):
        pkts = p.tun_sa_in.get_stats(worker)["packets"]
        self.assertEqual(
            pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
        )
        pkts = p.tun_sa_out.get_stats(worker)["packets"]
        # The outbound SA is compared against n_frags, so the message must
        # report n_frags as the expectation (it used to print `count`).
        self.assertEqual(
            pkts,
            n_frags,
            "incorrect SA out counts: expected %d != %d" % (n_frags, pkts),
        )

    self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
    self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)
def verify_decrypted(self, p, rxs):
    """Check the decrypted IPv4 packets received from VPP.

    Each packet must go from ``p.remote_tun_if_host`` to ``pg1.remote_ip4``
    and carry valid checksums.
    """
    for pkt in rxs:
        ip_hdr = pkt[IP]
        self.assert_equal(ip_hdr.src, p.remote_tun_if_host)
        self.assert_equal(ip_hdr.dst, self.pg1.remote_ip4)
        self.assert_packet_checksums_valid(pkt)
def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
    """Assert that ``esp_payload`` has the length expected for ``decrypt_pkt``.

    The expected length is the plaintext plus 2 bytes, rounded up to the
    cipher block size (at least 4), plus the IV and the ICV.  The ICV size
    comes from the cipher when it is non-zero, otherwise from the
    authentication algorithm.
    """
    # never align to less than 4 bytes, whatever the cipher block size
    align = max(sa.crypt_algo.block_size, 4)
    mask = align - 1
    # NOTE(review): the +2 is presumably the ESP trailer (pad length and
    # next header) -- confirm against RFC 4303
    padded_len = (len(decrypt_pkt) + 2 + mask) & ~mask
    icv_len = sa.crypt_algo.icv_size or sa.auth_algo.icv_size
    expected = padded_len + sa.crypt_algo.iv_size + icv_len
    self.assertEqual(expected, len(esp_payload))
def verify_encrypted(self, p, sa, rxs):
    """Decrypt IPv4 tunnel packets emitted by VPP and validate them.

    For each packet in ``rxs``: check the UDP destination port when
    ``p.nat_header`` is set, the outer checksums and the outer IP length;
    decrypt with ``p.vpp_tun_sa``; check ESP padding for ESP packets; and
    check that the inner packet goes from ``pg1.remote_ip4`` to
    ``p.remote_tun_if_host``.  The decrypted packets are then passed
    through ``reassemble4`` and every result must have valid checksums.

    :param p: parameter object for the tunnel under test.
    :param sa: SA handed to ``verify_esp_padding``.
    :param rxs: iterable of received scapy packets.
    :raises: whatever the failing check or decrypt raised, after logging.
    """
    decrypt_pkts = []
    for rx in rxs:
        if p.nat_header:
            self.assertEqual(rx[UDP].dport, p.nat_header.dport)
        self.assert_packet_checksums_valid(rx)
        self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
        # Reset per packet: otherwise a decrypt failure on a later packet
        # would log the *previous* packet's plaintext as "Decrypted packet".
        decrypt_pkt = None
        try:
            rx_ip = rx[IP]
            decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
            if not decrypt_pkt.haslayer(IP):
                # the plaintext came back unparsed; parse the raw bytes
                decrypt_pkt = IP(decrypt_pkt[Raw].load)
            if rx_ip.proto == socket.IPPROTO_ESP:
                self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
            decrypt_pkts.append(decrypt_pkt)
            self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
            self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
        except BaseException:
            # always re-raised below; this handler only adds diagnostics
            self.logger.debug(ppp("Unexpected packet:", rx))
            if decrypt_pkt is not None:
                try:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                except Exception:
                    # best-effort dump; never mask the original failure
                    pass
            raise
    for pkt in reassemble4(decrypt_pkts):
        self.assert_packet_checksums_valid(pkt)
def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
    """Exercise the IPv4-over-IPv4 tunnel in both directions.

    Encrypted packets sent on ``tun_if`` must come out decrypted on
    ``pg1``.  Clear-text packets sent on ``pg1`` must come out encrypted on
    ``tun_if`` with outer addresses ``p.tun_src`` / ``p.tun_dst``.  The
    counters are then checked via ``verify_counters4``.

    :param count: number of packets sent in each direction.
    :param payload_size: payload size passed to the packet generators.
    :param n_rx: number of encrypted packets expected; a falsy value
        (None or 0) means ``count``.
    """
    for command in ("clear errors", "clear ipsec counters", "clear ipsec sa"):
        self.vapi.cli(command)
    n_rx = n_rx or count
    try:
        # decrypt direction: tun_if -> pg1
        encrypted = self.gen_encrypt_pkts(
            p,
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
            count=count,
            payload_size=payload_size,
        )
        plain_rx = self.send_and_expect(self.tun_if, encrypted, self.pg1)
        self.verify_decrypted(p, plain_rx)

        # encrypt direction: pg1 -> tun_if
        clear = self.gen_pkts(
            self.pg1,
            src=self.pg1.remote_ip4,
            dst=p.remote_tun_if_host,
            count=count,
            payload_size=payload_size,
        )
        cipher_rx = self.send_and_expect(self.pg1, clear, self.tun_if, n_rx)
        self.verify_encrypted(p, p.vpp_tun_sa, cipher_rx)

        for rx in cipher_rx:
            self.assertEqual(rx[IP].src, p.tun_src)
            self.assertEqual(rx[IP].dst, p.tun_dst)
    finally:
        for command in ("show error", "show ipsec all"):
            self.logger.info(self.vapi.ppcli(command))

    for command in ("show ipsec sa 0", "show ipsec sa 4"):
        self.logger.info(self.vapi.ppcli(command))
    self.verify_counters4(p, count, n_rx)
def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
    """Check that IPv4 tunnel traffic is dropped in both directions.

    Encrypted packets sent on ``tun_if`` and clear-text packets sent on
    ``pg1`` must both produce no replies.

    :param count: number of packets sent in each direction.
    :param payload_size: payload size of the clear-text packets only; the
        encrypted packets use the generator's default.
    :param n_rx: normalised to ``count`` when falsy, but not otherwise used.
    """
    self.vapi.cli("clear errors")
    n_rx = n_rx or count
    try:
        encrypted = self.gen_encrypt_pkts(
            p,
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            dst=self.pg1.remote_ip4,
            count=count,
        )
        self.send_and_assert_no_replies(self.tun_if, encrypted)

        clear = self.gen_pkts(
            self.pg1,
            src=self.pg1.remote_ip4,
            dst=p.remote_tun_if_host,
            count=count,
            payload_size=payload_size,
        )
        self.send_and_assert_no_replies(self.pg1, clear)
    finally:
        for command in ("show error", "show ipsec all"):
            self.logger.info(self.vapi.ppcli(command))
2197 def verify_tun_reass_44(self, p):
2198 self.vapi.cli("clear errors")
2199 self.vapi.ip_reassembly_enable_disable(
2200 sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
2204 send_pkts = self.gen_encrypt_pkts(
2208 src=p.remote_tun_if_host,
2209 dst=self.pg1.remote_ip4,
2213 send_pkts = fragment_rfc791(send_pkts[0], 1400)
2214 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
2215 self.verify_decrypted(p, recv_pkts)
2217 send_pkts = self.gen_pkts(
2218 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=1
2220 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
2221 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
2224 self.logger.info(self.vapi.ppcli("show error"))
2225 self.logger.info(self.vapi.ppcli("show ipsec all"))
2227 self.verify_counters4(p, 1, 1)
2228 self.vapi.ip_reassembly_enable_disable(
2229 sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
def verify_tun_64(self, p, count=1):
    """Exercise IPv6 traffic carried over the IPv4 tunnel, both directions.

    Encrypted packets sent on ``tun_if`` must come out on ``pg1`` as IPv6
    from ``p.remote_tun_if_host6`` to ``pg1.remote_ip6``.  Clear-text IPv6
    packets sent on ``pg1`` must come out on ``tun_if`` and decrypt (from
    the outer IPv4 layer) to IPv6 packets with the reverse addressing.
    The counters are then checked via ``verify_counters4``.

    :param p: parameter object for the tunnel under test.
    :param count: number of packets sent in each direction.
    :raises: whatever the failing check or decrypt raised, after logging.
    """
    self.vapi.cli("clear errors")
    self.vapi.cli("clear ipsec sa")
    try:
        send_pkts = self.gen_encrypt_pkts6(
            p,
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host6,
            dst=self.pg1.remote_ip6,
            count=count,
        )
        recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
        for recv_pkt in recv_pkts:
            self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
            self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(recv_pkt)

        send_pkts = self.gen_pkts6(
            p,
            self.pg1,
            src=self.pg1.remote_ip6,
            dst=p.remote_tun_if_host6,
            count=count,
        )
        recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
        for recv_pkt in recv_pkts:
            # Reset per packet: otherwise a decrypt failure on a later
            # packet would log the *previous* packet's plaintext.
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                if not decrypt_pkt.haslayer(IPv6):
                    # the plaintext came back unparsed; parse the raw bytes
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                self.assert_packet_checksums_valid(decrypt_pkt)
            except BaseException:
                # always re-raised below; this handler only adds diagnostics
                self.logger.error(ppp("Unexpected packet:", recv_pkt))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # best-effort dump; never mask the original failure
                        pass
                raise
    finally:
        self.logger.info(self.vapi.ppcli("show error"))
        self.logger.info(self.vapi.ppcli("show ipsec all"))

    self.verify_counters4(p, count)
2278 def verify_keepalive(self, p):
        # the size of Raw is calculated to pad to the minimum Ethernet
        # frame size of 64 bytes
2282 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2283 / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
2284 / UDP(sport=333, dport=4500)
2288 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2289 self.assert_error_counter_equal(
2290 "/err/%s/nat_keepalive" % self.tun4_input_node, 31
2294 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2295 / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
2296 / UDP(sport=333, dport=4500)
2299 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2300 self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)
2303 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2304 / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
2305 / UDP(sport=333, dport=4500)
2309 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2310 self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
class IpsecTun4Tests(IpsecTun4):
    """UT test methods for Tunnel v4"""

    def test_tun_basic44(self):
        """ipsec 4o4 tunnel basic test"""
        params4 = self.params[socket.AF_INET]
        self.verify_tun_44(params4, count=1)
        # bounce the tunnel interface, then check traffic still flows
        self.tun_if.admin_down()
        self.tun_if.resolve_arp()
        self.tun_if.admin_up()
        self.verify_tun_44(params4, count=1)

    def test_tun_reass_basic44(self):
        """ipsec 4o4 tunnel basic reassembly test"""
        params4 = self.params[socket.AF_INET]
        self.verify_tun_reass_44(params4)

    def test_tun_burst44(self):
        """ipsec 4o4 tunnel burst test"""
        params4 = self.params[socket.AF_INET]
        self.verify_tun_44(params4, count=127)
2333 class IpsecTun6(object):
2334 """verify methods for Tunnel v6"""
def verify_counters6(self, p_in, p_out, count, worker=None):
    """Check the SA statistics and IPv6 tunnel node counters against ``count``.

    :param p_in: parameter object; its ``tun_sa_in`` is checked if present.
    :param p_out: parameter object; its ``tun_sa_out`` is checked if present.
    :param count: expected packet count for every counter.
    :param worker: passed through to each ``get_stats`` call.
    """

    def check_sa(sa, message):
        seen = sa.get_stats(worker)["packets"]
        self.assertEqual(seen, count, message % (count, seen))

    if hasattr(p_in, "tun_sa_in"):
        check_sa(p_in.tun_sa_in, "incorrect SA in counts: expected %d != %d")
    if hasattr(p_out, "tun_sa_out"):
        check_sa(p_out.tun_sa_out, "incorrect SA out counts: expected %d != %d")

    node_names = (self.tun6_encrypt_node_name, self.tun6_decrypt_node_name[0])
    for node_name in node_names:
        self.assert_packet_counter_equal(node_name, count)
def verify_decrypted6(self, p, rxs):
    """Check the decrypted IPv6 packets received from VPP.

    Each packet must go from ``p.remote_tun_if_host`` to ``pg1.remote_ip6``
    and carry valid checksums.
    """
    for pkt in rxs:
        ip6_hdr = pkt[IPv6]
        self.assert_equal(ip6_hdr.src, p.remote_tun_if_host)
        self.assert_equal(ip6_hdr.dst, self.pg1.remote_ip6)
        self.assert_packet_checksums_valid(pkt)
def verify_encrypted6(self, p, sa, rxs):
    """Decrypt IPv6 tunnel packets emitted by VPP and validate them.

    For each packet in ``rxs``: check checksums, the IPv6 payload length,
    the outer hop limit and (when ``p.outer_flow_label`` is non-zero) the
    outer flow label; then decrypt with ``p.vpp_tun_sa`` and check the
    inner addresses, the inner hop limit (``p.inner_hop_limit - 1``) and
    the inner flow label.

    :param p: parameter object for the tunnel under test.
    :param sa: unused; kept for signature compatibility with callers.
    :param rxs: iterable of received scapy packets.
    :raises: whatever the failing check or decrypt raised, after logging.
    """
    for rx in rxs:
        self.assert_packet_checksums_valid(rx)
        self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
        self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
        if p.outer_flow_label:
            self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
        # Reset per packet: otherwise a decrypt failure on a later packet
        # would log the *previous* packet's plaintext as "Decrypted packet".
        decrypt_pkt = None
        try:
            decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
            if not decrypt_pkt.haslayer(IPv6):
                # the plaintext came back unparsed; parse the raw bytes
                decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
            self.assert_packet_checksums_valid(decrypt_pkt)
            self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
            self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
            self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
        except BaseException:
            # always re-raised below; this handler only adds diagnostics
            self.logger.debug(ppp("Unexpected packet:", rx))
            if decrypt_pkt is not None:
                try:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                except Exception:
                    # best-effort dump; never mask the original failure
                    pass
            raise
2382 def verify_drop_tun_tx_66(self, p_in, count=1, payload_size=64):
2383 self.vapi.cli("clear errors")
2384 self.vapi.cli("clear ipsec sa")
2386 send_pkts = self.gen_pkts6(
2389 src=self.pg1.remote_ip6,
2390 dst=p_in.remote_tun_if_host,
2392 payload_size=payload_size,
2394 self.send_and_assert_no_replies(self.tun_if, send_pkts)
2395 self.logger.info(self.vapi.cli("sh punt stats"))
def verify_drop_tun_rx_66(self, p_in, count=1, payload_size=64):
    """Check that encrypted IPv6 packets received on ``tun_if`` are dropped.

    :param p_in: parameter object providing the scapy tunnel SA and the
        remote tunnel host address.
    :param count: number of packets to send.
    :param payload_size: accepted but not forwarded to the generator.
    """
    for command in ("clear errors", "clear ipsec sa"):
        self.vapi.cli(command)

    encrypted = self.gen_encrypt_pkts6(
        p_in,
        p_in.scapy_tun_sa,
        self.tun_if,
        src=p_in.remote_tun_if_host,
        dst=self.pg1.remote_ip6,
        count=count,
    )
    self.send_and_assert_no_replies(self.tun_if, encrypted)
def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
    """Run the 6o6 tunnel drop checks in both directions.

    Calls ``verify_drop_tun_tx_66`` and then ``verify_drop_tun_rx_66``
    with the same arguments.

    :param p_in: parameter object handed to both checks.
    :param count: forwarded unchanged as ``count``.
    :param payload_size: forwarded unchanged as ``payload_size``.
    """
    shared_kwargs = {"count": count, "payload_size": payload_size}
    self.verify_drop_tun_tx_66(p_in, **shared_kwargs)
    self.verify_drop_tun_rx_66(p_in, **shared_kwargs)
2415 def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
2416 self.vapi.cli("clear errors")
2417 self.vapi.cli("clear ipsec sa")
2421 send_pkts = self.gen_encrypt_pkts6(
2425 src=p_in.remote_tun_if_host,
2426 dst=self.pg1.remote_ip6,
2428 payload_size=payload_size,
2430 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
2431 self.verify_decrypted6(p_in, recv_pkts)
2433 send_pkts = self.gen_pkts6(
2436 src=self.pg1.remote_ip6,
2437 dst=p_out.remote_tun_if_host,
2439 payload_size=payload_size,
2441 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
2442 self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
2444 for rx in recv_pkts:
2445 self.assertEqual(rx[IPv6].src, p_out.tun_src)
2446 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
2449 self.logger.info(self.vapi.ppcli("show error"))
2450 self.logger.info(self.vapi.ppcli("show ipsec all"))
2451 self.verify_counters6(p_in, p_out, count)
2453 def verify_tun_reass_66(self, p):
2454 self.vapi.cli("clear errors")
2455 self.vapi.ip_reassembly_enable_disable(
2456 sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
2460 send_pkts = self.gen_encrypt_pkts6(
2464 src=p.remote_tun_if_host,
2465 dst=self.pg1.remote_ip6,
2469 send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
2470 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
2471 self.verify_decrypted6(p, recv_pkts)
2473 send_pkts = self.gen_pkts6(
2476 src=self.pg1.remote_ip6,
2477 dst=p.remote_tun_if_host,
2481 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
2482 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
2484 self.logger.info(self.vapi.ppcli("show error"))
2485 self.logger.info(self.vapi.ppcli("show ipsec all"))
2486 self.verify_counters6(p, p, 1)
2487 self.vapi.ip_reassembly_enable_disable(
2488 sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
2491 def verify_tun_46(self, p, count=1):
2492 """ipsec 4o6 tunnel basic test"""
2493 self.vapi.cli("clear errors")
2494 self.vapi.cli("clear ipsec sa")
2496 send_pkts = self.gen_encrypt_pkts(
2500 src=p.remote_tun_if_host4,
2501 dst=self.pg1.remote_ip4,
2504 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
2505 for recv_pkt in recv_pkts:
2506 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
2507 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
2508 self.assert_packet_checksums_valid(recv_pkt)
2509 send_pkts = self.gen_pkts(
2511 src=self.pg1.remote_ip4,
2512 dst=p.remote_tun_if_host4,
2515 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
2516 for recv_pkt in recv_pkts:
2518 decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
2519 if not decrypt_pkt.haslayer(IP):
2520 decrypt_pkt = IP(decrypt_pkt[Raw].load)
2521 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
2522 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
2523 self.assert_packet_checksums_valid(decrypt_pkt)
2525 self.logger.debug(ppp("Unexpected packet:", recv_pkt))
2527 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
2532 self.logger.info(self.vapi.ppcli("show error"))
2533 self.logger.info(self.vapi.ppcli("show ipsec all"))
2534 self.verify_counters6(p, p, count)
2536 def verify_keepalive(self, p):
# the size of the Raw payload is calculated to pad to the minimum
# ethernet frame size of 64 bytes
2540 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2541 / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
2542 / UDP(sport=333, dport=4500)
2546 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2547 self.assert_error_counter_equal(
2548 "/err/%s/nat_keepalive" % self.tun6_input_node, 31
2552 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2553 / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
2554 / UDP(sport=333, dport=4500)
2557 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2558 self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31)
2561 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2562 / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
2563 / UDP(sport=333, dport=4500)
2567 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2568 self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62)
class IpsecTun6Tests(IpsecTun6):
    """UT test methods for Tunnel v6"""

    # NOTE(review): the method docstrings are kept verbatim; they look like
    # the descriptions a test runner would print - confirm before rewording.

    def test_tun_basic66(self):
        """ipsec 6o6 tunnel basic test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_reass_basic66(self):
        """ipsec 6o6 tunnel basic reassembly test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(v6_params)

    def test_tun_burst66(self):
        """ipsec 6o6 tunnel burst test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
2587 class IpsecTun6HandoffTests(IpsecTun6):
2588 """UT test methods for Tunnel v6 with multiple workers"""
2590 vpp_worker_count = 2
2592 def test_tun_handoff_66(self):
2593 """ipsec 6o6 tunnel worker hand-off test"""
2594 self.vapi.cli("clear errors")
2595 self.vapi.cli("clear ipsec sa")
2598 p = self.params[socket.AF_INET6]
2600 # inject alternately on worker 0 and 1. all counts on the SA
2601 # should be against worker 0
2602 for worker in [0, 1, 0, 1]:
2603 send_pkts = self.gen_encrypt_pkts6(
2607 src=p.remote_tun_if_host,
2608 dst=self.pg1.remote_ip6,
2611 recv_pkts = self.send_and_expect(
2612 self.tun_if, send_pkts, self.pg1, worker=worker
2614 self.verify_decrypted6(p, recv_pkts)
2616 send_pkts = self.gen_pkts6(
2619 src=self.pg1.remote_ip6,
2620 dst=p.remote_tun_if_host,
2623 recv_pkts = self.send_and_expect(
2624 self.pg1, send_pkts, self.tun_if, worker=worker
2626 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
2628 # all counts against the first worker that was used
2629 self.verify_counters6(p, p, 4 * N_PKTS, worker=0)
2632 class IpsecTun4HandoffTests(IpsecTun4):
2633 """UT test methods for Tunnel v4 with multiple workers"""
2635 vpp_worker_count = 2
2637 def test_tun_handooff_44(self):
2638 """ipsec 4o4 tunnel worker hand-off test"""
2639 self.vapi.cli("clear errors")
2640 self.vapi.cli("clear ipsec sa")
2643 p = self.params[socket.AF_INET]
2645 # inject alternately on worker 0 and 1. all counts on the SA
2646 # should be against worker 0
2647 for worker in [0, 1, 0, 1]:
2648 send_pkts = self.gen_encrypt_pkts(
2652 src=p.remote_tun_if_host,
2653 dst=self.pg1.remote_ip4,
2656 recv_pkts = self.send_and_expect(
2657 self.tun_if, send_pkts, self.pg1, worker=worker
2659 self.verify_decrypted(p, recv_pkts)
2661 send_pkts = self.gen_pkts(
2663 src=self.pg1.remote_ip4,
2664 dst=p.remote_tun_if_host,
2667 recv_pkts = self.send_and_expect(
2668 self.pg1, send_pkts, self.tun_if, worker=worker
2670 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
2672 # all counts against the first worker that was used
2673 self.verify_counters4(p, 4 * N_PKTS, worker=0)
2676 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
2677 """UT test methods for Tunnel v6 & v4"""
2682 class IPSecIPv4Fwd(VppTestCase):
2683 """Test IPSec by capturing and verifying IPv4 forwarded pkts"""
2686 def setUpConstants(cls):
2687 super(IPSecIPv4Fwd, cls).setUpConstants()
2690 super(IPSecIPv4Fwd, self).setUp()
2691 # store SPD objects so we can remove configs on tear down
2693 self.spd_policies = []
2696 # remove SPD policies
2697 for obj in self.spd_policies:
2698 obj.remove_vpp_config()
2699 self.spd_policies = []
2700 # remove SPD items (interface bindings first, then SPD)
2701 for obj in reversed(self.spd_objs):
2702 obj.remove_vpp_config()
2704 # close down pg intfs
2705 for pg in self.pg_interfaces:
2708 super(IPSecIPv4Fwd, self).tearDown()
2710 def create_interfaces(self, num_ifs=2):
2711 # create interfaces pg0 ... pg<num_ifs>
2712 self.create_pg_interfaces(range(num_ifs))
2713 for pg in self.pg_interfaces:
2714 # put the interface up
2716 # configure IPv4 address on the interface
2718 # resolve ARP, so that we know VPP MAC
2720 self.logger.info(self.vapi.ppcli("show int addr"))
2722 def spd_create_and_intf_add(self, spd_id, pg_list):
2723 spd = VppIpsecSpd(self, spd_id)
2724 spd.add_vpp_config()
2725 self.spd_objs.append(spd)
2727 spdItf = VppIpsecSpdItfBinding(self, spd, pg)
2728 spdItf.add_vpp_config()
2729 self.spd_objs.append(spdItf)
def get_policy(self, policy_type):
    """Translate a policy name into the VPP SPD action enum value.

    :param policy_type: one of "protect", "bypass" or "discard".
    :returns: the matching member of ``VppEnum.vl_api_ipsec_spd_action_t``.
    :raises ValueError: if ``policy_type`` is not one of the known names
        (ValueError is an Exception, so existing handlers keep working).
    """
    action_names = {
        "protect": "IPSEC_API_SPD_ACTION_PROTECT",
        "bypass": "IPSEC_API_SPD_ACTION_BYPASS",
        "discard": "IPSEC_API_SPD_ACTION_DISCARD",
    }
    # Only strings can name a policy; the isinstance guard keeps an
    # unhashable argument from failing inside the dict lookup.
    action_name = (
        action_names.get(policy_type) if isinstance(policy_type, str) else None
    )
    if action_name is None:
        # Format the message explicitly: exceptions do not apply %-style
        # arguments the way logging calls do.
        raise ValueError("Invalid policy type: %s" % (policy_type,))
    return getattr(VppEnum.vl_api_ipsec_spd_action_t, action_name)
2742 def spd_add_rem_policy(
2754 local_ip_start=ip_address("0.0.0.0"),
2755 local_ip_stop=ip_address("255.255.255.255"),
2756 remote_ip_start=ip_address("0.0.0.0"),
2757 remote_ip_stop=ip_address("255.255.255.255"),
2758 remote_port_start=0,
2759 remote_port_stop=65535,
2761 local_port_stop=65535,
2763 spd = VppIpsecSpd(self, spd_id)
2766 src_range_low = ip_address("0.0.0.0")
2767 src_range_high = ip_address("255.255.255.255")
2768 dst_range_low = ip_address("0.0.0.0")
2769 dst_range_high = ip_address("255.255.255.255")
2772 src_range_low = local_ip_start
2773 src_range_high = local_ip_stop
2774 dst_range_low = remote_ip_start
2775 dst_range_high = remote_ip_stop
2778 src_range_low = src_if.remote_ip4
2779 src_range_high = src_if.remote_ip4
2780 dst_range_low = dst_if.remote_ip4
2781 dst_range_high = dst_if.remote_ip4
2783 spdEntry = VppIpsecSpdEntry(
2793 policy=self.get_policy(policy_type),
2795 remote_port_start=remote_port_start,
2796 remote_port_stop=remote_port_stop,
2797 local_port_start=local_port_start,
2798 local_port_stop=local_port_stop,
2802 spdEntry.add_vpp_config()
2803 self.spd_policies.append(spdEntry)
2805 spdEntry.remove_vpp_config()
2806 self.spd_policies.remove(spdEntry)
2807 self.logger.info(self.vapi.ppcli("show ipsec all"))
2811 self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP"
2815 sa = SecurityAssociation(
2818 crypt_algo="AES-CBC",
2819 crypt_key=b"JPjyOWBeVEQiMe7h",
2820 auth_algo="HMAC-SHA1-96",
2821 auth_key=b"C91KUR9GYMm5GfkEvNjX",
2822 tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
2823 nat_t_header=UDP(sport=src_prt, dport=dst_prt),
2825 for i in range(pkt_count):
2826 # create packet info stored in the test case instance
2827 info = self.create_packet_info(src_if, dst_if)
2828 # convert the info into packet payload
2829 payload = self.info_to_payload(info)
2830 # create the packet itself
2832 if proto == "UDP-ESP":
2833 p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt(
2834 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
2835 / UDP(sport=src_prt, dport=dst_prt)
2838 elif proto == "UDP":
2840 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2841 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
2842 / UDP(sport=src_prt, dport=dst_prt)
2845 elif proto == "TCP":
2847 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2848 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
2849 / TCP(sport=src_prt, dport=dst_prt)
2852 # store a copy of the packet in the packet info
2853 info.data = p.copy()
2854 # append the packet to the list
2856 # return the created packet list
2859 def verify_capture(self, src_if, dst_if, capture):
2861 for packet in capture:
2865 # convert the payload to packet info object
2866 payload_info = self.payload_to_info(packet)
2867 # make sure the indexes match
2869 payload_info.src, src_if.sw_if_index, "source sw_if_index"
2872 payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
2874 packet_info = self.get_next_packet_info_for_interface2(
2875 src_if.sw_if_index, dst_if.sw_if_index, packet_info
2877 # make sure we didn't run out of saved packets
2878 self.assertIsNotNone(packet_info)
2880 payload_info.index, packet_info.index, "packet info index"
2882 saved_packet = packet_info.data # fetch the saved packet
2883 # assert the values match
2884 self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
2885 # ... more assertions here
2886 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
2887 except Exception as e:
2888 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2890 remaining_packet = self.get_next_packet_info_for_interface2(
2891 src_if.sw_if_index, dst_if.sw_if_index, packet_info
2895 "Interface %s: Packet expected from interface "
2896 "%s didn't arrive" % (dst_if.name, src_if.name),
def verify_policy_match(self, pkt_count, spdEntry):
    """Assert that an SPD entry matched the expected number of packets.

    Logs the entry together with its full stats, then reads the
    "packets" counter from a fresh stats query, logs it and compares it
    with ``pkt_count``.

    :param pkt_count: expected value of the entry's "packets" counter.
    :param spdEntry: object exposing ``get_stats()`` returning a mapping.
    """
    entry_text = str(spdEntry)
    stats_text = str(spdEntry.get_stats())
    self.logger.info("XXXX %s %s", entry_text, stats_text)

    hits = spdEntry.get_stats().get("packets")
    self.logger.info("Policy %s matched: %d pkts", str(spdEntry), hits)
    self.assert_equal(pkt_count, hits)
# Method verify_l3_l4_capture() verifies the network and transport layer
# fields of the packet. sa.encrypt() gives interface number garbage,
# thus interface validation fails (scapy bug?). However, our intent
# is to verify the IP layer and above, and that is covered.
2910 def verify_l3_l4_capture(
2911 self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
2913 for packet in capture:
2915 self.assert_packet_checksums_valid(packet)
2919 "decrypted packet source address",
2924 "decrypted packet destination address",
2926 if packet.haslayer(TCP):
2928 packet.haslayer(UDP),
2929 "unexpected UDP header in decrypted packet",
2931 elif packet.haslayer(UDP):
2932 if packet[UDP].payload:
2934 packet[UDP][1].haslayer(UDP),
2935 "unexpected UDP header in decrypted packet",
2939 packet.haslayer(UDP),
2940 "unexpected UDP header in decrypted packet",
2943 packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
2946 self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
2950 class SpdFlowCacheTemplate(IPSecIPv4Fwd):
2952 def setUpConstants(cls):
2953 super(SpdFlowCacheTemplate, cls).setUpConstants()
2954 # Override this method with required cmdline parameters e.g.
2955 # cls.vpp_cmdline.extend(["ipsec", "{",
2956 # "ipv4-outbound-spd-flow-cache on",
2958 # cls.logger.info("VPP modified cmdline is %s" % " "
2959 # .join(cls.vpp_cmdline))
2962 super(SpdFlowCacheTemplate, self).setUp()
2965 super(SpdFlowCacheTemplate, self).tearDown()
def get_spd_flow_cache_entries(self, outbound):
    """Return an IPv4 SPD flow-cache entry count from 'show ipsec spd'.

    'show ipsec spd' output:
    ipv4-inbound-spd-flow-cache-entries: 0
    ipv4-outbound-spd-flow-cache-entries: 0

    :param outbound: truthy selects the outbound counter, falsy the
        inbound one.
    :returns: the counter value as an int.
    :raises Exception: if the counter line is absent from the CLI output
        or its value is not an integer.
    """
    show_ipsec_reply = self.vapi.cli("show ipsec spd")
    direction = "outbound" if outbound else "inbound"
    # Capture only up to the end of the counter's own line. Letting "."
    # span newlines would swallow every following line of the reply and
    # make int() fail whenever the counter is not the last line.
    regex_match = re.search(
        "ipv4-%s-spd-flow-cache-entries: (.*)" % direction, show_ipsec_reply
    )
    if regex_match is None:
        raise Exception(
            "Unable to find spd flow cache entries "
            "in 'show ipsec spd' CLI output - regex failed to match"
        )
    try:
        num_entries = int(regex_match.group(1))
    except ValueError as err:
        # Exceptions do not %-format their arguments, so build the final
        # message here and keep the original error as the cause.
        raise Exception(
            "Unable to get spd flow cache entries "
            "from 'show ipsec spd' string: %s" % regex_match.group(0)
        ) from err
    self.logger.info("%s", regex_match.group(0))
    return num_entries
3001 def verify_num_outbound_flow_cache_entries(self, expected_elements):
3003 self.get_spd_flow_cache_entries(outbound=True), expected_elements
3006 def verify_num_inbound_flow_cache_entries(self, expected_elements):
3008 self.get_spd_flow_cache_entries(outbound=False), expected_elements
def crc32_supported(self):
    """Report whether ``lscpu`` lists a CRC32-capable CPU feature.

    Runs ``lscpu``, logs its output, and looks for the "crc32" or
    "sse4_2" flag in it.

    :returns: True if either flag appears in the ``lscpu`` output,
        otherwise False (including when ``lscpu`` prints nothing).
    """
    # lscpu is part of util-linux package, available on all Linux Distros
    with os.popen("lscpu") as stream:
        cpu_info = stream.read()
    # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
    # see vppinfra/crc32.h
    # Each flag needs its own membership test: a bare non-empty string on
    # the left of "or" is always truthy and would make this always True.
    supported = "crc32" in cpu_info or "sse4_2" in cpu_info
    if supported:
        self.logger.info("\ncrc32 supported:\n" + cpu_info)
    else:
        self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
    return supported
3025 cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
3028 packets = super(SpdFlowCacheTemplate, cls).create_stream(
3029 src_if, dst_if, pkt_count, src_prt, dst_prt, proto
3034 self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
3036 super(SpdFlowCacheTemplate, self).verify_l3_l4_capture(
3037 src_if, dst_if, capture, tcp_port_in, udp_port_in
3041 class SpdFastPathTemplate(IPSecIPv4Fwd):
3043 def setUpConstants(cls):
3044 super(SpdFastPathTemplate, cls).setUpConstants()
3045 # Override this method with required cmdline parameters e.g.
3046 # cls.vpp_cmdline.extend(["ipsec", "{",
3047 # "ipv4-outbound-spd-flow-cache on",
3049 # cls.logger.info("VPP modified cmdline is %s" % " "
3050 # .join(cls.vpp_cmdline))
3053 super(SpdFastPathTemplate, self).setUp()
3056 super(SpdFastPathTemplate, self).tearDown()
3059 cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
3062 packets = super(SpdFastPathTemplate, cls).create_stream(
3063 src_if, dst_if, pkt_count, src_prt, dst_prt, proto
3068 self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
3070 super(SpdFastPathTemplate, self).verify_l3_l4_capture(
3071 src_if, dst_if, capture, tcp_port_in, udp_port_in
3075 class IpsecDefaultTemplate(IPSecIPv4Fwd):
3077 def setUpConstants(cls):
3078 super(IpsecDefaultTemplate, cls).setUpConstants()
3081 super(IpsecDefaultTemplate, self).setUp()
3084 super(IpsecDefaultTemplate, self).tearDown()
3087 cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
3090 packets = super(IpsecDefaultTemplate, cls).create_stream(
3091 src_if, dst_if, pkt_count, src_prt, dst_prt, proto
3096 self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
3098 super(IpsecDefaultTemplate, self).verify_l3_l4_capture(
3099 src_if, dst_if, capture, tcp_port_in, udp_port_in
3103 class IPSecIPv6Fwd(VppTestCase):
3104 """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
3107 def setUpConstants(cls):
3108 super(IPSecIPv6Fwd, cls).setUpConstants()
3111 super(IPSecIPv6Fwd, self).setUp()
3112 # store SPD objects so we can remove configs on tear down
3114 self.spd_policies = []
3117 # remove SPD policies
3118 for obj in self.spd_policies:
3119 obj.remove_vpp_config()
3120 self.spd_policies = []
3121 # remove SPD items (interface bindings first, then SPD)
3122 for obj in reversed(self.spd_objs):
3123 obj.remove_vpp_config()
3125 # close down pg intfs
3126 for pg in self.pg_interfaces:
3129 super(IPSecIPv6Fwd, self).tearDown()
3131 def create_interfaces(self, num_ifs=2):
3132 # create interfaces pg0 ... pg<num_ifs>
3133 self.create_pg_interfaces(range(num_ifs))
3134 for pg in self.pg_interfaces:
3135 # put the interface up
3137 # configure IPv6 address on the interface
3140 self.logger.info(self.vapi.ppcli("show int addr"))
3142 def spd_create_and_intf_add(self, spd_id, pg_list):
3143 spd = VppIpsecSpd(self, spd_id)
3144 spd.add_vpp_config()
3145 self.spd_objs.append(spd)
3147 spdItf = VppIpsecSpdItfBinding(self, spd, pg)
3148 spdItf.add_vpp_config()
3149 self.spd_objs.append(spdItf)
def get_policy(self, policy_type):
    """Map a policy name to its VPP SPD action enum member.

    :param policy_type: "protect", "bypass" or "discard".
    :returns: the corresponding ``VppEnum.vl_api_ipsec_spd_action_t``
        member.
    :raises ValueError: for any other ``policy_type`` (a subclass of
        Exception, so callers catching Exception are unaffected).
    """
    enum_member_by_policy = {
        "protect": "IPSEC_API_SPD_ACTION_PROTECT",
        "bypass": "IPSEC_API_SPD_ACTION_BYPASS",
        "discard": "IPSEC_API_SPD_ACTION_DISCARD",
    }
    # Guard against unhashable arguments before using them as a dict key.
    member = (
        enum_member_by_policy.get(policy_type)
        if isinstance(policy_type, str)
        else None
    )
    if member is None:
        # Interpolate here; an exception keeps extra positional arguments
        # as a tuple instead of formatting them into the message.
        raise ValueError("Invalid policy type: %s" % (policy_type,))
    return getattr(VppEnum.vl_api_ipsec_spd_action_t, member)
3162 def spd_add_rem_policy(
3174 local_ip_start=ip_address("0::0"),
3175 local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
3176 remote_ip_start=ip_address("0::0"),
3177 remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
3178 remote_port_start=0,
3179 remote_port_stop=65535,
3181 local_port_stop=65535,
3183 spd = VppIpsecSpd(self, spd_id)
3186 src_range_low = ip_address("0::0")
3187 src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
3188 dst_range_low = ip_address("0::0")
3189 dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
3192 src_range_low = local_ip_start
3193 src_range_high = local_ip_stop
3194 dst_range_low = remote_ip_start
3195 dst_range_high = remote_ip_stop
3198 src_range_low = src_if.remote_ip6
3199 src_range_high = src_if.remote_ip6
3200 dst_range_low = dst_if.remote_ip6
3201 dst_range_high = dst_if.remote_ip6
3203 spdEntry = VppIpsecSpdEntry(
3213 policy=self.get_policy(policy_type),
3215 remote_port_start=remote_port_start,
3216 remote_port_stop=remote_port_stop,
3217 local_port_start=local_port_start,
3218 local_port_stop=local_port_stop,
3222 spdEntry.add_vpp_config()
3223 self.spd_policies.append(spdEntry)
3225 spdEntry.remove_vpp_config()
3226 self.spd_policies.remove(spdEntry)
3227 self.logger.info(self.vapi.ppcli("show ipsec all"))
3230 def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
3232 for i in range(pkt_count):
3233 # create packet info stored in the test case instance
3234 info = self.create_packet_info(src_if, dst_if)
3235 # convert the info into packet payload
3236 payload = self.info_to_payload(info)
3237 # create the packet itself
3239 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
3240 / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
3241 / UDP(sport=src_prt, dport=dst_prt)
3244 # store a copy of the packet in the packet info
3245 info.data = p.copy()
3246 # append the packet to the list
3248 # return the created packet list
3251 def verify_capture(self, src_if, dst_if, capture):
3253 for packet in capture:
3257 # convert the payload to packet info object
3258 payload_info = self.payload_to_info(packet)
3259 # make sure the indexes match
3261 payload_info.src, src_if.sw_if_index, "source sw_if_index"
3264 payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
3266 packet_info = self.get_next_packet_info_for_interface2(
3267 src_if.sw_if_index, dst_if.sw_if_index, packet_info
3269 # make sure we didn't run out of saved packets
3270 self.assertIsNotNone(packet_info)
3272 payload_info.index, packet_info.index, "packet info index"
3274 saved_packet = packet_info.data # fetch the saved packet
3275 # assert the values match
3276 self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
3277 # ... more assertions here
3278 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
3279 except Exception as e:
3280 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3282 remaining_packet = self.get_next_packet_info_for_interface2(
3283 src_if.sw_if_index, dst_if.sw_if_index, packet_info
3287 "Interface %s: Packet expected from interface "
3288 "%s didn't arrive" % (dst_if.name, src_if.name),
def verify_policy_match(self, pkt_count, spdEntry):
    """Check an SPD entry's "packets" counter against ``pkt_count``.

    The entry and its complete stats are logged first; the counter is
    then taken from a second stats query, logged, and asserted equal to
    the expected count.

    :param pkt_count: number of packets the policy should have matched.
    :param spdEntry: object whose ``get_stats()`` returns a mapping.
    """
    info = self.logger.info
    info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))

    current_stats = spdEntry.get_stats()
    matched = current_stats.get("packets")
    info("Policy %s matched: %d pkts", str(spdEntry), matched)
    self.assert_equal(pkt_count, matched)
3298 if __name__ == "__main__":
3299 unittest.main(testRunner=VppTestRunner)