5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204
12 from framework import VppTestRunner
13 from template_ipsec import (
21 IpsecTun6HandoffTests,
22 IpsecTun4HandoffTests,
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
47 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
48 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
50 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
52 crypt_key = mk_scapy_crypt_key(p)
54 p.tun_dst = tun_if.remote_ip
55 p.tun_src = tun_if.local_ip
61 is_default_port = p.nat_header.dport == 4500
63 is_default_port = True
66 outbound_nat_header = p.nat_header
68 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
69 bind_layers(UDP, ESP, dport=p.nat_header.dport)
71 p.scapy_tun_sa = SecurityAssociation(
74 crypt_algo=p.crypt_algo,
76 auth_algo=p.auth_algo,
78 tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
79 nat_t_header=outbound_nat_header,
82 p.vpp_tun_sa = SecurityAssociation(
85 crypt_algo=p.crypt_algo,
87 auth_algo=p.auth_algo,
89 tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
90 nat_t_header=p.nat_header,
95 def config_tra_params(p, encryption_type, tun_if):
96 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
98 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
100 crypt_key = mk_scapy_crypt_key(p)
101 p.tun_dst = tun_if.remote_ip
102 p.tun_src = tun_if.local_ip
105 is_default_port = p.nat_header.dport == 4500
107 is_default_port = True
110 outbound_nat_header = p.nat_header
112 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
113 bind_layers(UDP, ESP, dport=p.nat_header.dport)
115 p.scapy_tun_sa = SecurityAssociation(
118 crypt_algo=p.crypt_algo,
120 auth_algo=p.auth_algo,
123 nat_t_header=outbound_nat_header,
125 p.vpp_tun_sa = SecurityAssociation(
128 crypt_algo=p.crypt_algo,
130 auth_algo=p.auth_algo,
133 nat_t_header=p.nat_header,
137 class TemplateIpsec4TunProtect(object):
138 """IPsec IPv4 Tunnel protect"""
140 encryption_type = ESP
141 tun4_encrypt_node_name = "esp4-encrypt-tun"
142 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
143 tun4_input_node = "ipsec4-tun-input"
145 def config_sa_tra(self, p):
146 config_tun_params(p, self.encryption_type, p.tun_if)
148 p.tun_sa_out = VppIpsecSA(
156 self.vpp_esp_protocol,
159 p.tun_sa_out.add_vpp_config()
161 p.tun_sa_in = VppIpsecSA(
169 self.vpp_esp_protocol,
172 p.tun_sa_in.add_vpp_config()
174 def config_sa_tun(self, p):
175 config_tun_params(p, self.encryption_type, p.tun_if)
177 p.tun_sa_out = VppIpsecSA(
185 self.vpp_esp_protocol,
186 self.tun_if.local_addr[p.addr_type],
187 self.tun_if.remote_addr[p.addr_type],
190 p.tun_sa_out.add_vpp_config()
192 p.tun_sa_in = VppIpsecSA(
200 self.vpp_esp_protocol,
201 self.tun_if.remote_addr[p.addr_type],
202 self.tun_if.local_addr[p.addr_type],
205 p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
    """Create the tunnel-protect object for ``p`` and push it to VPP.

    Builds a ``VppIpsecTunProtect`` from this test case, ``p.tun_if``,
    ``p.tun_sa_out`` and a one-element list holding ``p.tun_sa_in``,
    stores it on ``p.tun_protect`` (the attribute ``unconfig_protect``
    later reads) and calls ``add_vpp_config()`` on it.
    """
    protection = VppIpsecTunProtect(
        self,
        p.tun_if,
        p.tun_sa_out,
        [p.tun_sa_in],
    )
    # Keep the handle on the params object so it can be removed later.
    p.tun_protect = protection
    protection.add_vpp_config()
211 def config_network(self, p):
212 if hasattr(p, "tun_dst"):
215 tun_dst = self.pg0.remote_ip4
216 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
217 p.tun_if.add_vpp_config()
219 p.tun_if.config_ip4()
220 p.tun_if.config_ip6()
222 p.route = VppIpRoute(
224 p.remote_tun_if_host,
226 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
228 p.route.add_vpp_config()
231 p.remote_tun_if_host6,
235 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
def unconfig_network(self, p):
    """Remove the route and the tunnel interface stored on ``p`` from VPP.

    ``p.route`` is removed before ``p.tun_if``; both are the objects that
    ``config_network`` stored on ``p``.
    """
    route = p.route
    route.remove_vpp_config()
    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object held in ``p.tun_protect`` from VPP."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both tunnel SAs stored on ``p`` from VPP.

    Calls ``remove_vpp_config()`` on ``p.tun_sa_out`` first and then on
    ``p.tun_sa_in``.
    """
    outbound_sa = p.tun_sa_out
    outbound_sa.remove_vpp_config()
    inbound_sa = p.tun_sa_in
    inbound_sa.remove_vpp_config()
253 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
254 """IPsec tunnel interface tests"""
256 encryption_type = ESP
260 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
263 def tearDownClass(cls):
264 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
267 super(TemplateIpsec4TunIfEsp, self).setUp()
269 self.tun_if = self.pg0
273 self.config_network(p)
274 self.config_sa_tra(p)
275 self.config_protect(p)
278 super(TemplateIpsec4TunIfEsp, self).tearDown()
281 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
282 """IPsec UDP tunnel interface tests"""
284 tun4_encrypt_node_name = "esp4-encrypt-tun"
285 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
286 encryption_type = ESP
290 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
293 def tearDownClass(cls):
294 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
296 def verify_encrypted(self, p, sa, rxs):
299 # ensure the UDP ports are correct before we decrypt
301 self.assertTrue(rx.haslayer(UDP))
302 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
303 self.assert_equal(rx[UDP].dport, 4500)
305 pkt = sa.decrypt(rx[IP])
306 if not pkt.haslayer(IP):
307 pkt = IP(pkt[Raw].load)
309 self.assert_packet_checksums_valid(pkt)
310 self.assert_equal(pkt[IP].dst, "1.1.1.1")
311 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
312 except (IndexError, AssertionError):
313 self.logger.debug(ppp("Unexpected packet:", rx))
315 self.logger.debug(ppp("Decrypted packet:", pkt))
320 def config_sa_tra(self, p):
321 config_tun_params(p, self.encryption_type, p.tun_if)
323 p.tun_sa_out = VppIpsecSA(
331 self.vpp_esp_protocol,
333 udp_src=p.nat_header.sport,
334 udp_dst=p.nat_header.dport,
336 p.tun_sa_out.add_vpp_config()
338 p.tun_sa_in = VppIpsecSA(
346 self.vpp_esp_protocol,
348 udp_src=p.nat_header.sport,
349 udp_dst=p.nat_header.dport,
351 p.tun_sa_in.add_vpp_config()
354 super(TemplateIpsec4TunIfEspUdp, self).setUp()
357 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
358 p.nat_header = UDP(sport=5454, dport=4500)
360 self.tun_if = self.pg0
362 self.config_network(p)
363 self.config_sa_tra(p)
364 self.config_protect(p)
367 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
370 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
371 """Ipsec ESP - TUN tests"""
373 tun4_encrypt_node_name = "esp4-encrypt-tun"
374 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
376 def test_tun_basic64(self):
377 """ipsec 6o4 tunnel basic test"""
378 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
380 self.verify_tun_64(self.params[socket.AF_INET], count=1)
382 def test_tun_burst64(self):
383 """ipsec 6o4 tunnel basic test"""
384 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
386 self.verify_tun_64(self.params[socket.AF_INET], count=257)
388 def test_tun_basic_frag44(self):
389 """ipsec 4o4 tunnel frag basic test"""
390 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
394 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
396 self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
398 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
401 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
402 """Ipsec ESP UDP tests"""
404 tun4_input_node = "ipsec4-tun-input"
407 super(TestIpsec4TunIfEspUdp, self).setUp()
409 def test_keepalive(self):
410 """IPSEC NAT Keepalive"""
411 self.verify_keepalive(self.ipv4_params)
414 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
415 """Ipsec ESP UDP GCM tests"""
417 tun4_input_node = "ipsec4-tun-input"
420 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
422 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
423 p.crypt_algo_vpp_id = (
424 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
426 p.crypt_algo = "AES-GCM"
428 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
432 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
433 """Ipsec ESP - TCP tests"""
438 class TemplateIpsec6TunProtect(object):
439 """IPsec IPv6 Tunnel protect"""
441 def config_sa_tra(self, p):
442 config_tun_params(p, self.encryption_type, p.tun_if)
444 p.tun_sa_out = VppIpsecSA(
452 self.vpp_esp_protocol,
454 p.tun_sa_out.add_vpp_config()
456 p.tun_sa_in = VppIpsecSA(
464 self.vpp_esp_protocol,
466 p.tun_sa_in.add_vpp_config()
468 def config_sa_tun(self, p):
469 config_tun_params(p, self.encryption_type, p.tun_if)
471 p.tun_sa_out = VppIpsecSA(
479 self.vpp_esp_protocol,
480 self.tun_if.local_addr[p.addr_type],
481 self.tun_if.remote_addr[p.addr_type],
483 p.tun_sa_out.add_vpp_config()
485 p.tun_sa_in = VppIpsecSA(
493 self.vpp_esp_protocol,
494 self.tun_if.remote_addr[p.addr_type],
495 self.tun_if.local_addr[p.addr_type],
497 p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
    """Create the tunnel-protect object for ``p`` and push it to VPP.

    A ``VppIpsecTunProtect`` is built from this test case, ``p.tun_if``,
    ``p.tun_sa_out`` and the list ``[p.tun_sa_in]``. It is saved as
    ``p.tun_protect`` (read back by ``unconfig_protect``) and then
    configured with ``add_vpp_config()``.
    """
    tun_protect = VppIpsecTunProtect(
        self,
        p.tun_if,
        p.tun_sa_out,
        [p.tun_sa_in],
    )
    p.tun_protect = tun_protect
    tun_protect.add_vpp_config()
503 def config_network(self, p):
504 if hasattr(p, "tun_dst"):
507 tun_dst = self.pg0.remote_ip6
508 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
509 p.tun_if.add_vpp_config()
511 p.tun_if.config_ip6()
512 p.tun_if.config_ip4()
514 p.route = VppIpRoute(
516 p.remote_tun_if_host,
520 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
524 p.route.add_vpp_config()
527 p.remote_tun_if_host4,
529 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
def unconfig_network(self, p):
    """Remove the route and then the tunnel interface held on ``p``.

    Both objects are the ones ``config_network`` stored as ``p.route``
    and ``p.tun_if``.
    """
    installed_route = p.route
    installed_route.remove_vpp_config()
    tunnel_if = p.tun_if
    tunnel_if.remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object stored as ``p.tun_protect`` from VPP."""
    tun_protect = p.tun_protect
    tun_protect.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove the two tunnel SAs held on ``p`` from VPP.

    ``p.tun_sa_out`` is removed first, then ``p.tun_sa_in``.
    """
    sa_out = p.tun_sa_out
    sa_out.remove_vpp_config()
    sa_in = p.tun_sa_in
    sa_in.remove_vpp_config()
545 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
546 """IPsec tunnel interface tests"""
548 encryption_type = ESP
551 super(TemplateIpsec6TunIfEsp, self).setUp()
553 self.tun_if = self.pg0
556 self.config_network(p)
557 self.config_sa_tra(p)
558 self.config_protect(p)
561 super(TemplateIpsec6TunIfEsp, self).tearDown()
564 class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
565 """IPsec6 UDP tunnel interface tests"""
567 tun4_encrypt_node_name = "esp6-encrypt-tun"
568 tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
569 encryption_type = ESP
573 super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()
576 def tearDownClass(cls):
577 super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()
579 def verify_encrypted(self, p, sa, rxs):
582 # ensure the UDP ports are correct before we decrypt
584 self.assertTrue(rx.haslayer(UDP))
585 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
586 self.assert_equal(rx[UDP].dport, 4500)
588 pkt = sa.decrypt(rx[IP])
589 if not pkt.haslayer(IP):
590 pkt = IP(pkt[Raw].load)
592 self.assert_packet_checksums_valid(pkt)
594 pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
596 self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
597 except (IndexError, AssertionError):
598 self.logger.debug(ppp("Unexpected packet:", rx))
600 self.logger.debug(ppp("Decrypted packet:", pkt))
605 def config_sa_tra(self, p):
606 config_tun_params(p, self.encryption_type, p.tun_if)
608 p.tun_sa_out = VppIpsecSA(
616 self.vpp_esp_protocol,
618 udp_src=p.nat_header.sport,
619 udp_dst=p.nat_header.dport,
621 p.tun_sa_out.add_vpp_config()
623 p.tun_sa_in = VppIpsecSA(
631 self.vpp_esp_protocol,
633 udp_src=p.nat_header.sport,
634 udp_dst=p.nat_header.dport,
636 p.tun_sa_in.add_vpp_config()
639 super(TemplateIpsec6TunIfEspUdp, self).setUp()
642 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
643 p.nat_header = UDP(sport=5454, dport=4500)
645 self.tun_if = self.pg0
647 self.config_network(p)
648 self.config_sa_tra(p)
649 self.config_protect(p)
652 super(TemplateIpsec6TunIfEspUdp, self).tearDown()
655 class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
656 """Ipsec ESP 6 UDP tests"""
658 tun6_input_node = "ipsec6-tun-input"
659 tun6_encrypt_node_name = "esp6-encrypt-tun"
660 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
663 super(TestIpsec6TunIfEspUdp, self).setUp()
665 def test_keepalive(self):
666 """IPSEC6 NAT Keepalive"""
667 self.verify_keepalive(self.ipv6_params)
670 class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
671 """Ipsec ESP 6 UDP GCM tests"""
673 tun6_input_node = "ipsec6-tun-input"
674 tun6_encrypt_node_name = "esp6-encrypt-tun"
675 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
678 super(TestIpsec6TunIfEspUdpGCM, self).setUp()
680 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
681 p.crypt_algo_vpp_id = (
682 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
684 p.crypt_algo = "AES-GCM"
686 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
690 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
691 """Ipsec ESP - TUN tests"""
693 tun6_encrypt_node_name = "esp6-encrypt-tun"
694 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
696 def test_tun_basic46(self):
697 """ipsec 4o6 tunnel basic test"""
698 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
699 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
701 def test_tun_burst46(self):
702 """ipsec 4o6 tunnel burst test"""
703 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
704 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
707 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
708 """Ipsec ESP 6 Handoff tests"""
710 tun6_encrypt_node_name = "esp6-encrypt-tun"
711 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
713 def test_tun_handoff_66_police(self):
714 """ESP 6o6 tunnel with policer worker hand-off test"""
715 self.vapi.cli("clear errors")
716 self.vapi.cli("clear ipsec sa")
719 p = self.params[socket.AF_INET6]
721 action_tx = PolicerAction(
722 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
724 policer = VppPolicer(
731 conform_action=action_tx,
732 exceed_action=action_tx,
733 violate_action=action_tx,
735 policer.add_vpp_config()
737 # Start policing on tun
738 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
740 for pol_bind in [1, 0]:
741 policer.bind_vpp_config(pol_bind, True)
743 # inject alternately on worker 0 and 1.
744 for worker in [0, 1, 0, 1]:
745 send_pkts = self.gen_encrypt_pkts6(
749 src=p.remote_tun_if_host,
750 dst=self.pg1.remote_ip6,
753 recv_pkts = self.send_and_expect(
754 self.tun_if, send_pkts, self.pg1, worker=worker
756 self.verify_decrypted6(p, recv_pkts)
757 self.logger.debug(self.vapi.cli("show trace max 100"))
759 stats = policer.get_stats()
760 stats0 = policer.get_stats(worker=0)
761 stats1 = policer.get_stats(worker=1)
764 # First pass: Worker 1, should have done all the policing
765 self.assertEqual(stats, stats1)
767 # Worker 0, should have handed everything off
768 self.assertEqual(stats0["conform_packets"], 0)
769 self.assertEqual(stats0["exceed_packets"], 0)
770 self.assertEqual(stats0["violate_packets"], 0)
772 # Second pass: both workers should have policed equal amounts
773 self.assertGreater(stats1["conform_packets"], 0)
774 self.assertEqual(stats1["exceed_packets"], 0)
775 self.assertGreater(stats1["violate_packets"], 0)
777 self.assertGreater(stats0["conform_packets"], 0)
778 self.assertEqual(stats0["exceed_packets"], 0)
779 self.assertGreater(stats0["violate_packets"], 0)
782 stats0["conform_packets"] + stats0["violate_packets"],
783 stats1["conform_packets"] + stats1["violate_packets"],
786 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
787 policer.remove_vpp_config()
790 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
791 """Ipsec ESP 4 Handoff tests"""
793 tun4_encrypt_node_name = "esp4-encrypt-tun"
794 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
796 def test_tun_handoff_44_police(self):
797 """ESP 4o4 tunnel with policer worker hand-off test"""
798 self.vapi.cli("clear errors")
799 self.vapi.cli("clear ipsec sa")
802 p = self.params[socket.AF_INET]
804 action_tx = PolicerAction(
805 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
807 policer = VppPolicer(
814 conform_action=action_tx,
815 exceed_action=action_tx,
816 violate_action=action_tx,
818 policer.add_vpp_config()
820 # Start policing on tun
821 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
823 for pol_bind in [1, 0]:
824 policer.bind_vpp_config(pol_bind, True)
826 # inject alternately on worker 0 and 1.
827 for worker in [0, 1, 0, 1]:
828 send_pkts = self.gen_encrypt_pkts(
832 src=p.remote_tun_if_host,
833 dst=self.pg1.remote_ip4,
836 recv_pkts = self.send_and_expect(
837 self.tun_if, send_pkts, self.pg1, worker=worker
839 self.verify_decrypted(p, recv_pkts)
840 self.logger.debug(self.vapi.cli("show trace max 100"))
842 stats = policer.get_stats()
843 stats0 = policer.get_stats(worker=0)
844 stats1 = policer.get_stats(worker=1)
847 # First pass: Worker 1, should have done all the policing
848 self.assertEqual(stats, stats1)
850 # Worker 0, should have handed everything off
851 self.assertEqual(stats0["conform_packets"], 0)
852 self.assertEqual(stats0["exceed_packets"], 0)
853 self.assertEqual(stats0["violate_packets"], 0)
855 # Second pass: both workers should have policed equal amounts
856 self.assertGreater(stats1["conform_packets"], 0)
857 self.assertEqual(stats1["exceed_packets"], 0)
858 self.assertGreater(stats1["violate_packets"], 0)
860 self.assertGreater(stats0["conform_packets"], 0)
861 self.assertEqual(stats0["exceed_packets"], 0)
862 self.assertGreater(stats0["violate_packets"], 0)
865 stats0["conform_packets"] + stats0["violate_packets"],
866 stats1["conform_packets"] + stats1["violate_packets"],
869 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
870 policer.remove_vpp_config()
873 @tag_fixme_vpp_workers
874 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
875 """IPsec IPv4 Multi Tunnel interface"""
877 encryption_type = ESP
878 tun4_encrypt_node_name = "esp4-encrypt-tun"
879 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
882 super(TestIpsec4MultiTunIfEsp, self).setUp()
884 self.tun_if = self.pg0
886 self.multi_params = []
887 self.pg0.generate_remote_hosts(10)
888 self.pg0.configure_ipv4_neighbors()
891 p = copy.copy(self.ipv4_params)
893 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
894 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
895 p.scapy_tun_spi = p.scapy_tun_spi + ii
896 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
897 p.vpp_tun_spi = p.vpp_tun_spi + ii
899 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
900 p.scapy_tra_spi = p.scapy_tra_spi + ii
901 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
902 p.vpp_tra_spi = p.vpp_tra_spi + ii
903 p.tun_dst = self.pg0.remote_hosts[ii].ip4
905 self.multi_params.append(p)
906 self.config_network(p)
907 self.config_sa_tra(p)
908 self.config_protect(p)
911 super(TestIpsec4MultiTunIfEsp, self).tearDown()
913 def test_tun_44(self):
914 """Multiple IPSEC tunnel interfaces"""
915 for p in self.multi_params:
916 self.verify_tun_44(p, count=127)
917 self.assertEqual(p.tun_if.get_rx_stats(), 127)
918 self.assertEqual(p.tun_if.get_tx_stats(), 127)
920 def test_tun_rr_44(self):
921 """Round-robin packets across multiple interfaces"""
923 for p in self.multi_params:
924 tx = tx + self.gen_encrypt_pkts(
928 src=p.remote_tun_if_host,
929 dst=self.pg1.remote_ip4,
931 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
933 for rx, p in zip(rxs, self.multi_params):
934 self.verify_decrypted(p, [rx])
937 for p in self.multi_params:
938 tx = tx + self.gen_pkts(
939 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
941 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
943 for rx, p in zip(rxs, self.multi_params):
944 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
947 @tag_fixme_ubuntu2204
948 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
949 """IPsec IPv4 Tunnel interface all Algos"""
951 encryption_type = ESP
952 tun4_encrypt_node_name = "esp4-encrypt-tun"
953 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
956 super(TestIpsec4TunIfEspAll, self).setUp()
958 self.tun_if = self.pg0
961 self.config_network(p)
962 self.config_sa_tra(p)
963 self.config_protect(p)
967 self.unconfig_protect(p)
968 self.unconfig_network(p)
971 super(TestIpsec4TunIfEspAll, self).tearDown()
975 # change the key and the SPI
978 p.crypt_key = b"X" + p.crypt_key[1:]
980 p.scapy_tun_sa_id += 1
983 p.tun_if.local_spi = p.vpp_tun_spi
984 p.tun_if.remote_spi = p.scapy_tun_spi
986 config_tun_params(p, self.encryption_type, p.tun_if)
988 p.tun_sa_out = VppIpsecSA(
996 self.vpp_esp_protocol,
1000 p.tun_sa_in = VppIpsecSA(
1006 p.crypt_algo_vpp_id,
1008 self.vpp_esp_protocol,
1012 p.tun_sa_in.add_vpp_config()
1013 p.tun_sa_out.add_vpp_config()
1015 self.config_protect(p)
1016 np.tun_sa_out.remove_vpp_config()
1017 np.tun_sa_in.remove_vpp_config()
1018 self.logger.info(self.vapi.cli("sh ipsec sa"))
1020 def test_tun_44(self):
1021 """IPSEC tunnel all algos"""
1023 # foreach VPP crypto engine
1024 engines = ["ia32", "ipsecmb", "openssl"]
1026 # foreach crypto algorithm
1030 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
1033 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1035 "scapy-crypto": "AES-GCM",
1036 "scapy-integ": "NULL",
1037 "key": b"JPjyOWBeVEQiMe7h",
1042 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
1045 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1047 "scapy-crypto": "AES-GCM",
1048 "scapy-integ": "NULL",
1049 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1054 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
1057 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1059 "scapy-crypto": "AES-GCM",
1060 "scapy-integ": "NULL",
1061 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1066 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
1069 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1071 "scapy-crypto": "AES-CBC",
1072 "scapy-integ": "HMAC-SHA1-96",
1074 "key": b"JPjyOWBeVEQiMe7h",
1078 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
1081 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
1083 "scapy-crypto": "AES-CBC",
1084 "scapy-integ": "SHA2-512-256",
1086 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1090 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
1093 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
1095 "scapy-crypto": "AES-CBC",
1096 "scapy-integ": "SHA2-256-128",
1098 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1102 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1105 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1107 "scapy-crypto": "NULL",
1108 "scapy-integ": "HMAC-SHA1-96",
1110 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1114 for engine in engines:
1115 self.vapi.cli("set crypto handler all %s" % engine)
1118 # loop through each of the algorithms
1121 # with self.subTest(algo=algo['scapy']):
1123 p = self.ipv4_params
1124 p.auth_algo_vpp_id = algo["vpp-integ"]
1125 p.crypt_algo_vpp_id = algo["vpp-crypto"]
1126 p.crypt_algo = algo["scapy-crypto"]
1127 p.auth_algo = algo["scapy-integ"]
1128 p.crypt_key = algo["key"]
1129 p.salt = algo["salt"]
1135 self.verify_tun_44(p, count=127)
1138 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1139 """IPsec IPv4 Tunnel interface no Algos"""
1141 encryption_type = ESP
1142 tun4_encrypt_node_name = "esp4-encrypt-tun"
1143 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1146 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
1148 self.tun_if = self.pg0
1149 p = self.ipv4_params
1150 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1151 p.auth_algo = "NULL"
1154 p.crypt_algo_vpp_id = (
1155 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1157 p.crypt_algo = "NULL"
1161 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
1163 def test_tun_44(self):
1164 """IPSec SA with NULL algos"""
1165 p = self.ipv4_params
1167 self.config_network(p)
1168 self.config_sa_tra(p)
1169 self.config_protect(p)
1171 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host)
1172 self.send_and_assert_no_replies(self.pg1, tx)
1174 self.unconfig_protect(p)
1176 self.unconfig_network(p)
1179 @tag_fixme_vpp_workers
1180 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
1181 """IPsec IPv6 Multi Tunnel interface"""
1183 encryption_type = ESP
1184 tun6_encrypt_node_name = "esp6-encrypt-tun"
1185 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1188 super(TestIpsec6MultiTunIfEsp, self).setUp()
1190 self.tun_if = self.pg0
1192 self.multi_params = []
1193 self.pg0.generate_remote_hosts(10)
1194 self.pg0.configure_ipv6_neighbors()
1196 for ii in range(10):
1197 p = copy.copy(self.ipv6_params)
1199 p.remote_tun_if_host = "1111::%d" % (ii + 1)
1200 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1201 p.scapy_tun_spi = p.scapy_tun_spi + ii
1202 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1203 p.vpp_tun_spi = p.vpp_tun_spi + ii
1205 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1206 p.scapy_tra_spi = p.scapy_tra_spi + ii
1207 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1208 p.vpp_tra_spi = p.vpp_tra_spi + ii
1209 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1211 self.multi_params.append(p)
1212 self.config_network(p)
1213 self.config_sa_tra(p)
1214 self.config_protect(p)
1217 super(TestIpsec6MultiTunIfEsp, self).tearDown()
1219 def test_tun_66(self):
1220 """Multiple IPSEC tunnel interfaces"""
1221 for p in self.multi_params:
1222 self.verify_tun_66(p, count=127)
1223 self.assertEqual(p.tun_if.get_rx_stats(), 127)
1224 self.assertEqual(p.tun_if.get_tx_stats(), 127)
1227 class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
1228 """Ipsec GRE TEB ESP - TUN tests"""
1230 tun4_encrypt_node_name = "esp4-encrypt-tun"
1231 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1232 encryption_type = ESP
1233 omac = "00:11:22:33:44:55"
1235 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1237 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1239 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1241 / Ether(dst=self.omac)
1242 / IP(src="1.1.1.1", dst="1.1.1.2")
1243 / UDP(sport=1144, dport=2233)
1244 / Raw(b"X" * payload_size)
1246 for i in range(count)
1249 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1251 Ether(dst=self.omac)
1252 / IP(src="1.1.1.1", dst="1.1.1.2")
1253 / UDP(sport=1144, dport=2233)
1254 / Raw(b"X" * payload_size)
1255 for i in range(count)
1258 def verify_decrypted(self, p, rxs):
1260 self.assert_equal(rx[Ether].dst, self.omac)
1261 self.assert_equal(rx[IP].dst, "1.1.1.2")
1263 def verify_encrypted(self, p, sa, rxs):
1266 pkt = sa.decrypt(rx[IP])
1267 if not pkt.haslayer(IP):
1268 pkt = IP(pkt[Raw].load)
1269 self.assert_packet_checksums_valid(pkt)
1270 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1271 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1272 self.assertTrue(pkt.haslayer(GRE))
1274 self.assertEqual(e[Ether].dst, self.omac)
1275 self.assertEqual(e[IP].dst, "1.1.1.2")
1276 except (IndexError, AssertionError):
1277 self.logger.debug(ppp("Unexpected packet:", rx))
1279 self.logger.debug(ppp("Decrypted packet:", pkt))
1285 super(TestIpsecGreTebIfEsp, self).setUp()
1287 self.tun_if = self.pg0
1289 p = self.ipv4_params
1291 bd1 = VppBridgeDomain(self, 1)
1292 bd1.add_vpp_config()
1294 p.tun_sa_out = VppIpsecSA(
1300 p.crypt_algo_vpp_id,
1302 self.vpp_esp_protocol,
1304 self.pg0.remote_ip4,
1306 p.tun_sa_out.add_vpp_config()
1308 p.tun_sa_in = VppIpsecSA(
1314 p.crypt_algo_vpp_id,
1316 self.vpp_esp_protocol,
1317 self.pg0.remote_ip4,
1320 p.tun_sa_in.add_vpp_config()
1322 p.tun_if = VppGreInterface(
1325 self.pg0.remote_ip4,
1326 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1328 p.tun_if.add_vpp_config()
1330 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1332 p.tun_protect.add_vpp_config()
1335 p.tun_if.config_ip4()
1336 config_tun_params(p, self.encryption_type, p.tun_if)
1338 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1339 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1341 self.vapi.cli("clear ipsec sa")
1342 self.vapi.cli("sh adj")
1343 self.vapi.cli("sh ipsec tun")
1346 p = self.ipv4_params
1347 p.tun_if.unconfig_ip4()
1348 super(TestIpsecGreTebIfEsp, self).tearDown()
1351 class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
1352 """Ipsec GRE TEB ESP - TUN tests"""
1354 tun4_encrypt_node_name = "esp4-encrypt-tun"
1355 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1356 encryption_type = ESP
1357 omac = "00:11:22:33:44:55"
1359 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1361 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1363 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1365 / Ether(dst=self.omac)
1366 / IP(src="1.1.1.1", dst="1.1.1.2")
1367 / UDP(sport=1144, dport=2233)
1368 / Raw(b"X" * payload_size)
1370 for i in range(count)
1373 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1375 Ether(dst=self.omac)
1377 / IP(src="1.1.1.1", dst="1.1.1.2")
1378 / UDP(sport=1144, dport=2233)
1379 / Raw(b"X" * payload_size)
1380 for i in range(count)
1383 def verify_decrypted(self, p, rxs):
1385 self.assert_equal(rx[Ether].dst, self.omac)
1386 self.assert_equal(rx[Dot1Q].vlan, 11)
1387 self.assert_equal(rx[IP].dst, "1.1.1.2")
1389 def verify_encrypted(self, p, sa, rxs):
1392 pkt = sa.decrypt(rx[IP])
1393 if not pkt.haslayer(IP):
1394 pkt = IP(pkt[Raw].load)
1395 self.assert_packet_checksums_valid(pkt)
1396 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1397 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1398 self.assertTrue(pkt.haslayer(GRE))
1400 self.assertEqual(e[Ether].dst, self.omac)
1401 self.assertFalse(e.haslayer(Dot1Q))
1402 self.assertEqual(e[IP].dst, "1.1.1.2")
1403 except (IndexError, AssertionError):
1404 self.logger.debug(ppp("Unexpected packet:", rx))
1406 self.logger.debug(ppp("Decrypted packet:", pkt))
1412 super(TestIpsecGreTebVlanIfEsp, self).setUp()
# Fixture set-up for TestIpsecGreTebVlanIfEsp (the `def` line itself is not
# shown in this listing). Visible steps: create bridge domain 1, create a
# Dot1Q sub-interface (VLAN 11) on pg1 with a pop-1 tag rewrite, create the
# outbound/inbound SAs, create a TEB GRE tunnel, protect it with the SAs and
# put both the tunnel and the sub-interface into the bridge domain.
# NOTE(review): several constructor arguments sit on lines that are not shown
# (the embedded numbers skip), so the full SA/tunnel parameters cannot be
# confirmed from here.
1414 self.tun_if = self.pg0
1416 p = self.ipv4_params
1418 bd1 = VppBridgeDomain(self, 1)
1419 bd1.add_vpp_config()
# VLAN 11 sub-interface on pg1; L2_POP_1 is requested as the tag-rewrite op.
1421 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1422 self.vapi.l2_interface_vlan_tag_rewrite(
1423 sw_if_index=self.pg1_11.sw_if_index,
1424 vtr_op=L2_VTR_OP.L2_POP_1,
1427 self.pg1_11.admin_up()
# Outbound SA.
1429 p.tun_sa_out = VppIpsecSA(
1435 p.crypt_algo_vpp_id,
1437 self.vpp_esp_protocol,
1439 self.pg0.remote_ip4,
1441 p.tun_sa_out.add_vpp_config()
# Inbound SA.
1443 p.tun_sa_in = VppIpsecSA(
1449 p.crypt_algo_vpp_id,
1451 self.vpp_esp_protocol,
1452 self.pg0.remote_ip4,
1455 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB towards pg0's remote address.
1457 p.tun_if = VppGreInterface(
1460 self.pg0.remote_ip4,
1461 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1463 p.tun_if.add_vpp_config()
# Bind the SAs to the tunnel: one outbound SA, a list holding the inbound SA.
1465 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1467 p.tun_protect.add_vpp_config()
1470 p.tun_if.config_ip4()
# Build the scapy-side SAs (p.scapy_tun_sa / p.vpp_tun_sa) for this tunnel.
1471 config_tun_params(p, self.encryption_type, p.tun_if)
# Bridge the tunnel with the VLAN sub-interface.
1473 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1474 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1476 self.vapi.cli("clear ipsec sa")
1479 p = self.ipv4_params
# Fixture tear-down for TestIpsecGreTebVlanIfEsp (the `def` line itself is not
# shown in this listing): remove the tunnel's IPv4 configuration, run the
# parent tearDown, then bring down and remove the VLAN 11 sub-interface.
1480 p.tun_if.unconfig_ip4()
1481 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1482 self.pg1_11.admin_down()
1483 self.pg1_11.remove_vpp_config()
1486 class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
1487 """Ipsec GRE TEB ESP - Tra tests"""
# Test case combining TemplateIpsec with the IpsecTun4Tests test set. The
# fixture protects a TEB-type GRE tunnel with ESP SAs and bridges it with pg1.
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and several call arguments; the
# comments below describe only what is visible.

# Graph node names and ESP class read by the shared test logic; omac is the
# destination MAC used for the bridged inner frames.
1489 tun4_encrypt_node_name = "esp4-encrypt-tun"
1490 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1491 encryption_type = ESP
1492 omac = "00:11:22:33:44:55"
# Builds `count` packets to inject on the tunnel side. Visible layers: outer
# Ether (sw_intf remote -> local MAC), IP pg0.remote -> pg0.local, and an inner
# Ether/IP/UDP/Raw frame. Presumably the sa.encrypt() call and the GRE header
# are on the non-visible lines 1497/1499 - confirm against the full file.
1494 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1496 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1498 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1500 / Ether(dst=self.omac)
1501 / IP(src="1.1.1.1", dst="1.1.1.2")
1502 / UDP(sport=1144, dport=2233)
1503 / Raw(b"X" * payload_size)
1505 for i in range(count)
# Builds `count` plaintext frames: Ether(dst=omac) / IP 1.1.1.1 -> 1.1.1.2 /
# UDP / Raw. sw_intf, src and dst are not referenced on any visible line.
1508 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1510 Ether(dst=self.omac)
1511 / IP(src="1.1.1.1", dst="1.1.1.2")
1512 / UDP(sport=1144, dport=2233)
1513 / Raw(b"X" * payload_size)
1514 for i in range(count)
# Decrypted packets must keep the inner destination MAC and destination IP.
# `rx` is bound on a non-visible line; presumably it iterates rxs.
1517 def verify_decrypted(self, p, rxs):
1519 self.assert_equal(rx[Ether].dst, self.omac)
1520 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Decrypts each received packet with `sa` and checks checksums, the outer
# addresses (pg0 local -> pg0 remote), the presence of GRE and the inner frame.
# `rx` and `e` are bound on non-visible lines.
1522 def verify_encrypted(self, p, sa, rxs):
1525 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its Raw load as IP.
1526 if not pkt.haslayer(IP):
1527 pkt = IP(pkt[Raw].load)
1528 self.assert_packet_checksums_valid(pkt)
1529 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1530 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1531 self.assertTrue(pkt.haslayer(GRE))
1533 self.assertEqual(e[Ether].dst, self.omac)
1534 self.assertEqual(e[IP].dst, "1.1.1.2")
# On failure, dump the packets at debug level (re-raise not visible here).
1535 except (IndexError, AssertionError):
1536 self.logger.debug(ppp("Unexpected packet:", rx))
1538 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- setUp body (the `def` line is not shown) ---
1544 super(TestIpsecGreTebIfEspTra, self).setUp()
1546 self.tun_if = self.pg0
1548 p = self.ipv4_params
1550 bd1 = VppBridgeDomain(self, 1)
1551 bd1.add_vpp_config()
# Outbound and inbound SAs; most constructor arguments are on non-visible lines.
1553 p.tun_sa_out = VppIpsecSA(
1559 p.crypt_algo_vpp_id,
1561 self.vpp_esp_protocol,
1563 p.tun_sa_out.add_vpp_config()
1565 p.tun_sa_in = VppIpsecSA(
1571 p.crypt_algo_vpp_id,
1573 self.vpp_esp_protocol,
1575 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB towards pg0's remote address, then protected by the SAs.
1577 p.tun_if = VppGreInterface(
1580 self.pg0.remote_ip4,
1581 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1583 p.tun_if.add_vpp_config()
1585 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1587 p.tun_protect.add_vpp_config()
1590 p.tun_if.config_ip4()
# config_tra_params is defined elsewhere in the file; it is called here with
# the params, the ESP class and the tunnel interface.
1591 config_tra_params(p, self.encryption_type, p.tun_if)
# Bridge the tunnel with pg1.
1593 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1594 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1596 self.vapi.cli("clear ipsec sa")
# --- tearDown body (the `def` line is not shown) ---
1599 p = self.ipv4_params
1600 p.tun_if.unconfig_ip4()
1601 super(TestIpsecGreTebIfEspTra, self).tearDown()
1604 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
1605 """Ipsec GRE TEB UDP ESP - Tra tests"""
# Like the TEB GRE fixture, but the SAs carry the UDP_ENCAP flag and a
# non-default UDP header (sport 5454, dport 4545), which verify_encrypted
# checks on the wire.
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and several call arguments; the
# comments below describe only what is visible.
1607 tun4_encrypt_node_name = "esp4-encrypt-tun"
1608 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1609 encryption_type = ESP
1610 omac = "00:11:22:33:44:55"
# Builds `count` packets to inject on the tunnel side. Visible layers: outer
# Ether, IP pg0.remote -> pg0.local, inner Ether/IP/UDP/Raw. The encryption
# call and GRE header are presumably on non-visible lines 1615/1617 - confirm.
1612 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1614 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1616 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1618 / Ether(dst=self.omac)
1619 / IP(src="1.1.1.1", dst="1.1.1.2")
1620 / UDP(sport=1144, dport=2233)
1621 / Raw(b"X" * payload_size)
1623 for i in range(count)
# Builds `count` plaintext frames: Ether(dst=omac) / IP 1.1.1.1 -> 1.1.1.2 /
# UDP / Raw. sw_intf, src and dst are not referenced on any visible line.
1626 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1628 Ether(dst=self.omac)
1629 / IP(src="1.1.1.1", dst="1.1.1.2")
1630 / UDP(sport=1144, dport=2233)
1631 / Raw(b"X" * payload_size)
1632 for i in range(count)
# Decrypted packets must keep the inner destination MAC and destination IP.
# `rx` is bound on a non-visible line; presumably it iterates rxs.
1635 def verify_decrypted(self, p, rxs):
1637 self.assert_equal(rx[Ether].dst, self.omac)
1638 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Checks the UDP encapsulation ports first, then decrypts and validates the
# packet. `rx` and `e` are bound on non-visible lines.
1640 def verify_encrypted(self, p, sa, rxs):
# The received packet must carry UDP with the ports configured in setUp
# (nat_header sport 5454 / dport 4545), here asserted as dport 4545, sport 5454.
1642 self.assertTrue(rx.haslayer(UDP))
1643 self.assertEqual(rx[UDP].dport, 4545)
1644 self.assertEqual(rx[UDP].sport, 5454)
1646 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its Raw load as IP.
1647 if not pkt.haslayer(IP):
1648 pkt = IP(pkt[Raw].load)
1649 self.assert_packet_checksums_valid(pkt)
1650 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1651 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1652 self.assertTrue(pkt.haslayer(GRE))
1654 self.assertEqual(e[Ether].dst, self.omac)
1655 self.assertEqual(e[IP].dst, "1.1.1.2")
# On failure, dump the packets at debug level (re-raise not visible here).
1656 except (IndexError, AssertionError):
1657 self.logger.debug(ppp("Unexpected packet:", rx))
1659 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- setUp body (the `def` line is not shown) ---
1665 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1667 self.tun_if = self.pg0
# NOTE(review): the next two lines are identical; the second assignment is
# redundant.
1669 p = self.ipv4_params
1670 p = self.ipv4_params
# Request UDP encapsulation and set the UDP header the tests expect.
1671 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
1672 p.nat_header = UDP(sport=5454, dport=4545)
1674 bd1 = VppBridgeDomain(self, 1)
1675 bd1.add_vpp_config()
# Outbound SA; most constructor arguments are on non-visible lines.
1677 p.tun_sa_out = VppIpsecSA(
1683 p.crypt_algo_vpp_id,
1685 self.vpp_esp_protocol,
1690 p.tun_sa_out.add_vpp_config()
# Inbound SA: the visible argument ORs IS_INBOUND into p.flags.
1692 p.tun_sa_in = VppIpsecSA(
1698 p.crypt_algo_vpp_id,
1700 self.vpp_esp_protocol,
1702 p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
1707 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB towards pg0's remote address, then protected by the SAs.
1709 p.tun_if = VppGreInterface(
1712 self.pg0.remote_ip4,
1713 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1715 p.tun_if.add_vpp_config()
1717 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1719 p.tun_protect.add_vpp_config()
1722 p.tun_if.config_ip4()
# config_tra_params is defined elsewhere in the file.
1723 config_tra_params(p, self.encryption_type, p.tun_if)
# Bridge the tunnel with pg1.
1725 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1726 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1728 self.vapi.cli("clear ipsec sa")
# Log the CLI output for SA 0 to aid debugging.
1729 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
# --- tearDown body (the `def` line is not shown) ---
1732 p = self.ipv4_params
1733 p.tun_if.unconfig_ip4()
1734 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1737 class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
1738 """Ipsec GRE ESP - TUN tests"""
# Fixture protecting a GRE tunnel (pg0 local -> pg0 remote) with ESP SAs and
# calling config_tun_params to build the scapy-side SAs; traffic to 1.1.1.2 is
# routed via the tunnel.
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and several call arguments; the
# comments below describe only what is visible.
1740 tun4_encrypt_node_name = "esp4-encrypt-tun"
1741 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1742 encryption_type = ESP
# Builds `count` packets to inject on the tunnel side. Visible layers: outer
# Ether, IP pg0.remote -> pg0.local, inner IP pg1.local -> pg1.remote, UDP,
# Raw. The encryption call and GRE header are presumably on non-visible lines
# 1747/1749 - confirm.
1744 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1746 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1748 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1750 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1751 / UDP(sport=1144, dport=2233)
1752 / Raw(b"X" * payload_size)
1754 for i in range(count)
# Builds `count` plaintext packets 1.1.1.1 -> 1.1.1.2; the src and dst
# parameters are not referenced on any visible line.
1757 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1759 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1760 / IP(src="1.1.1.1", dst="1.1.1.2")
1761 / UDP(sport=1144, dport=2233)
1762 / Raw(b"X" * payload_size)
1763 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv4 address.
# `rx` is bound on a non-visible line; presumably it iterates rxs.
1766 def verify_decrypted(self, p, rxs):
1768 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1769 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypts each received packet with `sa` and checks checksums, the outer
# addresses, the presence of GRE and the inner destination. `rx` and `e` are
# bound on non-visible lines.
1771 def verify_encrypted(self, p, sa, rxs):
1774 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its Raw load as IP.
1775 if not pkt.haslayer(IP):
1776 pkt = IP(pkt[Raw].load)
1777 self.assert_packet_checksums_valid(pkt)
1778 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1779 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1780 self.assertTrue(pkt.haslayer(GRE))
1782 self.assertEqual(e[IP].dst, "1.1.1.2")
# On failure, dump the packets at debug level (re-raise not visible here).
1783 except (IndexError, AssertionError):
1784 self.logger.debug(ppp("Unexpected packet:", rx))
1786 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- setUp body (the `def` line is not shown) ---
1792 super(TestIpsecGreIfEsp, self).setUp()
1794 self.tun_if = self.pg0
1796 p = self.ipv4_params
# NOTE(review): bd1 is created here but not referenced again on any visible
# line of this class.
1798 bd1 = VppBridgeDomain(self, 1)
1799 bd1.add_vpp_config()
# Outbound and inbound SAs; pg0's remote address is among the visible arguments.
1801 p.tun_sa_out = VppIpsecSA(
1807 p.crypt_algo_vpp_id,
1809 self.vpp_esp_protocol,
1811 self.pg0.remote_ip4,
1813 p.tun_sa_out.add_vpp_config()
1815 p.tun_sa_in = VppIpsecSA(
1821 p.crypt_algo_vpp_id,
1823 self.vpp_esp_protocol,
1824 self.pg0.remote_ip4,
1827 p.tun_sa_in.add_vpp_config()
# GRE tunnel from pg0's local to pg0's remote address, protected by the SAs.
1829 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1830 p.tun_if.add_vpp_config()
1832 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1833 p.tun_protect.add_vpp_config()
1836 p.tun_if.config_ip4()
# Build the scapy-side SAs (p.scapy_tun_sa / p.vpp_tun_sa) for this tunnel.
1837 config_tun_params(p, self.encryption_type, p.tun_if)
# Arguments of a /32 entry for 1.1.1.2 with a path via the tunnel's remote
# address; the call these belong to is on a non-visible line (presumably a
# route object - confirm).
1840 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
# --- tearDown body (the `def` line is not shown) ---
1844 p = self.ipv4_params
1845 p.tun_if.unconfig_ip4()
1846 super(TestIpsecGreIfEsp, self).tearDown()
1849 class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
1850 """Ipsec GRE ESP - TRA tests"""
# Fixture protecting a GRE tunnel (pg0 local -> pg0 remote) with ESP SAs and
# calling config_tra_params; adds test_gre_non_ip on top of the inherited
# IpsecTun4Tests.
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and several call arguments; the
# comments below describe only what is visible.
1852 tun4_encrypt_node_name = "esp4-encrypt-tun"
1853 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1854 encryption_type = ESP
# Builds `count` packets to inject on the tunnel side. Visible layers: outer
# Ether, IP pg0.remote -> pg0.local, inner IP pg1.local -> pg1.remote, UDP,
# Raw. The encryption call and GRE header are presumably on non-visible lines
# 1859/1861 - confirm.
1856 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1858 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1860 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1862 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1863 / UDP(sport=1144, dport=2233)
1864 / Raw(b"X" * payload_size)
1866 for i in range(count)
# Variant used by test_gre_non_ip: on the visible lines the UDP header
# directly follows the pg0.remote -> pg0.local IP header, with no inner IP
# header. Note the signature has no `p` parameter, unlike gen_encrypt_pkts.
# What sits on the non-visible lines 1872/1874 cannot be confirmed from here.
1869 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
1871 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1873 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1875 / UDP(sport=1144, dport=2233)
1876 / Raw(b"X" * payload_size)
1878 for i in range(count)
# Builds `count` plaintext packets 1.1.1.1 -> 1.1.1.2; the src and dst
# parameters are not referenced on any visible line.
1881 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1883 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1884 / IP(src="1.1.1.1", dst="1.1.1.2")
1885 / UDP(sport=1144, dport=2233)
1886 / Raw(b"X" * payload_size)
1887 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv4 address.
# `rx` is bound on a non-visible line; presumably it iterates rxs.
1890 def verify_decrypted(self, p, rxs):
1892 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1893 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypts each received packet with `sa` and checks checksums, the presence
# of GRE and the inner destination. `rx` and `e` are bound on non-visible lines.
1895 def verify_encrypted(self, p, sa, rxs):
1898 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its Raw load as IP.
1899 if not pkt.haslayer(IP):
1900 pkt = IP(pkt[Raw].load)
1901 self.assert_packet_checksums_valid(pkt)
1902 self.assertTrue(pkt.haslayer(GRE))
1904 self.assertEqual(e[IP].dst, "1.1.1.2")
# On failure, dump the packets at debug level (re-raise not visible here).
1905 except (IndexError, AssertionError):
1906 self.logger.debug(ppp("Unexpected packet:", rx))
1908 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- setUp body (the `def` line is not shown) ---
1914 super(TestIpsecGreIfEspTra, self).setUp()
1916 self.tun_if = self.pg0
1918 p = self.ipv4_params
# Outbound and inbound SAs; most constructor arguments are on non-visible lines.
1920 p.tun_sa_out = VppIpsecSA(
1926 p.crypt_algo_vpp_id,
1928 self.vpp_esp_protocol,
1930 p.tun_sa_out.add_vpp_config()
1932 p.tun_sa_in = VppIpsecSA(
1938 p.crypt_algo_vpp_id,
1940 self.vpp_esp_protocol,
1942 p.tun_sa_in.add_vpp_config()
# GRE tunnel from pg0's local to pg0's remote address, protected by the SAs.
1944 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1945 p.tun_if.add_vpp_config()
1947 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1948 p.tun_protect.add_vpp_config()
1951 p.tun_if.config_ip4()
# config_tra_params is defined elsewhere in the file.
1952 config_tra_params(p, self.encryption_type, p.tun_if)
# Arguments of a /32 entry for 1.1.1.2 with a path via the tunnel's remote
# address; the call these belong to is on a non-visible line.
1955 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
# --- tearDown body (the `def` line is not shown) ---
1959 p = self.ipv4_params
1960 p.tun_if.unconfig_ip4()
1961 super(TestIpsecGreIfEspTra, self).tearDown()
# Sends packets built by gen_encrypt_non_ip_pkts, expects no replies, and
# expects the "unsup_payload" error counter of the first decrypt node to be 1.
1963 def test_gre_non_ip(self):
1964 p = self.ipv4_params
1965 tx = self.gen_encrypt_non_ip_pkts(
1968 src=p.remote_tun_if_host,
# NOTE(review): an IPv6 address is passed as dst in this IPv4 test; the
# generator does not reference dst on any visible line.
1969 dst=self.pg1.remote_ip6,
1971 self.send_and_assert_no_replies(self.tun_if, tx)
1972 node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
1973 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1976 class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
1977 """Ipsec GRE ESP - TRA tests"""
# IPv6 counterpart of the GRE ESP fixture: the tunnel runs between pg0's local
# and remote IPv6 addresses and the IpsecTun6Tests test set is inherited.
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and several call arguments; the
# comments below describe only what is visible.
1979 tun6_encrypt_node_name = "esp6-encrypt-tun"
1980 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1981 encryption_type = ESP
# Builds `count` packets to inject on the tunnel side. Visible layers: outer
# Ether, IPv6 pg0.remote -> pg0.local, inner IPv6 pg1.local -> pg1.remote,
# UDP, Raw. The encryption call and GRE header are presumably on non-visible
# lines 1986/1988 - confirm.
1983 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1985 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1987 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
1989 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
1990 / UDP(sport=1144, dport=2233)
1991 / Raw(b"X" * payload_size)
1993 for i in range(count)
# Builds `count` plaintext packets 1::1 -> 1::2; the src and dst parameters
# are not referenced on any visible line.
1996 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
1998 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1999 / IPv6(src="1::1", dst="1::2")
2000 / UDP(sport=1144, dport=2233)
2001 / Raw(b"X" * payload_size)
2002 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv6 address.
# `rx` is bound on a non-visible line; presumably it iterates rxs.
2005 def verify_decrypted6(self, p, rxs):
2007 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2008 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
# Decrypts each received packet with `sa` and checks checksums, the presence
# of GRE and the inner destination. `rx` and `e` are bound on non-visible lines.
2010 def verify_encrypted6(self, p, sa, rxs):
2013 pkt = sa.decrypt(rx[IPv6])
# If the decrypted result has no IPv6 layer, re-parse its Raw load as IPv6.
2014 if not pkt.haslayer(IPv6):
2015 pkt = IPv6(pkt[Raw].load)
2016 self.assert_packet_checksums_valid(pkt)
2017 self.assertTrue(pkt.haslayer(GRE))
2019 self.assertEqual(e[IPv6].dst, "1::2")
# On failure, dump the packets at debug level (re-raise not visible here).
2020 except (IndexError, AssertionError):
2021 self.logger.debug(ppp("Unexpected packet:", rx))
2023 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- setUp body (the `def` line is not shown) ---
2029 super(TestIpsecGre6IfEspTra, self).setUp()
2031 self.tun_if = self.pg0
2033 p = self.ipv6_params
# NOTE(review): bd1 is created here but not referenced again on any visible
# line of this class.
2035 bd1 = VppBridgeDomain(self, 1)
2036 bd1.add_vpp_config()
# Outbound and inbound SAs; most constructor arguments are on non-visible lines.
2038 p.tun_sa_out = VppIpsecSA(
2044 p.crypt_algo_vpp_id,
2046 self.vpp_esp_protocol,
2048 p.tun_sa_out.add_vpp_config()
2050 p.tun_sa_in = VppIpsecSA(
2056 p.crypt_algo_vpp_id,
2058 self.vpp_esp_protocol,
2060 p.tun_sa_in.add_vpp_config()
# GRE tunnel from pg0's local to pg0's remote IPv6 address, protected by the SAs.
2062 p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
2063 p.tun_if.add_vpp_config()
2065 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2066 p.tun_protect.add_vpp_config()
2069 p.tun_if.config_ip6()
# config_tra_params is defined elsewhere in the file.
2070 config_tra_params(p, self.encryption_type, p.tun_if)
# Arguments of an IPv6 path via the tunnel's remote address; the enclosing
# call is on non-visible lines.
2078 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
# --- tearDown body (the `def` line is not shown) ---
2085 p = self.ipv6_params
2086 p.tun_if.unconfig_ip6()
2087 super(TestIpsecGre6IfEspTra, self).tearDown()
2090 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
2091 """Ipsec mGRE ESP v4 TRA tests"""
# Multipoint GRE fixture: one GRE interface in MP mode, with a separate pair
# of SAs and a separate tunnel protection per next-hop (N_NHS of them).
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines, the definitions of N_NHS / N_PKTS
# and several call arguments; the comments below describe only what is
# visible.
2093 tun4_encrypt_node_name = "esp4-encrypt-tun"
2094 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2095 encryption_type = ESP
# Builds `count` packets to inject on the tunnel side. The outer IP source is
# p.tun_dst (set per next-hop in setUp) rather than a fixed pg0 address. The
# encryption call and GRE header are presumably on non-visible lines - confirm.
2097 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2099 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2101 IP(src=p.tun_dst, dst=self.pg0.local_ip4)
2103 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
2104 / UDP(sport=1144, dport=2233)
2105 / Raw(b"X" * payload_size)
2107 for i in range(count)
# Builds `count` plaintext packets from 1.1.1.1 to the caller-supplied dst.
2110 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2112 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2113 / IP(src="1.1.1.1", dst=dst)
2114 / UDP(sport=1144, dport=2233)
2115 / Raw(b"X" * payload_size)
2116 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv4 address.
# `rx` is bound on a non-visible line; presumably it iterates rxs.
2119 def verify_decrypted(self, p, rxs):
2121 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2122 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypts each received packet with `sa` and checks checksums, the presence
# of GRE and that the inner destination is this next-hop's remote host.
# `rx` and `e` are bound on non-visible lines.
2124 def verify_encrypted(self, p, sa, rxs):
2127 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its Raw load as IP.
2128 if not pkt.haslayer(IP):
2129 pkt = IP(pkt[Raw].load)
2130 self.assert_packet_checksums_valid(pkt)
2131 self.assertTrue(pkt.haslayer(GRE))
2133 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
# On failure, dump the packets at debug level (re-raise not visible here).
2134 except (IndexError, AssertionError):
2135 self.logger.debug(ppp("Unexpected packet:", rx))
2137 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- setUp body (the `def` line is not shown) ---
2143 super(TestIpsecMGreIfEspTra4, self).setUp()
2146 self.tun_if = self.pg0
2147 p = self.ipv4_params
# GRE interface created in multipoint (MP) mode.
2148 p.tun_if = VppGreInterface(
2152 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2154 p.tun_if.add_vpp_config()
2156 p.tun_if.config_ip4()
# Generate N_NHS remote hosts on both the tunnel and pg0, and configure
# pg0's IPv4 neighbours.
2157 p.tun_if.generate_remote_hosts(N_NHS)
2158 self.pg0.generate_remote_hosts(N_NHS)
2159 self.pg0.configure_ipv4_neighbors()
2161 # setup some SAs for several next-hops on the interface
2162 self.multi_params = []
2164 for ii in range(N_NHS):
# Shallow copy of the template params; p.tun_if therefore still refers to the
# shared GRE interface created above.
2165 p = copy.copy(self.ipv4_params)
# Give each next-hop its own remote host address and offset every SA id / SPI
# by the loop index ii.
2167 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2168 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2169 p.scapy_tun_spi = p.scapy_tun_spi + ii
2170 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2171 p.vpp_tun_spi = p.vpp_tun_spi + ii
2173 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2174 p.scapy_tra_spi = p.scapy_tra_spi + ii
2175 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2176 p.vpp_tra_spi = p.vpp_tra_spi + ii
2177 p.tun_sa_out = VppIpsecSA(
2183 p.crypt_algo_vpp_id,
2185 self.vpp_esp_protocol,
2187 p.tun_sa_out.add_vpp_config()
2189 p.tun_sa_in = VppIpsecSA(
2195 p.crypt_algo_vpp_id,
2197 self.vpp_esp_protocol,
2199 p.tun_sa_in.add_vpp_config()
# Tunnel protection keyed by next-hop: nh is the ii-th remote host of the
# tunnel interface.
2201 p.tun_protect = VppIpsecTunProtect(
2206 nh=p.tun_if.remote_hosts[ii].ip4,
2208 p.tun_protect.add_vpp_config()
# config_tra_params is defined elsewhere in the file.
2209 config_tra_params(p, self.encryption_type, p.tun_if)
2210 self.multi_params.append(p)
# Arguments of an entry for this next-hop's remote host via the ii-th tunnel
# peer; the enclosing call is on non-visible lines.
2214 p.remote_tun_if_host,
2216 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
2219 # in this v4 variant add the teibs after the protect
# Arguments pairing the tunnel peer address with pg0's ii-th remote host; the
# enclosing call (presumably the VppTeib stored as p.teib, which test_tun_44
# uses - confirm) is on non-visible lines.
2223 p.tun_if.remote_hosts[ii].ip4,
2224 self.pg0.remote_hosts[ii].ip4,
# Outer source address used by gen_encrypt_pkts for this next-hop.
2226 p.tun_dst = self.pg0.remote_hosts[ii].ip4
2227 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
# --- tearDown body (the `def` line is not shown) ---
2230 p = self.ipv4_params
2231 p.tun_if.unconfig_ip4()
2232 super(TestIpsecMGreIfEspTra4, self).tearDown()
# For every next-hop: traffic passes, is dropped once the TEIB entry is
# removed, and passes again after the entry is re-added.
2234 def test_tun_44(self):
2237 for p in self.multi_params:
2238 self.verify_tun_44(p, count=N_PKTS)
2239 p.teib.remove_vpp_config()
2240 self.verify_tun_dropped_44(p, count=N_PKTS)
2241 p.teib.add_vpp_config()
2242 self.verify_tun_44(p, count=N_PKTS)
2245 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
2246 """Ipsec mGRE ESP v6 TRA tests"""
# IPv6 multipoint GRE fixture: one GRE interface in MP mode, with a separate
# pair of SAs and a separate tunnel protection per next-hop (N_NHS of them).
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines, the definition of N_NHS and
# several call arguments; the comments below describe only what is visible.
2248 tun6_encrypt_node_name = "esp6-encrypt-tun"
2249 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2250 encryption_type = ESP
# Builds `count` packets to inject on the tunnel side. The outer IPv6 source
# is p.tun_dst (set per next-hop in setUp). The encryption call and GRE header
# are presumably on non-visible lines - confirm.
2252 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2254 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2256 IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
2258 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2259 / UDP(sport=1144, dport=2233)
2260 / Raw(b"X" * payload_size)
2262 for i in range(count)
# Builds `count` plaintext packets from 1::1 to the caller-supplied dst.
2265 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2267 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2268 / IPv6(src="1::1", dst=dst)
2269 / UDP(sport=1144, dport=2233)
2270 / Raw(b"X" * payload_size)
2271 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv6 address.
# `rx` is bound on a non-visible line; presumably it iterates rxs.
2274 def verify_decrypted6(self, p, rxs):
2276 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2277 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
# Decrypts each received packet with `sa` and checks checksums, the presence
# of GRE and that the inner destination is this next-hop's remote host.
# `rx` and `e` are bound on non-visible lines.
2279 def verify_encrypted6(self, p, sa, rxs):
2282 pkt = sa.decrypt(rx[IPv6])
# If the decrypted result has no IPv6 layer, re-parse its Raw load as IPv6.
2283 if not pkt.haslayer(IPv6):
2284 pkt = IPv6(pkt[Raw].load)
2285 self.assert_packet_checksums_valid(pkt)
2286 self.assertTrue(pkt.haslayer(GRE))
2288 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
# On failure, dump the packets at debug level (re-raise not visible here).
2289 except (IndexError, AssertionError):
2290 self.logger.debug(ppp("Unexpected packet:", rx))
2292 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- setUp body (the `def` line is not shown) ---
2298 super(TestIpsecMGreIfEspTra6, self).setUp()
# Raise the ipsec log class to debug; the log is dumped at the end of setUp.
2300 self.vapi.cli("set logging class ipsec level debug")
2303 self.tun_if = self.pg0
2304 p = self.ipv6_params
# GRE interface created in multipoint (MP) mode.
2305 p.tun_if = VppGreInterface(
2309 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2311 p.tun_if.add_vpp_config()
2313 p.tun_if.config_ip6()
# Generate N_NHS remote hosts on both the tunnel and pg0, and configure
# pg0's IPv6 neighbours.
2314 p.tun_if.generate_remote_hosts(N_NHS)
2315 self.pg0.generate_remote_hosts(N_NHS)
2316 self.pg0.configure_ipv6_neighbors()
2318 # setup some SAs for several next-hops on the interface
2319 self.multi_params = []
2321 for ii in range(N_NHS):
# Shallow copy of the template params; p.tun_if therefore still refers to the
# shared GRE interface created above.
2322 p = copy.copy(self.ipv6_params)
# Give each next-hop its own remote host address and offset every SA id / SPI
# by the loop index ii.
2324 p.remote_tun_if_host = "1::%d" % (ii + 1)
2325 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2326 p.scapy_tun_spi = p.scapy_tun_spi + ii
2327 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2328 p.vpp_tun_spi = p.vpp_tun_spi + ii
2330 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2331 p.scapy_tra_spi = p.scapy_tra_spi + ii
2332 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2333 p.vpp_tra_spi = p.vpp_tra_spi + ii
2334 p.tun_sa_out = VppIpsecSA(
2340 p.crypt_algo_vpp_id,
2342 self.vpp_esp_protocol,
2344 p.tun_sa_out.add_vpp_config()
2346 p.tun_sa_in = VppIpsecSA(
2352 p.crypt_algo_vpp_id,
2354 self.vpp_esp_protocol,
2356 p.tun_sa_in.add_vpp_config()
2358 # in this v6 variant add the teibs first then the protection
2359 p.tun_dst = self.pg0.remote_hosts[ii].ip6
# Arguments pairing the tunnel peer address with p.tun_dst; the enclosing call
# (presumably a VppTeib - confirm) is on non-visible lines.
2361 self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
# Tunnel protection keyed by next-hop: nh is the ii-th remote host of the
# tunnel interface.
2364 p.tun_protect = VppIpsecTunProtect(
2369 nh=p.tun_if.remote_hosts[ii].ip6,
2371 p.tun_protect.add_vpp_config()
# config_tra_params is defined elsewhere in the file.
2372 config_tra_params(p, self.encryption_type, p.tun_if)
2373 self.multi_params.append(p)
# Arguments of an entry for this next-hop's remote host via the ii-th tunnel
# peer; the enclosing call is on non-visible lines.
2377 p.remote_tun_if_host,
2379 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
# NOTE(review): assigns the same value as line 2359 above.
2381 p.tun_dst = self.pg0.remote_hosts[ii].ip6
# Debug output only.
# NOTE(review): "sh adj 41" uses a hard-coded adjacency index.
2383 self.logger.info(self.vapi.cli("sh log"))
2384 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2385 self.logger.info(self.vapi.cli("sh adj 41"))
# --- tearDown body (the `def` line is not shown) ---
2388 p = self.ipv6_params
2389 p.tun_if.unconfig_ip6()
2390 super(TestIpsecMGreIfEspTra6, self).tearDown()
# Verifies tunnel traffic (63 packets) for every configured next-hop.
2392 def test_tun_66(self):
2394 for p in self.multi_params:
2395 self.verify_tun_66(p, count=63)
2398 @tag_fixme_vpp_workers
2399 class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2400 """IPsec IPv4 Tunnel protect - transport mode"""
# Exercises tunnel protection with SAs configured through config_sa_tra,
# including a rekey. The config_* / unconfig_* / verify_* helpers are not
# defined in this class (presumably inherited from the template classes).
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and the line that creates `np`.

# --- setUp body (the `def` line is not shown) ---
2403 super(TestIpsec4TunProtect, self).setUp()
2405 self.tun_if = self.pg0
# --- tearDown body (the `def` line is not shown) ---
2408 super(TestIpsec4TunProtect, self).tearDown()
2410 def test_tun_44(self):
2411 """IPSEC tunnel protect"""
2413 p = self.ipv4_params
2415 self.config_network(p)
2416 self.config_sa_tra(p)
2417 self.config_protect(p)
# First pass: 127 packets each way, checked against the tunnel's rx/tx stats.
2419 self.verify_tun_44(p, count=127)
2420 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2421 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# Clear the SA state via CLI, then run the verify_tun_64 variant; the
# interface counters are cumulative (254).
2423 self.vapi.cli("clear ipsec sa")
2424 self.verify_tun_64(p, count=127)
2425 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2426 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2428 # rekey - create new SAs and update the tunnel protection
# `np` is created on a non-visible line (2429) - presumably a copy of p;
# confirm. The new params use a key with a changed first byte and new
# SPIs / SA ids.
2430 np.crypt_key = b"X" + p.crypt_key[1:]
2431 np.scapy_tun_spi += 100
2432 np.scapy_tun_sa_id += 1
2433 np.vpp_tun_spi += 100
2434 np.vpp_tun_sa_id += 1
2435 np.tun_if.local_spi = p.vpp_tun_spi
2436 np.tun_if.remote_spi = p.scapy_tun_spi
2438 self.config_sa_tra(np)
2439 self.config_protect(np)
# Traffic must pass with the new SAs; counters keep accumulating on p.tun_if.
2442 self.verify_tun_44(np, count=127)
2443 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2444 self.assertEqual(p.tun_if.get_tx_stats(), 381)
# Clean up using the rekeyed params for protect/SA and p for the network.
2447 self.unconfig_protect(np)
2448 self.unconfig_sa(np)
2449 self.unconfig_network(p)
2452 @tag_fixme_vpp_workers
2453 class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2454 """IPsec IPv4 Tunnel protect - transport mode"""
# Tunnel-protect fixture with the UDP_ENCAP flag set and a UDP header on the
# default port 4500 in both directions. The config_* / unconfig_* / verify_*
# helpers are not defined in this class (presumably inherited).
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines.

# --- setUp body (the `def` line is not shown) ---
2457 super(TestIpsec4TunProtectUdp, self).setUp()
2459 self.tun_if = self.pg0
2461 p = self.ipv4_params
# Request UDP encapsulation with sport/dport 4500 before the SAs are created.
2462 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
2463 p.nat_header = UDP(sport=4500, dport=4500)
2464 self.config_network(p)
2465 self.config_sa_tra(p)
2466 self.config_protect(p)
# --- tearDown body (the `def` line is not shown) ---
2469 p = self.ipv4_params
2470 self.unconfig_protect(p)
2472 self.unconfig_network(p)
2473 super(TestIpsec4TunProtectUdp, self).tearDown()
# Adds a UDP port check in front of the parent class's verification.
# `rx` is bound on a non-visible line; presumably it iterates rxs.
2475 def verify_encrypted(self, p, sa, rxs):
2476 # ensure encrypted packets are received with the default UDP ports
2478 self.assertEqual(rx[UDP].sport, 4500)
2479 self.assertEqual(rx[UDP].dport, 4500)
2480 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2482 def test_tun_44(self):
2483 """IPSEC UDP tunnel protect"""
2485 p = self.ipv4_params
# 127 packets each way, checked against the tunnel's rx/tx stats.
2487 self.verify_tun_44(p, count=127)
2488 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2489 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2491 def test_keepalive(self):
2492 """IPSEC NAT Keepalive"""
# Delegates to verify_keepalive, which is not defined in this class.
2493 self.verify_keepalive(self.ipv4_params)
2496 @tag_fixme_vpp_workers
2497 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2498 """IPsec IPv4 Tunnel protect - tunnel mode"""
# Exercises tunnel protection with SAs configured through config_sa_tun,
# including ACL output features and a rekey. The config_* / unconfig_* /
# verify_tun_* helpers are not defined in this class (presumably inherited).
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and the line that creates `np`.
2500 encryption_type = ESP
2501 tun4_encrypt_node_name = "esp4-encrypt-tun"
2502 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
# --- setUp body (the `def` line is not shown) ---
2505 super(TestIpsec4TunProtectTun, self).setUp()
2507 self.tun_if = self.pg0
# --- tearDown body (the `def` line is not shown) ---
2510 super(TestIpsec4TunProtectTun, self).tearDown()
# Builds `count` packets to inject on the tunnel side. Visible layers: outer
# Ether, IP sw_intf.remote -> sw_intf.local, inner IP src -> dst, UDP, Raw.
# The encryption call is presumably on the non-visible line 2515 - confirm.
2512 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2514 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2516 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2517 / IP(src=src, dst=dst)
2518 / UDP(sport=1144, dport=2233)
2519 / Raw(b"X" * payload_size)
2521 for i in range(count)
# Builds `count` plaintext packets from src to dst.
2524 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2526 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2527 / IP(src=src, dst=dst)
2528 / UDP(sport=1144, dport=2233)
2529 / Raw(b"X" * payload_size)
2530 for i in range(count)
# Decrypted packets must go to pg1's remote address, come from the remote
# tunnel host, and have valid checksums. `rx` is bound on a non-visible line.
2533 def verify_decrypted(self, p, rxs):
2535 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2536 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2537 self.assert_packet_checksums_valid(rx)
# Decrypts each received packet with `sa`, checks the outer addresses and
# that the nested IP header is addressed to the remote tunnel host.
# `rx` is bound on a non-visible line.
2539 def verify_encrypted(self, p, sa, rxs):
2542 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its Raw load as IP.
2543 if not pkt.haslayer(IP):
2544 pkt = IP(pkt[Raw].load)
2545 self.assert_packet_checksums_valid(pkt)
2546 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2547 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2548 inner = pkt[IP].payload
# inner[IP][IP] selects the IP layer nested below the first IP layer found
# in the payload.
2549 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
# On failure, dump the packets at debug level (re-raise not visible here).
2551 except (IndexError, AssertionError):
2552 self.logger.debug(ppp("Unexpected packet:", rx))
2554 self.logger.debug(ppp("Decrypted packet:", pkt))
2559 def test_tun_44(self):
2560 """IPSEC tunnel protect"""
2562 p = self.ipv4_params
2564 self.config_network(p)
2565 self.config_sa_tun(p)
2566 self.config_protect(p)
2568 # also add output features on the tunnel and physical interface
2569 # so we test they still work
# One rule matching 0.0.0.0/0 -> 0.0.0.0/0 with proto=0, applied to both
# pg0 and the tunnel interface.
2570 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2571 a = VppAcl(self, [r_all]).add_vpp_config()
2573 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2574 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2576 self.verify_tun_44(p, count=127)
2578 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2579 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2581 # rekey - create new SAs and update the tunnel protection
# `np` is created on a non-visible line (2582) - presumably a copy of p;
# confirm. The new params use a key with a changed first byte and new
# SPIs / SA ids.
2583 np.crypt_key = b"X" + p.crypt_key[1:]
2584 np.scapy_tun_spi += 100
2585 np.scapy_tun_sa_id += 1
2586 np.vpp_tun_spi += 100
2587 np.vpp_tun_sa_id += 1
2588 np.tun_if.local_spi = p.vpp_tun_spi
2589 np.tun_if.remote_spi = p.scapy_tun_spi
2591 self.config_sa_tun(np)
2592 self.config_protect(np)
# Traffic must pass with the new SAs; counters keep accumulating on p.tun_if.
2595 self.verify_tun_44(np, count=127)
2596 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2597 self.assertEqual(p.tun_if.get_tx_stats(), 254)
# Clean up using the rekeyed params for protect/SA and p for the network.
2600 self.unconfig_protect(np)
2601 self.unconfig_sa(np)
2602 self.unconfig_network(p)
2605 class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2606 """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
# Negative test: packets whose outer header is addressed to 5.5.5.5 instead
# of the tunnel's local address must not produce any reply.
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and some call arguments.
2608 encryption_type = ESP
2609 tun4_encrypt_node_name = "esp4-encrypt-tun"
2610 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
# --- setUp body (the `def` line is not shown) ---
2613 super(TestIpsec4TunProtectTunDrop, self).setUp()
2615 self.tun_if = self.pg0
# --- tearDown body (the `def` line is not shown) ---
2618 super(TestIpsec4TunProtectTunDrop, self).tearDown()
# Builds `count` packets whose outer IP destination is the fixed address
# 5.5.5.5 (the "bogus tunnel header" named in the test docstring). The
# encryption call is presumably on the non-visible line 2623 - confirm.
2620 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2622 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2624 IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
2625 / IP(src=src, dst=dst)
2626 / UDP(sport=1144, dport=2233)
2627 / Raw(b"X" * payload_size)
2629 for i in range(count)
2632 def test_tun_drop_44(self):
2633 """IPSEC tunnel protect bogus tunnel header"""
2635 p = self.ipv4_params
2637 self.config_network(p)
2638 self.config_sa_tun(p)
2639 self.config_protect(p)
# Generate the bogus-header packets; the remaining arguments are on
# non-visible lines.
2641 tx = self.gen_encrypt_pkts(
2645 src=p.remote_tun_if_host,
2646 dst=self.pg1.remote_ip4,
# Nothing may come back for these packets.
2649 self.send_and_assert_no_replies(self.tun_if, tx)
2652 self.unconfig_protect(p)
2654 self.unconfig_network(p)
2657 @tag_fixme_vpp_workers
2658 class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2659 """IPsec IPv6 Tunnel protect - transport mode"""
# Exercises IPv6 tunnel protection with SAs configured through config_sa_tra:
# basic traffic, a rekey, an interface admin-down bounce and a staged SA
# swap. The config_* / unconfig_* / verify_* helpers are not defined in this
# class (presumably inherited).
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and the lines creating `np`/`np3`.
2661 encryption_type = ESP
2662 tun6_encrypt_node_name = "esp6-encrypt-tun"
2663 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
# --- setUp body (the `def` line is not shown) ---
2666 super(TestIpsec6TunProtect, self).setUp()
2668 self.tun_if = self.pg0
# --- tearDown body (the `def` line is not shown) ---
2671 super(TestIpsec6TunProtect, self).tearDown()
2673 def test_tun_66(self):
2674 """IPSEC tunnel protect 6o6"""
2676 p = self.ipv6_params
2678 self.config_network(p)
2679 self.config_sa_tra(p)
2680 self.config_protect(p)
# First pass: 127 packets each way, checked against the tunnel's rx/tx stats.
2682 self.verify_tun_66(p, count=127)
2683 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2684 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2686 # rekey - create new SAs and update the tunnel protection
# `np` is created on a non-visible line (2687) - presumably a copy of p;
# confirm.
2688 np.crypt_key = b"X" + p.crypt_key[1:]
2689 np.scapy_tun_spi += 100
2690 np.scapy_tun_sa_id += 1
2691 np.vpp_tun_spi += 100
2692 np.vpp_tun_sa_id += 1
2693 np.tun_if.local_spi = p.vpp_tun_spi
2694 np.tun_if.remote_spi = p.scapy_tun_spi
2696 self.config_sa_tra(np)
2697 self.config_protect(np)
2700 self.verify_tun_66(np, count=127)
2701 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2702 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2704 # bounce the interface state
# While the tunnel is admin-down, packets are expected to be dropped and the
# ipsec6-tun-input "disabled" error counter to read 127.
2705 p.tun_if.admin_down()
2706 self.verify_drop_tun_66(np, count=127)
2707 node = "/err/ipsec6-tun-input/disabled"
2708 self.assertEqual(127, self.statistics.get_err_counter(node))
# Line 2709 is not visible; presumably the interface is brought back up there
# before traffic is verified again - confirm.
2710 self.verify_tun_66(np, count=127)
2713 # 1) add two input SAs [old, new]
2714 # 2) swap output SA to [new]
2715 # 3) use only [new] input SA
# `np3` is created on a non-visible line (2716).
2717 np3.crypt_key = b"Z" + p.crypt_key[1:]
2718 np3.scapy_tun_spi += 100
2719 np3.scapy_tun_sa_id += 1
2720 np3.vpp_tun_spi += 100
2721 np3.vpp_tun_sa_id += 1
2722 np3.tun_if.local_spi = p.vpp_tun_spi
2723 np3.tun_if.remote_spi = p.scapy_tun_spi
2725 self.config_sa_tra(np3)
# Step 1: outbound stays np, inbound accepts both np and np3.
2728 p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2729 self.verify_tun_66(np, np, count=127)
2730 self.verify_tun_66(np3, np, count=127)
# Step 2: outbound switches to np3, inbound still accepts both.
2733 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2734 self.verify_tun_66(np, np3, count=127)
2735 self.verify_tun_66(np3, np3, count=127)
# Step 3: only np3 remains inbound; traffic using np must now be dropped.
2738 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
2739 self.verify_tun_66(np3, np3, count=127)
2740 self.verify_drop_tun_rx_66(np, count=127)
# Cumulative interface counters over the whole test.
2742 self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
2743 self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
2744 self.unconfig_sa(np)
2747 self.unconfig_protect(np3)
2748 self.unconfig_sa(np3)
2749 self.unconfig_network(p)
2751 def test_tun_46(self):
2752 """IPSEC tunnel protect 4o6"""
2754 p = self.ipv6_params
2756 self.config_network(p)
2757 self.config_sa_tra(p)
2758 self.config_protect(p)
# 127 packets each way via verify_tun_46, checked against the rx/tx stats.
2760 self.verify_tun_46(p, count=127)
2761 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2762 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2765 self.unconfig_protect(p)
2767 self.unconfig_network(p)
2770 @tag_fixme_vpp_workers
2771 class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2772 """IPsec IPv6 Tunnel protect - tunnel mode"""
# Exercises IPv6 tunnel protection with SAs configured through config_sa_tun,
# including a rekey. The config_* / unconfig_* / verify_tun_* helpers are not
# defined in this class (presumably inherited).
# NOTE(review): this listing omits source lines (the embedded numbers skip),
# including the setUp/tearDown `def` lines and the line that creates `np`.
2774 encryption_type = ESP
2775 tun6_encrypt_node_name = "esp6-encrypt-tun"
2776 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
# --- setUp body (the `def` line is not shown) ---
2779 super(TestIpsec6TunProtectTun, self).setUp()
2781 self.tun_if = self.pg0
# --- tearDown body (the `def` line is not shown) ---
2784 super(TestIpsec6TunProtectTun, self).tearDown()
# Builds `count` packets to inject on the tunnel side. Visible layers: outer
# Ether, IPv6 sw_intf.remote -> sw_intf.local, inner IPv6 src -> dst, UDP
# (sport 1166), Raw. The encryption call is presumably on the non-visible
# line 2789 - confirm.
2786 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2788 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2790 IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
2791 / IPv6(src=src, dst=dst)
2792 / UDP(sport=1166, dport=2233)
2793 / Raw(b"X" * payload_size)
2795 for i in range(count)
# Builds `count` plaintext IPv6 packets from src to dst.
2798 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2800 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2801 / IPv6(src=src, dst=dst)
2802 / UDP(sport=1166, dport=2233)
2803 / Raw(b"X" * payload_size)
2804 for i in range(count)
# Decrypted packets must go to pg1's remote address, come from the remote
# tunnel host, and have valid checksums. `rx` is bound on a non-visible line.
2807 def verify_decrypted6(self, p, rxs):
2809 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2810 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2811 self.assert_packet_checksums_valid(rx)
# Decrypts each received packet with `sa`, checks the outer addresses and
# that the nested IPv6 header is addressed to the remote tunnel host.
# `rx` is bound on a non-visible line.
2813 def verify_encrypted6(self, p, sa, rxs):
2816 pkt = sa.decrypt(rx[IPv6])
# If the decrypted result has no IPv6 layer, re-parse its Raw load as IPv6.
2817 if not pkt.haslayer(IPv6):
2818 pkt = IPv6(pkt[Raw].load)
2819 self.assert_packet_checksums_valid(pkt)
2820 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2821 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2822 inner = pkt[IPv6].payload
# inner[IPv6][IPv6] selects the IPv6 layer nested below the first IPv6 layer
# found in the payload.
2823 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
# On failure, dump the packets at debug level (re-raise not visible here).
2825 except (IndexError, AssertionError):
2826 self.logger.debug(ppp("Unexpected packet:", rx))
2828 self.logger.debug(ppp("Decrypted packet:", pkt))
2833 def test_tun_66(self):
2834 """IPSEC tunnel protect"""
2836 p = self.ipv6_params
2838 self.config_network(p)
2839 self.config_sa_tun(p)
2840 self.config_protect(p)
# First pass: 127 packets each way, checked against the tunnel's rx/tx stats.
2842 self.verify_tun_66(p, count=127)
2844 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2845 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2847 # rekey - create new SAs and update the tunnel protection
# `np` is created on a non-visible line (2848) - presumably a copy of p;
# confirm. The new params use a key with a changed first byte and new
# SPIs / SA ids.
2849 np.crypt_key = b"X" + p.crypt_key[1:]
2850 np.scapy_tun_spi += 100
2851 np.scapy_tun_sa_id += 1
2852 np.vpp_tun_spi += 100
2853 np.vpp_tun_sa_id += 1
2854 np.tun_if.local_spi = p.vpp_tun_spi
2855 np.tun_if.remote_spi = p.scapy_tun_spi
2857 self.config_sa_tun(np)
2858 self.config_protect(np)
# Traffic must pass with the new SAs; counters keep accumulating on p.tun_if.
2861 self.verify_tun_66(np, count=127)
2862 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2863 self.assertEqual(p.tun_if.get_tx_stats(), 254)
# Clean up using the rekeyed params for protect/SA and p for the network.
2866 self.unconfig_protect(np)
2867 self.unconfig_sa(np)
2868 self.unconfig_network(p)
class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
    """IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing interface."""
        super(TestIpsec6TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames whose tunnel header is bogus.

        Same layout as a regular tunnel-mode packet, except that the header
        directly inside the ESP payload is addressed to ``5::5``.
        ``p`` is unused here.
        """
        # the IP destination of the revealed packet does not match
        # that assigned to the tunnel
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=sw_intf.remote_ip6, dst="5::5")
                / IPv6(src=src, dst=dst)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header"""

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        tx = self.gen_encrypt_pkts6(
            p,
            p.scapy_tun_sa,
            self.tun_if,
            src=p.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=63,
        )
        # nothing may come out for packets with the mismatching header
        self.send_and_assert_no_replies(self.tun_if, tx)

        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TemplateIpsecItf4(object):
    """IPsec Interface IPv4

    Mixin with the configuration helpers shared by the IPv4 IPsec-interface
    tests. All state is stored on the parameter object ``p`` handed to each
    helper (``tun_if``, ``route``, ``tun_sa_out``, ``tun_sa_in``,
    ``tun_protect``).
    """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and install the outbound and inbound SAs for ``p``.

        :param p: parameter object; receives ``tun_sa_out`` / ``tun_sa_in``.
        :param src: local tunnel endpoint address.
        :param dst: remote tunnel endpoint address.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            src,
            dst,
            flags=p.flags,
        )
        p.tun_sa_out.add_vpp_config()

        # the inbound SA mirrors the outbound one with the endpoints swapped
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            dst,
            src,
            flags=p.flags,
        )
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind ``p``'s SAs to ``p.tun_if`` as its tunnel protection."""
        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p, instance=0xFFFFFFFF):
        """Create the IPsec interface and the routes that point into it.

        :param p: parameter object; receives ``tun_if`` and ``route``.
        :param instance: instance number passed through to
            ``VppIpsecInterface``.
        """
        p.tun_if = VppIpsecInterface(self, instance=instance)

        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()

        p.route = VppIpRoute(
            self,
            p.remote_tun_if_host,
            32,
            [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
        )
        p.route.add_vpp_config()

        # IPv6 route through the same interface; it is not stored on p, so
        # unconfig_network() does not remove it explicitly
        r = VppIpRoute(
            self,
            p.remote_tun_if_host6,
            128,
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
                )
            ],
        )
        r.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the IPsec interface itself."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs created by config_sa_tun()."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface IPv4"""

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf4, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super(TestIpsecItf4, self).tearDown()

    def test_tun_instance_44(self):
        """IPSEC interface IPv4 with explicit instance"""
        p = self.ipv4_params
        self.config_network(p, instance=3)

        # only the requested instance may exist
        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        output = self.vapi.cli("show interface ipsec3")
        self.assertNotIn("unknown", output)

        self.unconfig_network(p)

    def test_tun_44(self):
        """IPSEC interface IPv4"""

        n_pkts = 2
        p = self.ipv4_params

        self.config_network(p)
        # the scapy-side SAs are needed to build packets, but the interface
        # has no SAs/protection yet, so the traffic is expected to be dropped
        config_tun_params(
            p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
        )
        self.verify_tun_dropped_44(p, count=n_pkts)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        # expected totals: rx has advanced for the batch sent while the
        # interface was down as well, tx only for the batches forwarded
        self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # it's a v6 packet when its encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        # np is a shallow copy, so np.tun_if is the same object as p.tun_if
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(np)
        self.unconfig_sa(p)

        # counters were cleared above, so only this batch is expected
        self.verify_tun_44(np, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        n_pkts = 2
        # work on a copy so the algorithm overrides do not leak into
        # self.ipv4_params
        p = copy.copy(self.ipv4_params)

        p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = (
            VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
        )
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.logger.info(self.vapi.cli("sh ipsec sa"))
        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)

    def test_tun_44_police(self):
        """IPSEC interface IPv4 with input policer"""
        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        # every colour transmits, so the policer only counts packets
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_44(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_44(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
    """IPsec Interface MPLSoIPv4"""

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf4MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super(TestIpsecItf4MPLS, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames carrying MPLS(label 44)/IPv4/UDP.

        The MPLS-labelled packet is encrypted with ``sa`` and wrapped in an
        Ethernet header from ``sw_intf``'s remote MAC to its local MAC.
        ``p`` is unused here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=44, ttl=3)
                / IP(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` and check label 44 and destination.

        On failure the offending packets are logged at debug level and the
        original error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: pkt may be unbound when the
                    # decrypt itself failed, and must not mask the real error
                    pass
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
        ).add_vpp_config()
        # traffic towards the remote host now leaves with label 44 imposed
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TemplateIpsecItf6(object):
    """IPsec Interface IPv6

    Mixin with the configuration helpers shared by the IPv6 IPsec-interface
    tests. All state is stored on the parameter object ``p`` handed to each
    helper (``tun_if``, ``route``, ``tun_sa_out``, ``tun_sa_in``,
    ``tun_protect``).
    """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create and install the outbound and inbound SAs for ``p``.

        ``p.tun_flags`` and ``p.hop_limit`` are given defaults when the
        caller has not set them; only the outbound SA receives them.

        :param p: parameter object; receives ``tun_sa_out`` / ``tun_sa_in``.
        :param src: local tunnel endpoint address.
        :param dst: remote tunnel endpoint address.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        if not hasattr(p, "tun_flags"):
            p.tun_flags = None
        if not hasattr(p, "hop_limit"):
            p.hop_limit = 255

        p.tun_sa_out = VppIpsecSA(
            self,
            p.scapy_tun_sa_id,
            p.scapy_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            src,
            dst,
            flags=p.flags,
            tun_flags=p.tun_flags,
            hop_limit=p.hop_limit,
        )
        p.tun_sa_out.add_vpp_config()

        # the inbound SA mirrors the outbound one with the endpoints swapped
        p.tun_sa_in = VppIpsecSA(
            self,
            p.vpp_tun_sa_id,
            p.vpp_tun_spi,
            p.auth_algo_vpp_id,
            p.auth_key,
            p.crypt_algo_vpp_id,
            p.crypt_key,
            self.vpp_esp_protocol,
            dst,
            src,
            flags=p.flags,
        )
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind ``p``'s SAs to ``p.tun_if`` as its tunnel protection."""
        p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the IPsec interface and the routes that point into it.

        :param p: parameter object; receives ``tun_if`` and ``route``.
        """
        p.tun_if = VppIpsecInterface(self)

        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()

        # IPv4 route through the same interface; it is not stored on p, so
        # unconfig_network() does not remove it explicitly
        r = VppIpRoute(
            self,
            p.remote_tun_if_host4,
            32,
            [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
        )
        r.add_vpp_config()

        p.route = VppIpRoute(
            self,
            p.remote_tun_if_host,
            128,
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
                )
            ],
        )
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove ``p.route`` and then the IPsec interface itself."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs created by config_sa_tun()."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
@tag_fixme_vpp_workers
class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface IPv6"""

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf6, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super(TestIpsecItf6, self).tearDown()

    def test_tun_66(self):
        """IPSEC interface IPv6"""

        tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 2
        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(p)
        # the scapy-side SAs are needed to build packets, but the interface
        # has no SAs/protection yet, so the traffic is expected to be dropped
        config_tun_params(
            p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
        )
        self.verify_drop_tun_66(p, count=n_pkts)
        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        p.tun_if.admin_down()
        self.verify_drop_tun_66(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_66(p, count=n_pkts)

        # expected totals: rx has advanced for the batch sent while the
        # interface was down as well, tx only for the batches forwarded
        self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)

        # it's a v4 packet when its encrypted
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)

        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        np = copy.copy(p)
        np.crypt_key = b"X" + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        # np is a shallow copy, so np.tun_if is the same object as p.tun_if
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi
        np.inner_hop_limit = 24
        np.outer_hop_limit = 128
        np.inner_flow_label = 0xABCDE
        np.outer_flow_label = 0xABCDE
        # the copy inherited the hop limit used for the first SA pair
        np.hop_limit = 128
        np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL

        self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(np)
        self.unconfig_sa(p)

        # counters were cleared above, so only this batch is expected
        self.verify_tun_66(np, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_66_police(self):
        """IPSEC interface IPv6 with input policer"""
        tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(p)
        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        # every colour transmits, so the policer only counts packets
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "pol1",
            80,
            0,
            1000,
            0,
            conform_action=action_tx,
            exceed_action=action_tx,
            violate_action=action_tx,
        )
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)

        self.verify_tun_66(p, count=n_pkts)
        self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
        self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
        self.verify_tun_66(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """Ipsec P2MP ESP v4 tests"""

    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` frames encrypted with ``sa``.

        The inner packet always runs from pg1's local address to pg1's
        remote address; ``p``, ``src`` and ``dst`` are accepted for
        signature compatibility but are not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
                / UDP(sport=1144, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` plain IPv4/UDP frames addressed to ``dst``.

        The source address is fixed to 1.1.1.1; ``src`` is accepted for
        signature compatibility but is not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IP(src="1.1.1.1", dst=dst)
            / UDP(sport=1144, dport=2233)
            / Raw(b"X" * payload_size)
            for _ in range(count)
        ]

    def verify_decrypted(self, p, rxs):
        """Check that every decrypted packet is addressed to the pg1 peer."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check the outer header of every packet, then decrypt and verify it.

        The outer header must carry the EF DSCP and ``p.hop_limit`` as TTL,
        and the decrypted packet must be destined to
        ``p.remote_tun_if_host``. On failure the offending packets are
        logged at debug level and the original error is re-raised.
        """
        for rx in rxs:
            try:
                # DSCP occupies the upper six bits of the TOS byte
                self.assertEqual(
                    rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
                )
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertEqual(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: pkt may be unbound when an
                    # outer-header check failed before the decrypt
                    pass
                raise

    def setUp(self):
        """Create one multipoint IPsec interface with an SA pair per next-hop.

        Every next-hop gets its own copy of ``self.ipv4_params`` (collected
        in ``self.multi_params``) with distinct SA ids, SPIs, hop limit and
        remote host, a tunnel protection keyed on that next-hop, and a /32
        route through the tunnel.
        """
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecInterface(
            self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
        )
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        # NOTE(review): the address is configured, removed and configured
        # again - presumably to exercise re-addressing; confirm the intent
        p.tun_if.config_ip4()
        p.tun_if.unconfig_ip4()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every per-next-hop p shares the one tun_if
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            p.hop_limit = ii + 10
            p.tun_sa_out = VppIpsecSA(
                self,
                p.scapy_tun_sa_id,
                p.scapy_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self,
                p.vpp_tun_sa_id,
                p.vpp_tun_spi,
                p.auth_algo_vpp_id,
                p.auth_key,
                p.crypt_algo_vpp_id,
                p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit,
            )
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4,
            )
            p.tun_protect.add_vpp_config()
            config_tun_params(
                p,
                self.encryption_type,
                None,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
            )
            self.multi_params.append(p)

            p.via_tun_route = VppIpRoute(
                self,
                p.remote_tun_if_host,
                32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
            ).add_vpp_config()

            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        """Remove the tunnel's IPv4 address, then run the inherited teardown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)

        # remove one tunnel protect, the rest should still work
        self.multi_params[0].tun_protect.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
        self.multi_params[0].via_tun_route.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)

        for p in self.multi_params[1:]:
            self.verify_tun_44(p, count=N_PKTS)

        # put the first next-hop back; every tunnel must pass traffic again
        self.multi_params[0].tun_protect.add_vpp_config()
        self.multi_params[0].via_tun_route.add_vpp_config()

        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
    """IPsec Interface MPLSoIPv6"""

    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the inherited setup, then use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf6MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the inherited teardown."""
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
        """Return ``count`` encrypted frames carrying MPLS(label 66)/IPv6/UDP.

        The MPLS-labelled packet is encrypted with ``sa`` and wrapped in an
        Ethernet header from ``sw_intf``'s remote MAC to its local MAC.
        ``p`` is unused here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                MPLS(label=66, ttl=3)
                / IPv6(src=src, dst=dst)
                / UDP(sport=1166, dport=2233)
                / Raw(b"X" * payload_size)
            )
            for _ in range(count)
        ]

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` and check label 66 and destination.

        On failure the offending packets are logged at debug level and the
        original error is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # re-parse as IPv6 (not IPv4): this is the v6 variant and
                    # the checks below index the IPv6 layer
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    # best-effort logging only: pkt may be unbound when the
                    # decrypt itself failed, and must not mask the real error
                    pass
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params
        f = FibPathProto

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(
            self,
            66,
            1,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            eos_proto=f.FIB_PATH_NH_PROTO_IP6,
        ).add_vpp_config()
        # traffic towards the remote host now leaves with label 66 imposed
        p.route.modify(
            [
                VppRoutePath(
                    p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
                )
            ]
        )
        p.tun_if.enable_mpls()

        self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
# Allow this module to be executed directly, using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)