5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import (
18 from framework import VppTestCase, VppTestRunner
19 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
20 from vpp_papi import VppEnum
22 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
23 from ipaddress import ip_address
28 class IPsecIPv4Params:
30 addr_type = socket.AF_INET
32 addr_bcast = "255.255.255.255"
37 self.remote_tun_if_host = "1.1.1.1"
38 self.remote_tun_if_host6 = "1111::1"
40 self.scapy_tun_sa_id = 100
41 self.scapy_tun_spi = 1000
42 self.vpp_tun_sa_id = 200
43 self.vpp_tun_spi = 2000
45 self.scapy_tra_sa_id = 300
46 self.scapy_tra_spi = 3000
47 self.vpp_tra_sa_id = 400
48 self.vpp_tra_spi = 4000
50 self.outer_hop_limit = 64
51 self.inner_hop_limit = 255
52 self.outer_flow_label = 0
53 self.inner_flow_label = 0x12345
55 self.auth_algo_vpp_id = (
56 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
58 self.auth_algo = "HMAC-SHA1-96" # scapy name
59 self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
61 self.crypt_algo_vpp_id = (
62 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
64 self.crypt_algo = "AES-CBC" # scapy name
65 self.crypt_key = b"JPjyOWBeVEQiMe7h"
68 self.nat_header = None
70 VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
73 self.async_mode = False
76 class IPsecIPv6Params:
78 addr_type = socket.AF_INET6
80 addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
85 self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
86 self.remote_tun_if_host4 = "1.1.1.1"
88 self.scapy_tun_sa_id = 500
89 self.scapy_tun_spi = 3001
90 self.vpp_tun_sa_id = 600
91 self.vpp_tun_spi = 3000
93 self.scapy_tra_sa_id = 700
94 self.scapy_tra_spi = 4001
95 self.vpp_tra_sa_id = 800
96 self.vpp_tra_spi = 4000
98 self.outer_hop_limit = 64
99 self.inner_hop_limit = 255
100 self.outer_flow_label = 0
101 self.inner_flow_label = 0x12345
103 self.auth_algo_vpp_id = (
104 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
106 self.auth_algo = "HMAC-SHA1-96" # scapy name
107 self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
109 self.crypt_algo_vpp_id = (
110 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
112 self.crypt_algo = "AES-CBC" # scapy name
113 self.crypt_key = b"JPjyOWBeVEQiMe7h"
116 self.nat_header = None
118 VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
121 self.async_mode = False
124 def mk_scapy_crypt_key(p):
125 if p.crypt_algo in ("AES-GCM", "AES-CTR"):
126 return p.crypt_key + struct.pack("!I", p.salt)
131 def config_tun_params(p, encryption_type, tun_if):
132 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
134 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
136 p.tun_dst = tun_if.remote_addr[p.addr_type]
137 p.tun_src = tun_if.local_addr[p.addr_type]
138 crypt_key = mk_scapy_crypt_key(p)
139 p.scapy_tun_sa = SecurityAssociation(
142 crypt_algo=p.crypt_algo,
144 auth_algo=p.auth_algo,
146 tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
147 nat_t_header=p.nat_header,
150 p.vpp_tun_sa = SecurityAssociation(
153 crypt_algo=p.crypt_algo,
155 auth_algo=p.auth_algo,
157 tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
158 nat_t_header=p.nat_header,
163 def config_tra_params(p, encryption_type):
165 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
167 crypt_key = mk_scapy_crypt_key(p)
168 p.scapy_tra_sa = SecurityAssociation(
171 crypt_algo=p.crypt_algo,
173 auth_algo=p.auth_algo,
175 nat_t_header=p.nat_header,
178 p.vpp_tra_sa = SecurityAssociation(
181 crypt_algo=p.crypt_algo,
183 auth_algo=p.auth_algo,
185 nat_t_header=p.nat_header,
190 class TemplateIpsec(VppTestCase):
195 |tra_if| <-------> |VPP|
200 ------ encrypt --- plain ---
201 |tun_if| <------- |VPP| <------ |pg1|
204 ------ decrypt --- plain ---
205 |tun_if| -------> |VPP| ------> |pg1|
212 def ipsec_select_backend(self):
213 """empty method to be overloaded when necessary"""
218 super(TemplateIpsec, cls).setUpClass()
221 def tearDownClass(cls):
222 super(TemplateIpsec, cls).tearDownClass()
224 def setup_params(self):
225 if not hasattr(self, "ipv4_params"):
226 self.ipv4_params = IPsecIPv4Params()
227 if not hasattr(self, "ipv6_params"):
228 self.ipv6_params = IPsecIPv6Params()
230 self.ipv4_params.addr_type: self.ipv4_params,
231 self.ipv6_params.addr_type: self.ipv6_params,
234 def config_interfaces(self):
235 self.create_pg_interfaces(range(3))
236 self.interfaces = list(self.pg_interfaces)
237 for i in self.interfaces:
245 super(TemplateIpsec, self).setUp()
249 self.vpp_esp_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP
250 self.vpp_ah_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_AH
252 self.config_interfaces()
254 self.ipsec_select_backend()
256 def unconfig_interfaces(self):
257 for i in self.interfaces:
263 super(TemplateIpsec, self).tearDown()
265 self.unconfig_interfaces()
267 def show_commands_at_teardown(self):
268 self.logger.info(self.vapi.cli("show hardware"))
270 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
272 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
273 / sa.encrypt(IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size))
274 for i in range(count)
277 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
279 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
281 IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
282 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
284 for i in range(count)
287 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
289 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
290 / IP(src=src, dst=dst)
292 / Raw(b"X" * payload_size)
293 for i in range(count)
296 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
298 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
299 / IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
300 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
301 for i in range(count)
305 class IpsecTcp(object):
306 def verify_tcp_checksum(self):
307 # start http cli server listener on http://0.0.0.0:80
308 self.vapi.cli("http cli server")
309 p = self.params[socket.AF_INET]
311 src=self.tun_if.remote_mac, dst=self.tun_if.local_mac
312 ) / p.scapy_tun_sa.encrypt(
313 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
314 / TCP(flags="S", dport=80)
316 self.logger.debug(ppp("Sending packet:", send))
317 recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
319 decrypted = p.vpp_tun_sa.decrypt(recv[IP])
320 self.assert_packet_checksums_valid(decrypted)
class IpsecTcpTests(IpsecTcp):
    # Test-case mixin: turns the verification helper inherited from
    # IpsecTcp into a test method that the runner can discover by name.

    def test_tcp_checksum(self):
        """verify checksum correctness for vpp generated packets"""
        # The inherited helper does all of the work: it sends an encrypted
        # TCP SYN and validates the checksums of the decrypted reply.
        self.verify_tcp_checksum()
329 class IpsecTra4(object):
330 """verify methods for Transport v4"""
332 def get_replay_counts(self, p):
333 replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
334 count = self.statistics.get_err_counter(replay_node_name)
337 replay_post_node_name = (
338 "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
340 count += self.statistics.get_err_counter(replay_post_node_name)
344 def get_hash_failed_counts(self, p):
345 if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
346 hash_failed_node_name = (
347 "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
350 hash_failed_node_name = (
351 "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
353 count = self.statistics.get_err_counter(hash_failed_node_name)
356 count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
360 def verify_hi_seq_num(self):
361 p = self.params[socket.AF_INET]
362 saf = VppEnum.vl_api_ipsec_sad_flags_t
363 esn_on = p.vpp_tra_sa.esn_en
364 ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
366 seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
367 replay_count = self.get_replay_counts(p)
368 hash_failed_count = self.get_hash_failed_counts(p)
369 seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
371 # a few packets so we get the rx seq number above the window size and
372 # thus can simulate a wrap with an out of window packet
375 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
376 / p.scapy_tra_sa.encrypt(
377 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
381 for seq in range(63, 80)
383 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
385 # these 4 packets will all choose seq-num 0 to decrypt since none
386 # are out of window when first checked. however, once #200 has
387 # decrypted it will move the window to 200 and so #81 is out of
388 # window. this packet should be dropped.
391 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
392 / p.scapy_tra_sa.encrypt(
393 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
398 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
399 / p.scapy_tra_sa.encrypt(
400 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
405 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
406 / p.scapy_tra_sa.encrypt(
407 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
412 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
413 / p.scapy_tra_sa.encrypt(
414 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
420 # if anti-replay is off then we won't drop #81
421 n_rx = 3 if ar_on else 4
422 self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
423 # this packet is one before the wrap
426 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
427 / p.scapy_tra_sa.encrypt(
428 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
433 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
435 # move the window over half way to a wrap
438 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
439 / p.scapy_tra_sa.encrypt(
440 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
445 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
447 # anti-replay will drop old packets, no anti-replay will not
450 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
451 / p.scapy_tra_sa.encrypt(
452 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
459 self.send_and_assert_no_replies(self.tra_if, pkts)
461 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
465 # validate wrapping the ESN
468 # wrap scapy's TX SA SN
469 p.scapy_tra_sa.seq_num = 0x100000005
471 # send a packet that wraps the window for both AR and no AR
474 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
475 / p.scapy_tra_sa.encrypt(
476 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
483 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
485 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
487 # move the window forward to half way to the next wrap
490 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
491 / p.scapy_tra_sa.encrypt(
492 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
499 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
501 # a packet less than 2^30 from the current position is:
502 # - AR: out of window and dropped
506 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
507 / p.scapy_tra_sa.encrypt(
508 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
516 self.send_and_assert_no_replies(self.tra_if, pkts)
518 self.send_and_expect(self.tra_if, pkts, self.tra_if)
520 # a packet more than 2^30 from the current position is:
521 # - AR: out of window and dropped
522 # - non-AR: considered a wrap, but since it's not a wrap
523 # it won't decrypt and so will be dropped
526 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
527 / p.scapy_tra_sa.encrypt(
528 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
535 self.send_and_assert_no_replies(self.tra_if, pkts)
537 # a packet less than 2^30 from the current position and is a
538 # wrap; (the seq is currently at 0x180000005).
539 # - AR: out of window so considered a wrap, so accepted
540 # - non-AR: not considered a wrap, so won't decrypt
541 p.scapy_tra_sa.seq_num = 0x260000005
544 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
545 / p.scapy_tra_sa.encrypt(
546 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
553 self.send_and_expect(self.tra_if, pkts, self.tra_if)
555 self.send_and_assert_no_replies(self.tra_if, pkts)
558 # window positions are different now for AR/non-AR
559 # move non-AR forward
562 # a packet more than 2^30 from the current position and is a
563 # wrap; (the seq is currently at 0x180000005).
565 # - non-AR: not considered a wrap, so won't decrypt
569 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
570 / p.scapy_tra_sa.encrypt(
571 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
577 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
578 / p.scapy_tra_sa.encrypt(
579 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
585 self.send_and_expect(self.tra_if, pkts, self.tra_if)
589 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
590 / p.scapy_tra_sa.encrypt(
591 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
597 self.send_and_expect(self.tra_if, pkts, self.tra_if)
599 def verify_tra_anti_replay(self):
600 p = self.params[socket.AF_INET]
601 esn_en = p.vpp_tra_sa.esn_en
603 seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
604 replay_count = self.get_replay_counts(p)
605 hash_failed_count = self.get_hash_failed_counts(p)
606 seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
608 if ESP == self.encryption_type:
609 undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
610 undersize_count = self.statistics.get_err_counter(undersize_node_name)
613 # send packets with seq numbers 1->34
614 # this means the window size is still in Case B (see RFC4303
617 # for reasons i haven't investigated Scapy won't create a packet with
622 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
623 / p.scapy_tra_sa.encrypt(
624 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
628 for seq in range(1, 34)
630 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
632 # replayed packets are dropped
633 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
634 replay_count += len(pkts)
635 self.assertEqual(self.get_replay_counts(p), replay_count)
638 # now send a batch of packets all with the same sequence number
639 # the first packet in the batch is legitimate, the rest bogus
641 self.vapi.cli("clear error")
642 self.vapi.cli("clear node counters")
644 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
645 ) / p.scapy_tra_sa.encrypt(
646 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
649 recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
651 self.assertEqual(self.get_replay_counts(p), replay_count)
654 # now move the window over to 257 (more than one byte) and into Case A
656 self.vapi.cli("clear error")
658 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
659 ) / p.scapy_tra_sa.encrypt(
660 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
663 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
665 # replayed packets are dropped
666 self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
668 self.assertEqual(self.get_replay_counts(p), replay_count)
670 # the window size is 64 packets
671 # in window are still accepted
673 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
674 ) / p.scapy_tra_sa.encrypt(
675 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
678 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
680 # a packet that does not decrypt does not move the window forward
681 bogus_sa = SecurityAssociation(
682 self.encryption_type,
684 crypt_algo=p.crypt_algo,
685 crypt_key=mk_scapy_crypt_key(p)[::-1],
686 auth_algo=p.auth_algo,
687 auth_key=p.auth_key[::-1],
690 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
691 ) / bogus_sa.encrypt(
692 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
695 self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
697 hash_failed_count += 17
698 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
700 # a malformed 'runt' packet
701 # created by a mis-constructed SA
702 if ESP == self.encryption_type and p.crypt_algo != "NULL":
703 bogus_sa = SecurityAssociation(self.encryption_type, p.vpp_tra_spi)
705 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
706 ) / bogus_sa.encrypt(
707 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
710 self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
712 undersize_count += 17
713 self.assert_error_counter_equal(undersize_node_name, undersize_count)
715 # which we can determine since this packet is still in the window
717 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
718 ) / p.scapy_tra_sa.encrypt(
719 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
722 self.send_and_expect(self.tra_if, [pkt], self.tra_if)
725 # out of window are dropped
726 # this is Case B. So VPP will consider this to be a high seq num wrap
727 # and so the decrypt attempt will fail
730 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
731 ) / p.scapy_tra_sa.encrypt(
732 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
735 self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
738 # an out of window error with ESN looks like a high sequence
739 # wrap. but since it isn't then the verify will fail.
740 hash_failed_count += 17
741 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
745 self.assertEqual(self.get_replay_counts(p), replay_count)
747 # valid packet moves the window over to 258
749 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
750 ) / p.scapy_tra_sa.encrypt(
751 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
754 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
755 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
758 # move VPP's SA TX seq-num to just before the seq-number wrap.
759 # then fire in a packet that VPP should drop on TX because it
760 # causes the TX seq number to wrap; unless we're using extended sequence
763 self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
764 self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
765 self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
769 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
770 / p.scapy_tra_sa.encrypt(
771 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
775 for seq in range(259, 280)
779 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
782 # in order for scapy to decrypt its SA's high order number needs
785 p.vpp_tra_sa.seq_num = 0x100000000
787 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
790 # wrap scapy's TX high sequence number. VPP is in case B, so it
791 # will consider this a high seq wrap also.
792 # The low seq num we set it to will place VPP's RX window in Case A
794 p.scapy_tra_sa.seq_num = 0x100000005
796 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
797 ) / p.scapy_tra_sa.encrypt(
798 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
801 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
803 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
806 # A packet that has seq num between (2^32-64) and 5 is within
809 p.scapy_tra_sa.seq_num = 0xFFFFFFFD
811 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
812 ) / p.scapy_tra_sa.encrypt(
813 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
816 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
817 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
820 # While in case A we cannot wrap the high sequence number again
821 # because VPP will consider this packet to be one that moves the
825 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
826 ) / p.scapy_tra_sa.encrypt(
827 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
830 self.send_and_assert_no_replies(
831 self.tra_if, [pkt], self.tra_if, timeout=0.2
834 hash_failed_count += 1
835 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
838 # but if we move the window forward to case B, then we can wrap
841 p.scapy_tra_sa.seq_num = 0x100000555
843 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
844 ) / p.scapy_tra_sa.encrypt(
845 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
848 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
849 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
851 p.scapy_tra_sa.seq_num = 0x200000444
853 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
854 ) / p.scapy_tra_sa.encrypt(
855 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
858 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
859 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
863 # without ESN TX sequence numbers can't wrap and packets are
864 # dropped from here on out.
866 self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
867 seq_cycle_count += len(pkts)
868 self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
870 # move the security-associations seq number on to the last we used
871 self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
872 p.scapy_tra_sa.seq_num = 351
873 p.vpp_tra_sa.seq_num = 351
875 def verify_tra_lost(self):
876 p = self.params[socket.AF_INET]
877 esn_en = p.vpp_tra_sa.esn_en
880 # send packets with seq numbers 1->34
881 # this means the window size is still in Case B (see RFC4303
884 # for reasons i haven't investigated Scapy won't create a packet with
889 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
890 / p.scapy_tra_sa.encrypt(
891 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
895 for seq in range(1, 3)
897 self.send_and_expect(self.tra_if, pkts, self.tra_if)
899 self.assertEqual(p.tra_sa_out.get_lost(), 0)
901 # skip a sequence number
904 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
905 / p.scapy_tra_sa.encrypt(
906 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
910 for seq in range(4, 6)
912 self.send_and_expect(self.tra_if, pkts, self.tra_if)
914 self.assertEqual(p.tra_sa_out.get_lost(), 0)
916 # lost packets are counted until we get up past the first
917 # sizeof(replay_window) packets
920 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
921 / p.scapy_tra_sa.encrypt(
922 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
926 for seq in range(6, 100)
928 self.send_and_expect(self.tra_if, pkts, self.tra_if)
930 self.assertEqual(p.tra_sa_out.get_lost(), 1)
932 # lots of holes in the sequence
935 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
936 / p.scapy_tra_sa.encrypt(
937 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
941 for seq in range(100, 200, 2)
943 self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
947 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
948 / p.scapy_tra_sa.encrypt(
949 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
953 for seq in range(200, 300)
955 self.send_and_expect(self.tra_if, pkts, self.tra_if)
957 self.assertEqual(p.tra_sa_out.get_lost(), 51)
959 # a big hole in the seq number space
962 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
963 / p.scapy_tra_sa.encrypt(
964 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
968 for seq in range(400, 500)
970 self.send_and_expect(self.tra_if, pkts, self.tra_if)
972 self.assertEqual(p.tra_sa_out.get_lost(), 151)
974 def verify_tra_basic4(self, count=1, payload_size=54):
975 """ipsec v4 transport basic test"""
976 self.vapi.cli("clear errors")
977 self.vapi.cli("clear ipsec sa")
979 p = self.params[socket.AF_INET]
980 send_pkts = self.gen_encrypt_pkts(
984 src=self.tra_if.remote_ip4,
985 dst=self.tra_if.local_ip4,
987 payload_size=payload_size,
989 recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
991 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
992 self.assert_packet_checksums_valid(rx)
994 decrypted = p.vpp_tra_sa.decrypt(rx[IP])
995 self.assert_packet_checksums_valid(decrypted)
997 self.logger.debug(ppp("Unexpected packet:", rx))
1000 self.logger.info(self.vapi.ppcli("show error"))
1001 self.logger.info(self.vapi.ppcli("show ipsec all"))
1003 pkts = p.tra_sa_in.get_stats()["packets"]
1005 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1007 pkts = p.tra_sa_out.get_stats()["packets"]
1009 pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1011 self.assertEqual(p.tra_sa_out.get_lost(), 0)
1012 self.assertEqual(p.tra_sa_in.get_lost(), 0)
1014 self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
1015 self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
class IpsecTra4Tests(IpsecTra4):
    """UT test methods for Transport v4"""

    def test_tra_anti_replay(self):
        """ipsec v4 transport anti-replay test"""
        self.verify_tra_anti_replay()

    def test_tra_lost(self):
        """ipsec v4 transport lost packet test"""
        self.verify_tra_lost()

    def test_tra_basic(self, count=1):
        """ipsec v4 transport basic test"""
        # Forward the requested packet count instead of a hard-coded 1 so
        # the parameter is not silently ignored. The test runner calls this
        # with no arguments, so the default still sends a single packet.
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ipsec v4 transport burst test"""
        # 257 packets: the same verification as the basic test, run as a
        # burst rather than with a single packet.
        self.verify_tra_basic4(count=257)
1038 class IpsecTra6(object):
1039 """verify methods for Transport v6"""
1041 def verify_tra_basic6(self, count=1, payload_size=54):
1042 self.vapi.cli("clear errors")
1043 self.vapi.cli("clear ipsec sa")
1045 p = self.params[socket.AF_INET6]
1046 send_pkts = self.gen_encrypt_pkts6(
1050 src=self.tra_if.remote_ip6,
1051 dst=self.tra_if.local_ip6,
1053 payload_size=payload_size,
1055 recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1056 for rx in recv_pkts:
1057 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
1059 decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
1060 self.assert_packet_checksums_valid(decrypted)
1062 self.logger.debug(ppp("Unexpected packet:", rx))
1065 self.logger.info(self.vapi.ppcli("show error"))
1066 self.logger.info(self.vapi.ppcli("show ipsec all"))
1068 pkts = p.tra_sa_in.get_stats()["packets"]
1070 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1072 pkts = p.tra_sa_out.get_stats()["packets"]
1074 pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1076 self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
1077 self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
1079 def gen_encrypt_pkts_ext_hdrs6(
1080 self, sa, sw_intf, src, dst, count=1, payload_size=54
1083 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1085 IPv6(src=src, dst=dst)
1086 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
1088 for i in range(count)
1091 def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
1093 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1094 / IPv6(src=src, dst=dst)
1095 / IPv6ExtHdrHopByHop()
1096 / IPv6ExtHdrFragment(id=2, offset=200)
1097 / Raw(b"\xff" * 200)
1098 for i in range(count)
1101 def verify_tra_encrypted6(self, p, sa, rxs):
1104 self.assert_packet_checksums_valid(rx)
1106 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
1107 decrypted.append(decrypt_pkt)
1108 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
1109 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
1111 self.logger.debug(ppp("Unexpected packet:", rx))
1113 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1119 def verify_tra_66_ext_hdrs(self, p):
1123 # check we can decrypt with options
1125 tx = self.gen_encrypt_pkts_ext_hdrs6(
1128 src=self.tra_if.remote_ip6,
1129 dst=self.tra_if.local_ip6,
1132 self.send_and_expect(self.tra_if, tx, self.tra_if)
1135 # injecting a packet from ourselves to be routed off box is a hack
1136 # but it matches an outbound policy, alors je ne regrette rien
1139 # one extension before ESP
1141 Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1142 / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1143 / IPv6ExtHdrFragment(id=2, offset=200)
1144 / Raw(b"\xff" * 200)
1147 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1148 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1151 # for reasons i'm not going to investigate scapy does not
1152 # create the correct headers after decrypt. but reparsing
1153 # the ipv6 packet fixes it
1154 dc = IPv6(raw(dc[IPv6]))
1155 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1157 # two extensions before ESP
1159 Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1160 / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1161 / IPv6ExtHdrHopByHop()
1162 / IPv6ExtHdrFragment(id=2, offset=200)
1163 / Raw(b"\xff" * 200)
1166 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1167 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1170 dc = IPv6(raw(dc[IPv6]))
1171 self.assertTrue(dc[IPv6ExtHdrHopByHop])
1172 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1174 # two extensions before ESP, one after
1176 Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1177 / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1178 / IPv6ExtHdrHopByHop()
1179 / IPv6ExtHdrFragment(id=2, offset=200)
1180 / IPv6ExtHdrDestOpt()
1181 / Raw(b"\xff" * 200)
1184 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1185 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1188 dc = IPv6(raw(dc[IPv6]))
1189 self.assertTrue(dc[IPv6ExtHdrDestOpt])
1190 self.assertTrue(dc[IPv6ExtHdrHopByHop])
1191 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
class IpsecTra6Tests(IpsecTra6):
    """UT test methods for Transport v6"""

    # Both tests run the same inherited verification and differ only in
    # how many packets are pushed through it.

    def test_tra_basic6(self):
        """ipsec v6 transport basic test"""
        # Single-packet run.
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ipsec v6 transport burst test"""
        # Burst run: 257 packets in one go.
        self.verify_tra_basic6(count=257)
class IpsecTra6ExtTests(IpsecTra6):
    # Test-case mixin exposing the IPv6 extension-header verification
    # inherited from IpsecTra6.

    def test_tra_ext_hdrs_66(self):
        """ipsec 6o6 tra extension headers test"""
        # Select the IPv6 parameter set, then hand it to the inherited
        # verification helper.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tra_66_ext_hdrs(v6_params)
1212 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
1213 """UT test methods for Transport v6 and v4"""
1218 class IpsecTun4(object):
1219 """verify methods for Tunnel v4"""
1221 def verify_counters4(self, p, count, n_frags=None, worker=None):
1224 if hasattr(p, "spd_policy_in_any"):
1225 pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
1229 "incorrect SPD any policy: expected %d != %d" % (count, pkts),
1232 if hasattr(p, "tun_sa_in"):
1233 pkts = p.tun_sa_in.get_stats(worker)["packets"]
1235 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1237 pkts = p.tun_sa_out.get_stats(worker)["packets"]
1241 "incorrect SA out counts: expected %d != %d" % (count, pkts),
1244 self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
1245 self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)
1247 def verify_decrypted(self, p, rxs):
1249 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1250 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1251 self.assert_packet_checksums_valid(rx)
1253 def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
1254 align = sa.crypt_algo.block_size
1257 exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
1258 exp_len += sa.crypt_algo.iv_size
1259 exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
1260 self.assertEqual(exp_len, len(esp_payload))
1262 def verify_encrypted(self, p, sa, rxs):
1266 self.assertEqual(rx[UDP].dport, 4500)
1267 self.assert_packet_checksums_valid(rx)
1268 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
1271 decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
1272 if not decrypt_pkt.haslayer(IP):
1273 decrypt_pkt = IP(decrypt_pkt[Raw].load)
1274 if rx_ip.proto == socket.IPPROTO_ESP:
1275 self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
1276 decrypt_pkts.append(decrypt_pkt)
1277 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1278 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1280 self.logger.debug(ppp("Unexpected packet:", rx))
1282 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1286 pkts = reassemble4(decrypt_pkts)
1288 self.assert_packet_checksums_valid(pkt)
1290 def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
1291 self.vapi.cli("clear errors")
1292 self.vapi.cli("clear ipsec counters")
1293 self.vapi.cli("clear ipsec sa")
1297 send_pkts = self.gen_encrypt_pkts(
1301 src=p.remote_tun_if_host,
1302 dst=self.pg1.remote_ip4,
1304 payload_size=payload_size,
1306 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1307 self.verify_decrypted(p, recv_pkts)
1309 send_pkts = self.gen_pkts(
1311 src=self.pg1.remote_ip4,
1312 dst=p.remote_tun_if_host,
1314 payload_size=payload_size,
1316 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if, n_rx)
1317 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1319 for rx in recv_pkts:
1320 self.assertEqual(rx[IP].src, p.tun_src)
1321 self.assertEqual(rx[IP].dst, p.tun_dst)
1324 self.logger.info(self.vapi.ppcli("show error"))
1325 self.logger.info(self.vapi.ppcli("show ipsec all"))
1327 self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
1328 self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
1329 self.verify_counters4(p, count, n_rx)
1331 def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
1332 self.vapi.cli("clear errors")
1336 send_pkts = self.gen_encrypt_pkts(
1340 src=p.remote_tun_if_host,
1341 dst=self.pg1.remote_ip4,
1344 self.send_and_assert_no_replies(self.tun_if, send_pkts)
1346 send_pkts = self.gen_pkts(
1348 src=self.pg1.remote_ip4,
1349 dst=p.remote_tun_if_host,
1351 payload_size=payload_size,
1353 self.send_and_assert_no_replies(self.pg1, send_pkts)
1356 self.logger.info(self.vapi.ppcli("show error"))
1357 self.logger.info(self.vapi.ppcli("show ipsec all"))
1359 def verify_tun_reass_44(self, p):
1360 self.vapi.cli("clear errors")
1361 self.vapi.ip_reassembly_enable_disable(
1362 sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
1366 send_pkts = self.gen_encrypt_pkts(
1370 src=p.remote_tun_if_host,
1371 dst=self.pg1.remote_ip4,
1375 send_pkts = fragment_rfc791(send_pkts[0], 1400)
1376 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
1377 self.verify_decrypted(p, recv_pkts)
1379 send_pkts = self.gen_pkts(
1380 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=1
1382 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1383 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1386 self.logger.info(self.vapi.ppcli("show error"))
1387 self.logger.info(self.vapi.ppcli("show ipsec all"))
1389 self.verify_counters4(p, 1, 1)
1390 self.vapi.ip_reassembly_enable_disable(
1391 sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
1394 def verify_tun_64(self, p, count=1):
1395 self.vapi.cli("clear errors")
1396 self.vapi.cli("clear ipsec sa")
1398 send_pkts = self.gen_encrypt_pkts6(
1402 src=p.remote_tun_if_host6,
1403 dst=self.pg1.remote_ip6,
1406 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1407 for recv_pkt in recv_pkts:
1408 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
1409 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
1410 self.assert_packet_checksums_valid(recv_pkt)
1411 send_pkts = self.gen_pkts6(
1414 src=self.pg1.remote_ip6,
1415 dst=p.remote_tun_if_host6,
1418 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1419 for recv_pkt in recv_pkts:
1421 decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
1422 if not decrypt_pkt.haslayer(IPv6):
1423 decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1424 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1425 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
1426 self.assert_packet_checksums_valid(decrypt_pkt)
1428 self.logger.error(ppp("Unexpected packet:", recv_pkt))
1430 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1435 self.logger.info(self.vapi.ppcli("show error"))
1436 self.logger.info(self.vapi.ppcli("show ipsec all"))
1438 self.verify_counters4(p, count)
1440 def verify_keepalive(self, p):
# the size of Raw is calculated to pad to the minimum ethernet
# frame size of 64 bytes
1444 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1445 / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
1446 / UDP(sport=333, dport=4500)
1450 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1451 self.assert_error_counter_equal(
1452 "/err/%s/nat_keepalive" % self.tun4_input_node, 31
1456 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1457 / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
1458 / UDP(sport=333, dport=4500)
1461 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1462 self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)
1465 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1466 / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
1467 / UDP(sport=333, dport=4500)
1471 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1472 self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
class IpsecTun4Tests(IpsecTun4):
    """UT test methods for Tunnel v4"""

    def test_tun_basic44(self):
        """ipsec 4o4 tunnel basic test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)

        # take the tunnel interface down and up again (ARP is resolved in
        # between), then repeat the same single-packet verification
        tunnel = self.tun_if
        tunnel.admin_down()
        tunnel.resolve_arp()
        tunnel.admin_up()
        self.verify_tun_44(v4_params, count=1)

    def test_tun_reass_basic44(self):
        """ipsec 4o4 tunnel basic reassembly test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_reass_44(v4_params)

    def test_tun_burst44(self):
        """ipsec 4o4 tunnel burst test"""
        # same check as the basic test, driven with a 127-packet burst
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=127)
1495 class IpsecTun6(object):
1496 """verify methods for Tunnel v6"""
1498 def verify_counters6(self, p_in, p_out, count, worker=None):
1499 if hasattr(p_in, "tun_sa_in"):
1500 pkts = p_in.tun_sa_in.get_stats(worker)["packets"]
1502 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1504 if hasattr(p_out, "tun_sa_out"):
1505 pkts = p_out.tun_sa_out.get_stats(worker)["packets"]
1509 "incorrect SA out counts: expected %d != %d" % (count, pkts),
1511 self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1512 self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
1514 def verify_decrypted6(self, p, rxs):
1516 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1517 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1518 self.assert_packet_checksums_valid(rx)
1520 def verify_encrypted6(self, p, sa, rxs):
1522 self.assert_packet_checksums_valid(rx)
1523 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
1524 self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
1525 if p.outer_flow_label:
1526 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
1528 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1529 if not decrypt_pkt.haslayer(IPv6):
1530 decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1531 self.assert_packet_checksums_valid(decrypt_pkt)
1532 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1533 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1534 self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
1535 self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
1537 self.logger.debug(ppp("Unexpected packet:", rx))
1539 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1544 def verify_drop_tun_tx_66(self, p_in, count=1, payload_size=64):
1545 self.vapi.cli("clear errors")
1546 self.vapi.cli("clear ipsec sa")
1548 send_pkts = self.gen_pkts6(
1551 src=self.pg1.remote_ip6,
1552 dst=p_in.remote_tun_if_host,
1554 payload_size=payload_size,
1556 self.send_and_assert_no_replies(self.tun_if, send_pkts)
1557 self.logger.info(self.vapi.cli("sh punt stats"))
1559 def verify_drop_tun_rx_66(self, p_in, count=1, payload_size=64):
1560 self.vapi.cli("clear errors")
1561 self.vapi.cli("clear ipsec sa")
1563 send_pkts = self.gen_encrypt_pkts6(
1567 src=p_in.remote_tun_if_host,
1568 dst=self.pg1.remote_ip6,
1571 self.send_and_assert_no_replies(self.tun_if, send_pkts)
def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
    """Run the TX drop check followed by the RX drop check.

    Both checks receive the same ``p_in``, ``count`` and ``payload_size``.
    """
    shared_opts = {"count": count, "payload_size": payload_size}
    self.verify_drop_tun_tx_66(p_in, **shared_opts)
    self.verify_drop_tun_rx_66(p_in, **shared_opts)
1577 def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
1578 self.vapi.cli("clear errors")
1579 self.vapi.cli("clear ipsec sa")
1583 send_pkts = self.gen_encrypt_pkts6(
1587 src=p_in.remote_tun_if_host,
1588 dst=self.pg1.remote_ip6,
1590 payload_size=payload_size,
1592 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1593 self.verify_decrypted6(p_in, recv_pkts)
1595 send_pkts = self.gen_pkts6(
1598 src=self.pg1.remote_ip6,
1599 dst=p_out.remote_tun_if_host,
1601 payload_size=payload_size,
1603 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1604 self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
1606 for rx in recv_pkts:
1607 self.assertEqual(rx[IPv6].src, p_out.tun_src)
1608 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
1611 self.logger.info(self.vapi.ppcli("show error"))
1612 self.logger.info(self.vapi.ppcli("show ipsec all"))
1613 self.verify_counters6(p_in, p_out, count)
1615 def verify_tun_reass_66(self, p):
1616 self.vapi.cli("clear errors")
1617 self.vapi.ip_reassembly_enable_disable(
1618 sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
1622 send_pkts = self.gen_encrypt_pkts6(
1626 src=p.remote_tun_if_host,
1627 dst=self.pg1.remote_ip6,
1631 send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1632 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
1633 self.verify_decrypted6(p, recv_pkts)
1635 send_pkts = self.gen_pkts6(
1638 src=self.pg1.remote_ip6,
1639 dst=p.remote_tun_if_host,
1643 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1644 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1646 self.logger.info(self.vapi.ppcli("show error"))
1647 self.logger.info(self.vapi.ppcli("show ipsec all"))
1648 self.verify_counters6(p, p, 1)
1649 self.vapi.ip_reassembly_enable_disable(
1650 sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
1653 def verify_tun_46(self, p, count=1):
1654 """ipsec 4o6 tunnel basic test"""
1655 self.vapi.cli("clear errors")
1656 self.vapi.cli("clear ipsec sa")
1658 send_pkts = self.gen_encrypt_pkts(
1662 src=p.remote_tun_if_host4,
1663 dst=self.pg1.remote_ip4,
1666 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1667 for recv_pkt in recv_pkts:
1668 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
1669 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
1670 self.assert_packet_checksums_valid(recv_pkt)
1671 send_pkts = self.gen_pkts(
1673 src=self.pg1.remote_ip4,
1674 dst=p.remote_tun_if_host4,
1677 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1678 for recv_pkt in recv_pkts:
1680 decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
1681 if not decrypt_pkt.haslayer(IP):
1682 decrypt_pkt = IP(decrypt_pkt[Raw].load)
1683 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1684 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
1685 self.assert_packet_checksums_valid(decrypt_pkt)
1687 self.logger.debug(ppp("Unexpected packet:", recv_pkt))
1689 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1694 self.logger.info(self.vapi.ppcli("show error"))
1695 self.logger.info(self.vapi.ppcli("show ipsec all"))
1696 self.verify_counters6(p, p, count)
1698 def verify_keepalive(self, p):
# the size of Raw is calculated to pad to the minimum ethernet
# frame size of 64 bytes
1702 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1703 / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
1704 / UDP(sport=333, dport=4500)
1708 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1709 self.assert_error_counter_equal(
1710 "/err/%s/nat_keepalive" % self.tun6_input_node, 31
1714 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1715 / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
1716 / UDP(sport=333, dport=4500)
1719 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1720 self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31)
1723 Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1724 / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
1725 / UDP(sport=333, dport=4500)
1729 self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1730 self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62)
class IpsecTun6Tests(IpsecTun6):
    """UT test methods for Tunnel v6"""

    def test_tun_basic66(self):
        """ipsec 6o6 tunnel basic test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_reass_basic66(self):
        """ipsec 6o6 tunnel basic reassembly test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(v6_params)

    def test_tun_burst66(self):
        """ipsec 6o6 tunnel burst test"""
        # same check as the basic test, driven with a 257-packet burst
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
1749 class IpsecTun6HandoffTests(IpsecTun6):
1750 """UT test methods for Tunnel v6 with multiple workers"""
1752 vpp_worker_count = 2
1754 def test_tun_handoff_66(self):
1755 """ipsec 6o6 tunnel worker hand-off test"""
1756 self.vapi.cli("clear errors")
1757 self.vapi.cli("clear ipsec sa")
1760 p = self.params[socket.AF_INET6]
1762 # inject alternately on worker 0 and 1. all counts on the SA
1763 # should be against worker 0
1764 for worker in [0, 1, 0, 1]:
1765 send_pkts = self.gen_encrypt_pkts6(
1769 src=p.remote_tun_if_host,
1770 dst=self.pg1.remote_ip6,
1773 recv_pkts = self.send_and_expect(
1774 self.tun_if, send_pkts, self.pg1, worker=worker
1776 self.verify_decrypted6(p, recv_pkts)
1778 send_pkts = self.gen_pkts6(
1781 src=self.pg1.remote_ip6,
1782 dst=p.remote_tun_if_host,
1785 recv_pkts = self.send_and_expect(
1786 self.pg1, send_pkts, self.tun_if, worker=worker
1788 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1790 # all counts against the first worker that was used
1791 self.verify_counters6(p, p, 4 * N_PKTS, worker=0)
1794 class IpsecTun4HandoffTests(IpsecTun4):
1795 """UT test methods for Tunnel v4 with multiple workers"""
1797 vpp_worker_count = 2
1799 def test_tun_handooff_44(self):
1800 """ipsec 4o4 tunnel worker hand-off test"""
1801 self.vapi.cli("clear errors")
1802 self.vapi.cli("clear ipsec sa")
1805 p = self.params[socket.AF_INET]
1807 # inject alternately on worker 0 and 1. all counts on the SA
1808 # should be against worker 0
1809 for worker in [0, 1, 0, 1]:
1810 send_pkts = self.gen_encrypt_pkts(
1814 src=p.remote_tun_if_host,
1815 dst=self.pg1.remote_ip4,
1818 recv_pkts = self.send_and_expect(
1819 self.tun_if, send_pkts, self.pg1, worker=worker
1821 self.verify_decrypted(p, recv_pkts)
1823 send_pkts = self.gen_pkts(
1825 src=self.pg1.remote_ip4,
1826 dst=p.remote_tun_if_host,
1829 recv_pkts = self.send_and_expect(
1830 self.pg1, send_pkts, self.tun_if, worker=worker
1832 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1834 # all counts against the first worker that was used
1835 self.verify_counters4(p, 4 * N_PKTS, worker=0)
1838 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
1839 """UT test methods for Tunnel v6 & v4"""
1844 class IPSecIPv4Fwd(VppTestCase):
1845 """Test IPSec by capturing and verifying IPv4 forwarded pkts"""
1848 def setUpConstants(cls):
1849 super(IPSecIPv4Fwd, cls).setUpConstants()
1852 super(IPSecIPv4Fwd, self).setUp()
1853 # store SPD objects so we can remove configs on tear down
1855 self.spd_policies = []
1858 # remove SPD policies
1859 for obj in self.spd_policies:
1860 obj.remove_vpp_config()
1861 self.spd_policies = []
1862 # remove SPD items (interface bindings first, then SPD)
1863 for obj in reversed(self.spd_objs):
1864 obj.remove_vpp_config()
1866 # close down pg intfs
1867 for pg in self.pg_interfaces:
1870 super(IPSecIPv4Fwd, self).tearDown()
1872 def create_interfaces(self, num_ifs=2):
1873 # create interfaces pg0 ... pg<num_ifs>
1874 self.create_pg_interfaces(range(num_ifs))
1875 for pg in self.pg_interfaces:
1876 # put the interface up
1878 # configure IPv4 address on the interface
1880 # resolve ARP, so that we know VPP MAC
1882 self.logger.info(self.vapi.ppcli("show int addr"))
1884 def spd_create_and_intf_add(self, spd_id, pg_list):
1885 spd = VppIpsecSpd(self, spd_id)
1886 spd.add_vpp_config()
1887 self.spd_objs.append(spd)
1889 spdItf = VppIpsecSpdItfBinding(self, spd, pg)
1890 spdItf.add_vpp_config()
1891 self.spd_objs.append(spdItf)
def get_policy(self, policy_type):
    """Translate a policy name into the VPP SPD action enum value.

    :param policy_type: one of "protect", "bypass" or "discard".
    :returns: the matching member of
        ``VppEnum.vl_api_ipsec_spd_action_t``.
    :raises Exception: if ``policy_type`` is not a known policy name.
    """
    action_names = {
        "protect": "IPSEC_API_SPD_ACTION_PROTECT",
        "bypass": "IPSEC_API_SPD_ACTION_BYPASS",
        "discard": "IPSEC_API_SPD_ACTION_DISCARD",
    }
    try:
        action_name = action_names[policy_type]
    except (KeyError, TypeError):
        # TypeError covers unhashable arguments, which are equally invalid.
        # The message is formatted explicitly: Exception() does not apply
        # %-style arguments the way the logging calls do.
        raise Exception("Invalid policy type: %s" % (policy_type,)) from None
    return getattr(VppEnum.vl_api_ipsec_spd_action_t, action_name)
1904 def spd_add_rem_policy(
1916 local_ip_start=ip_address("0.0.0.0"),
1917 local_ip_stop=ip_address("255.255.255.255"),
1918 remote_ip_start=ip_address("0.0.0.0"),
1919 remote_ip_stop=ip_address("255.255.255.255"),
1920 remote_port_start=0,
1921 remote_port_stop=65535,
1923 local_port_stop=65535,
1925 spd = VppIpsecSpd(self, spd_id)
1928 src_range_low = ip_address("0.0.0.0")
1929 src_range_high = ip_address("255.255.255.255")
1930 dst_range_low = ip_address("0.0.0.0")
1931 dst_range_high = ip_address("255.255.255.255")
1934 src_range_low = local_ip_start
1935 src_range_high = local_ip_stop
1936 dst_range_low = remote_ip_start
1937 dst_range_high = remote_ip_stop
1940 src_range_low = src_if.remote_ip4
1941 src_range_high = src_if.remote_ip4
1942 dst_range_low = dst_if.remote_ip4
1943 dst_range_high = dst_if.remote_ip4
1945 spdEntry = VppIpsecSpdEntry(
1955 policy=self.get_policy(policy_type),
1957 remote_port_start=remote_port_start,
1958 remote_port_stop=remote_port_stop,
1959 local_port_start=local_port_start,
1960 local_port_stop=local_port_stop,
1964 spdEntry.add_vpp_config()
1965 self.spd_policies.append(spdEntry)
1967 spdEntry.remove_vpp_config()
1968 self.spd_policies.remove(spdEntry)
1969 self.logger.info(self.vapi.ppcli("show ipsec all"))
1972 def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
1974 for i in range(pkt_count):
1975 # create packet info stored in the test case instance
1976 info = self.create_packet_info(src_if, dst_if)
1977 # convert the info into packet payload
1978 payload = self.info_to_payload(info)
1979 # create the packet itself
1981 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
1982 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
1983 / UDP(sport=src_prt, dport=dst_prt)
1986 # store a copy of the packet in the packet info
1987 info.data = p.copy()
1988 # append the packet to the list
1990 # return the created packet list
1993 def verify_capture(self, src_if, dst_if, capture):
1995 for packet in capture:
1999 # convert the payload to packet info object
2000 payload_info = self.payload_to_info(packet)
2001 # make sure the indexes match
2003 payload_info.src, src_if.sw_if_index, "source sw_if_index"
2006 payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
2008 packet_info = self.get_next_packet_info_for_interface2(
2009 src_if.sw_if_index, dst_if.sw_if_index, packet_info
2011 # make sure we didn't run out of saved packets
2012 self.assertIsNotNone(packet_info)
2014 payload_info.index, packet_info.index, "packet info index"
2016 saved_packet = packet_info.data # fetch the saved packet
2017 # assert the values match
2018 self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
2019 # ... more assertions here
2020 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
2021 except Exception as e:
2022 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2024 remaining_packet = self.get_next_packet_info_for_interface2(
2025 src_if.sw_if_index, dst_if.sw_if_index, packet_info
2029 "Interface %s: Packet expected from interface "
2030 "%s didn't arrive" % (dst_if.name, src_if.name),
def verify_policy_match(self, pkt_count, spdEntry):
    """Assert that an SPD entry has matched exactly ``pkt_count`` packets.

    :param pkt_count: expected value of the entry's "packets" counter.
    :param spdEntry: object exposing ``get_stats()`` returning a mapping
        with a "packets" key.
    """
    # Read the counters once so the logged values and the asserted value
    # come from the same snapshot.
    stats = spdEntry.get_stats()
    self.logger.info("XXXX %s %s", str(spdEntry), str(stats))
    matched_pkts = stats.get("packets")
    self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
    self.assert_equal(pkt_count, matched_pkts)
2040 class SpdFlowCacheTemplate(IPSecIPv4Fwd):
2042 def setUpConstants(cls):
2043 super(SpdFlowCacheTemplate, cls).setUpConstants()
2044 # Override this method with required cmdline parameters e.g.
2045 # cls.vpp_cmdline.extend(["ipsec", "{",
2046 # "ipv4-outbound-spd-flow-cache on",
2048 # cls.logger.info("VPP modified cmdline is %s" % " "
2049 # .join(cls.vpp_cmdline))
2052 super(SpdFlowCacheTemplate, self).setUp()
2055 super(SpdFlowCacheTemplate, self).tearDown()
2057 def get_spd_flow_cache_entries(self, outbound):
2058 """'show ipsec spd' output:
2059 ipv4-inbound-spd-flow-cache-entries: 0
2060 ipv4-outbound-spd-flow-cache-entries: 0
2062 show_ipsec_reply = self.vapi.cli("show ipsec spd")
2063 # match the relevant section of 'show ipsec spd' output
2065 regex_match = re.search(
2066 "ipv4-outbound-spd-flow-cache-entries: (.*)",
2071 regex_match = re.search(
2072 "ipv4-inbound-spd-flow-cache-entries: (.*)", show_ipsec_reply, re.DOTALL
2074 if regex_match is None:
2076 "Unable to find spd flow cache entries \
2077 in 'show ipsec spd' CLI output - regex failed to match"
2081 num_entries = int(regex_match.group(1))
2084 "Unable to get spd flow cache entries \
2085 from 'show ipsec spd' string: %s",
2086 regex_match.group(0),
2088 self.logger.info("%s", regex_match.group(0))
2091 def verify_num_outbound_flow_cache_entries(self, expected_elements):
2093 self.get_spd_flow_cache_entries(outbound=True), expected_elements
2096 def verify_num_inbound_flow_cache_entries(self, expected_elements):
2098 self.get_spd_flow_cache_entries(outbound=False), expected_elements
def crc32_supported(self):
    """Report whether ``lscpu`` lists a hardware CRC32 CPU flag.

    :returns: True if the ``lscpu`` output contains "crc32" or "sse4_2",
        False otherwise (including when ``lscpu`` produces no output).
    """
    # lscpu is part of util-linux package, available on all Linux Distros
    with os.popen("lscpu") as stream:
        cpu_info = stream.read()
    # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
    # see vppinfra/crc32.h
    # Each flag needs its own membership test: a bare non-empty string on
    # the left of `or` is always truthy.
    supported = "crc32" in cpu_info or "sse4_2" in cpu_info
    if supported:
        self.logger.info("\ncrc32 supported:\n" + cpu_info)
    else:
        self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
    return supported
2115 class IPSecIPv6Fwd(VppTestCase):
2116 """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
2119 def setUpConstants(cls):
2120 super(IPSecIPv6Fwd, cls).setUpConstants()
2123 super(IPSecIPv6Fwd, self).setUp()
2124 # store SPD objects so we can remove configs on tear down
2126 self.spd_policies = []
2129 # remove SPD policies
2130 for obj in self.spd_policies:
2131 obj.remove_vpp_config()
2132 self.spd_policies = []
2133 # remove SPD items (interface bindings first, then SPD)
2134 for obj in reversed(self.spd_objs):
2135 obj.remove_vpp_config()
2137 # close down pg intfs
2138 for pg in self.pg_interfaces:
2141 super(IPSecIPv6Fwd, self).tearDown()
2143 def create_interfaces(self, num_ifs=2):
2144 # create interfaces pg0 ... pg<num_ifs>
2145 self.create_pg_interfaces(range(num_ifs))
2146 for pg in self.pg_interfaces:
2147 # put the interface up
2149 # configure IPv6 address on the interface
2152 self.logger.info(self.vapi.ppcli("show int addr"))
2154 def spd_create_and_intf_add(self, spd_id, pg_list):
2155 spd = VppIpsecSpd(self, spd_id)
2156 spd.add_vpp_config()
2157 self.spd_objs.append(spd)
2159 spdItf = VppIpsecSpdItfBinding(self, spd, pg)
2160 spdItf.add_vpp_config()
2161 self.spd_objs.append(spdItf)
def get_policy(self, policy_type):
    """Translate a policy name into the VPP SPD action enum value.

    :param policy_type: one of "protect", "bypass" or "discard".
    :returns: the matching member of
        ``VppEnum.vl_api_ipsec_spd_action_t``.
    :raises Exception: if ``policy_type`` is not a known policy name.
    """
    action_names = {
        "protect": "IPSEC_API_SPD_ACTION_PROTECT",
        "bypass": "IPSEC_API_SPD_ACTION_BYPASS",
        "discard": "IPSEC_API_SPD_ACTION_DISCARD",
    }
    try:
        action_name = action_names[policy_type]
    except (KeyError, TypeError):
        # TypeError covers unhashable arguments, which are equally invalid.
        # The message is formatted explicitly: Exception() does not apply
        # %-style arguments the way the logging calls do.
        raise Exception("Invalid policy type: %s" % (policy_type,)) from None
    return getattr(VppEnum.vl_api_ipsec_spd_action_t, action_name)
2174 def spd_add_rem_policy(
2186 local_ip_start=ip_address("0::0"),
2187 local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
2188 remote_ip_start=ip_address("0::0"),
2189 remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
2190 remote_port_start=0,
2191 remote_port_stop=65535,
2193 local_port_stop=65535,
2195 spd = VppIpsecSpd(self, spd_id)
2198 src_range_low = ip_address("0::0")
2199 src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
2200 dst_range_low = ip_address("0::0")
2201 dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
2204 src_range_low = local_ip_start
2205 src_range_high = local_ip_stop
2206 dst_range_low = remote_ip_start
2207 dst_range_high = remote_ip_stop
2210 src_range_low = src_if.remote_ip6
2211 src_range_high = src_if.remote_ip6
2212 dst_range_low = dst_if.remote_ip6
2213 dst_range_high = dst_if.remote_ip6
2215 spdEntry = VppIpsecSpdEntry(
2225 policy=self.get_policy(policy_type),
2227 remote_port_start=remote_port_start,
2228 remote_port_stop=remote_port_stop,
2229 local_port_start=local_port_start,
2230 local_port_stop=local_port_stop,
2234 spdEntry.add_vpp_config()
2235 self.spd_policies.append(spdEntry)
2237 spdEntry.remove_vpp_config()
2238 self.spd_policies.remove(spdEntry)
2239 self.logger.info(self.vapi.ppcli("show ipsec all"))
2242 def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
2244 for i in range(pkt_count):
2245 # create packet info stored in the test case instance
2246 info = self.create_packet_info(src_if, dst_if)
2247 # convert the info into packet payload
2248 payload = self.info_to_payload(info)
2249 # create the packet itself
2251 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2252 / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
2253 / UDP(sport=src_prt, dport=dst_prt)
2256 # store a copy of the packet in the packet info
2257 info.data = p.copy()
2258 # append the packet to the list
2260 # return the created packet list
2263 def verify_capture(self, src_if, dst_if, capture):
2265 for packet in capture:
2269 # convert the payload to packet info object
2270 payload_info = self.payload_to_info(packet)
2271 # make sure the indexes match
2273 payload_info.src, src_if.sw_if_index, "source sw_if_index"
2276 payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
2278 packet_info = self.get_next_packet_info_for_interface2(
2279 src_if.sw_if_index, dst_if.sw_if_index, packet_info
2281 # make sure we didn't run out of saved packets
2282 self.assertIsNotNone(packet_info)
2284 payload_info.index, packet_info.index, "packet info index"
2286 saved_packet = packet_info.data # fetch the saved packet
2287 # assert the values match
2288 self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
2289 # ... more assertions here
2290 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
2291 except Exception as e:
2292 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2294 remaining_packet = self.get_next_packet_info_for_interface2(
2295 src_if.sw_if_index, dst_if.sw_if_index, packet_info
2299 "Interface %s: Packet expected from interface "
2300 "%s didn't arrive" % (dst_if.name, src_if.name),
def verify_policy_match(self, pkt_count, spdEntry):
    """Assert that an SPD entry has matched exactly ``pkt_count`` packets.

    :param pkt_count: expected value of the entry's "packets" counter.
    :param spdEntry: object exposing ``get_stats()`` returning a mapping
        with a "packets" key.
    """
    # Read the counters once so the logged values and the asserted value
    # come from the same snapshot.
    stats = spdEntry.get_stats()
    self.logger.info("XXXX %s %s", str(spdEntry), str(stats))
    matched_pkts = stats.get("packets")
    self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
    self.assert_equal(pkt_count, matched_pkts)
# When executed as a script, hand control to unittest using VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)