5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import (
21 IpsecTun6HandoffTests,
22 IpsecTun4HandoffTests,
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
47 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
48 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
50 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
52 crypt_key = mk_scapy_crypt_key(p)
54 p.tun_dst = tun_if.remote_ip
55 p.tun_src = tun_if.local_ip
61 is_default_port = p.nat_header.dport == 4500
63 is_default_port = True
66 outbound_nat_header = p.nat_header
68 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
69 bind_layers(UDP, ESP, dport=p.nat_header.dport)
71 p.scapy_tun_sa = SecurityAssociation(
74 crypt_algo=p.crypt_algo,
76 auth_algo=p.auth_algo,
78 tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
79 nat_t_header=outbound_nat_header,
82 p.vpp_tun_sa = SecurityAssociation(
85 crypt_algo=p.crypt_algo,
87 auth_algo=p.auth_algo,
89 tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
90 nat_t_header=p.nat_header,
95 def config_tra_params(p, encryption_type, tun_if):
96 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
98 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
100 crypt_key = mk_scapy_crypt_key(p)
101 p.tun_dst = tun_if.remote_ip
102 p.tun_src = tun_if.local_ip
105 is_default_port = p.nat_header.dport == 4500
107 is_default_port = True
110 outbound_nat_header = p.nat_header
112 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
113 bind_layers(UDP, ESP, dport=p.nat_header.dport)
115 p.scapy_tun_sa = SecurityAssociation(
118 crypt_algo=p.crypt_algo,
120 auth_algo=p.auth_algo,
123 nat_t_header=outbound_nat_header,
125 p.vpp_tun_sa = SecurityAssociation(
128 crypt_algo=p.crypt_algo,
130 auth_algo=p.auth_algo,
133 nat_t_header=p.nat_header,
137 class TemplateIpsec4TunProtect(object):
138 """IPsec IPv4 Tunnel protect"""
140 encryption_type = ESP
141 tun4_encrypt_node_name = "esp4-encrypt-tun"
142 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
143 tun4_input_node = "ipsec4-tun-input"
145 def config_sa_tra(self, p):
146 config_tun_params(p, self.encryption_type, p.tun_if)
148 p.tun_sa_out = VppIpsecSA(
156 self.vpp_esp_protocol,
159 p.tun_sa_out.add_vpp_config()
161 p.tun_sa_in = VppIpsecSA(
169 self.vpp_esp_protocol,
172 p.tun_sa_in.add_vpp_config()
174 def config_sa_tun(self, p):
175 config_tun_params(p, self.encryption_type, p.tun_if)
177 p.tun_sa_out = VppIpsecSA(
185 self.vpp_esp_protocol,
186 self.tun_if.local_addr[p.addr_type],
187 self.tun_if.remote_addr[p.addr_type],
190 p.tun_sa_out.add_vpp_config()
192 p.tun_sa_in = VppIpsecSA(
200 self.vpp_esp_protocol,
201 self.tun_if.remote_addr[p.addr_type],
202 self.tun_if.local_addr[p.addr_type],
205 p.tun_sa_in.add_vpp_config()
# Attach tunnel protection to the tunnel interface held in p.
# Builds a VppIpsecTunProtect from p.tun_if, p.tun_sa_out and a one-element
# list holding p.tun_sa_in, stores it on p.tun_protect (unconfig_protect()
# later removes exactly this object), then calls add_vpp_config() on it.
# Requires p.tun_if, p.tun_sa_out and p.tun_sa_in to be set beforehand.
207 def config_protect(self, p):
208 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
209 p.tun_protect.add_vpp_config()
211 def config_network(self, p):
212 if hasattr(p, "tun_dst"):
215 tun_dst = self.pg0.remote_ip4
216 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
217 p.tun_if.add_vpp_config()
219 p.tun_if.config_ip4()
220 p.tun_if.config_ip6()
222 p.route = VppIpRoute(
224 p.remote_tun_if_host,
226 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
228 p.route.add_vpp_config()
231 p.remote_tun_if_host6,
235 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
# Tear down what config_network() stored on p: p.route first, then the
# tunnel interface p.tun_if (the route's path is built from
# p.tun_if.remote_ip4, so it goes before the interface).
# NOTE(review): config_network() also references p.remote_tun_if_host6 for
# what looks like a second route; only p.route is removed here - confirm.
241 def unconfig_network(self, p):
242 p.route.remove_vpp_config()
243 p.tun_if.remove_vpp_config()
# Counterpart of config_protect(): calls remove_vpp_config() on the
# tunnel-protect object stored in p.tun_protect.
245 def unconfig_protect(self, p):
246 p.tun_protect.remove_vpp_config()
# Remove both SAs that config_sa_tra()/config_sa_tun() stored on p:
# p.tun_sa_out first, then p.tun_sa_in.
248 def unconfig_sa(self, p):
249 p.tun_sa_out.remove_vpp_config()
250 p.tun_sa_in.remove_vpp_config()
253 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
254 """IPsec tunnel interface tests"""
256 encryption_type = ESP
260 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
# Class-level teardown hook. Adds nothing of its own: it only forwards to
# the next tearDownClass() in the MRO after TemplateIpsec4TunIfEsp.
263 def tearDownClass(cls):
264 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
267 super(TemplateIpsec4TunIfEsp, self).setUp()
269 self.tun_if = self.pg0
273 self.config_network(p)
274 self.config_sa_tra(p)
275 self.config_protect(p)
278 super(TemplateIpsec4TunIfEsp, self).tearDown()
281 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
282 """IPsec UDP tunnel interface tests"""
284 tun4_encrypt_node_name = "esp4-encrypt-tun"
285 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
286 encryption_type = ESP
290 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
# Class-level teardown hook. Adds nothing of its own: it only forwards to
# the next tearDownClass() in the MRO after TemplateIpsec4TunIfEspUdp.
293 def tearDownClass(cls):
294 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
296 def verify_encrypted(self, p, sa, rxs):
299 # ensure the UDP ports are correct before we decrypt
301 self.assertTrue(rx.haslayer(UDP))
302 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
303 self.assert_equal(rx[UDP].dport, 4500)
305 pkt = sa.decrypt(rx[IP])
306 if not pkt.haslayer(IP):
307 pkt = IP(pkt[Raw].load)
309 self.assert_packet_checksums_valid(pkt)
310 self.assert_equal(pkt[IP].dst, "1.1.1.1")
311 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
312 except (IndexError, AssertionError):
313 self.logger.debug(ppp("Unexpected packet:", rx))
315 self.logger.debug(ppp("Decrypted packet:", pkt))
320 def config_sa_tra(self, p):
321 config_tun_params(p, self.encryption_type, p.tun_if)
323 p.tun_sa_out = VppIpsecSA(
331 self.vpp_esp_protocol,
333 udp_src=p.nat_header.sport,
334 udp_dst=p.nat_header.dport,
336 p.tun_sa_out.add_vpp_config()
338 p.tun_sa_in = VppIpsecSA(
346 self.vpp_esp_protocol,
348 udp_src=p.nat_header.sport,
349 udp_dst=p.nat_header.dport,
351 p.tun_sa_in.add_vpp_config()
354 super(TemplateIpsec4TunIfEspUdp, self).setUp()
357 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
358 p.nat_header = UDP(sport=5454, dport=4500)
360 self.tun_if = self.pg0
362 self.config_network(p)
363 self.config_sa_tra(p)
364 self.config_protect(p)
367 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
370 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
371 """Ipsec ESP - TUN tests"""
373 tun4_encrypt_node_name = "esp4-encrypt-tun"
374 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
376 def test_tun_basic64(self):
377 """ipsec 6o4 tunnel basic test"""
378 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
380 self.verify_tun_64(self.params[socket.AF_INET], count=1)
382 def test_tun_burst64(self):
383 """ipsec 6o4 tunnel burst test"""
384 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
386 self.verify_tun_64(self.params[socket.AF_INET], count=257)
388 def test_tun_basic_frag44(self):
389 """ipsec 4o4 tunnel frag basic test"""
390 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
394 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
396 self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
398 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
401 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
402 """Ipsec ESP UDP tests"""
404 tun4_input_node = "ipsec4-tun-input"
407 super(TestIpsec4TunIfEspUdp, self).setUp()
# NAT keepalive test for the IPv4 UDP-encapsulated tunnel.
# All checking is delegated to verify_keepalive(), which is inherited (its
# definition is not in this file section), called with self.ipv4_params.
409 def test_keepalive(self):
410 """IPSEC NAT Keepalive"""
411 self.verify_keepalive(self.ipv4_params)
414 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
415 """Ipsec ESP UDP GCM tests"""
417 tun4_input_node = "ipsec4-tun-input"
420 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
422 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
423 p.crypt_algo_vpp_id = (
424 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
426 p.crypt_algo = "AES-GCM"
428 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
432 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
433 """Ipsec ESP - TCP tests"""
438 class TemplateIpsec6TunProtect(object):
439 """IPsec IPv6 Tunnel protect"""
441 def config_sa_tra(self, p):
442 config_tun_params(p, self.encryption_type, p.tun_if)
444 p.tun_sa_out = VppIpsecSA(
452 self.vpp_esp_protocol,
454 p.tun_sa_out.add_vpp_config()
456 p.tun_sa_in = VppIpsecSA(
464 self.vpp_esp_protocol,
466 p.tun_sa_in.add_vpp_config()
468 def config_sa_tun(self, p):
469 config_tun_params(p, self.encryption_type, p.tun_if)
471 p.tun_sa_out = VppIpsecSA(
479 self.vpp_esp_protocol,
480 self.tun_if.local_addr[p.addr_type],
481 self.tun_if.remote_addr[p.addr_type],
483 p.tun_sa_out.add_vpp_config()
485 p.tun_sa_in = VppIpsecSA(
493 self.vpp_esp_protocol,
494 self.tun_if.remote_addr[p.addr_type],
495 self.tun_if.local_addr[p.addr_type],
497 p.tun_sa_in.add_vpp_config()
# Attach tunnel protection to the tunnel interface held in p.
# Builds a VppIpsecTunProtect from p.tun_if, p.tun_sa_out and a one-element
# list holding p.tun_sa_in, stores it on p.tun_protect (unconfig_protect()
# later removes exactly this object), then calls add_vpp_config() on it.
# Requires p.tun_if, p.tun_sa_out and p.tun_sa_in to be set beforehand.
499 def config_protect(self, p):
500 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
501 p.tun_protect.add_vpp_config()
503 def config_network(self, p):
504 if hasattr(p, "tun_dst"):
507 tun_dst = self.pg0.remote_ip6
508 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
509 p.tun_if.add_vpp_config()
511 p.tun_if.config_ip6()
512 p.tun_if.config_ip4()
514 p.route = VppIpRoute(
516 p.remote_tun_if_host,
520 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
524 p.route.add_vpp_config()
527 p.remote_tun_if_host4,
529 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
# Tear down what config_network() stored on p: p.route first, then the
# tunnel interface p.tun_if (the route's path is built from
# p.tun_if.remote_ip6, so it goes before the interface).
# NOTE(review): config_network() also references p.remote_tun_if_host4 for
# what looks like a second route; only p.route is removed here - confirm.
533 def unconfig_network(self, p):
534 p.route.remove_vpp_config()
535 p.tun_if.remove_vpp_config()
# Counterpart of config_protect(): calls remove_vpp_config() on the
# tunnel-protect object stored in p.tun_protect.
537 def unconfig_protect(self, p):
538 p.tun_protect.remove_vpp_config()
# Remove both SAs that config_sa_tra()/config_sa_tun() stored on p:
# p.tun_sa_out first, then p.tun_sa_in.
540 def unconfig_sa(self, p):
541 p.tun_sa_out.remove_vpp_config()
542 p.tun_sa_in.remove_vpp_config()
545 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
546 """IPsec tunnel interface tests"""
548 encryption_type = ESP
551 super(TemplateIpsec6TunIfEsp, self).setUp()
553 self.tun_if = self.pg0
556 self.config_network(p)
557 self.config_sa_tra(p)
558 self.config_protect(p)
561 super(TemplateIpsec6TunIfEsp, self).tearDown()
564 class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
565 """IPsec6 UDP tunnel interface tests"""
567 tun4_encrypt_node_name = "esp6-encrypt-tun"
568 tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
569 encryption_type = ESP
573 super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()
# Class-level teardown hook. Adds nothing of its own: it only forwards to
# the next tearDownClass() in the MRO after TemplateIpsec6TunIfEspUdp.
576 def tearDownClass(cls):
577 super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()
579 def verify_encrypted(self, p, sa, rxs):
582 # ensure the UDP ports are correct before we decrypt
584 self.assertTrue(rx.haslayer(UDP))
585 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
586 self.assert_equal(rx[UDP].dport, 4500)
588 pkt = sa.decrypt(rx[IP])
589 if not pkt.haslayer(IP):
590 pkt = IP(pkt[Raw].load)
592 self.assert_packet_checksums_valid(pkt)
594 pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
596 self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
597 except (IndexError, AssertionError):
598 self.logger.debug(ppp("Unexpected packet:", rx))
600 self.logger.debug(ppp("Decrypted packet:", pkt))
605 def config_sa_tra(self, p):
606 config_tun_params(p, self.encryption_type, p.tun_if)
608 p.tun_sa_out = VppIpsecSA(
616 self.vpp_esp_protocol,
618 udp_src=p.nat_header.sport,
619 udp_dst=p.nat_header.dport,
621 p.tun_sa_out.add_vpp_config()
623 p.tun_sa_in = VppIpsecSA(
631 self.vpp_esp_protocol,
633 udp_src=p.nat_header.sport,
634 udp_dst=p.nat_header.dport,
636 p.tun_sa_in.add_vpp_config()
639 super(TemplateIpsec6TunIfEspUdp, self).setUp()
642 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
643 p.nat_header = UDP(sport=5454, dport=4500)
645 self.tun_if = self.pg0
647 self.config_network(p)
648 self.config_sa_tra(p)
649 self.config_protect(p)
652 super(TemplateIpsec6TunIfEspUdp, self).tearDown()
655 class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
656 """Ipsec ESP 6 UDP tests"""
658 tun6_input_node = "ipsec6-tun-input"
659 tun6_encrypt_node_name = "esp6-encrypt-tun"
660 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
663 super(TestIpsec6TunIfEspUdp, self).setUp()
# NAT keepalive test for the IPv6 UDP-encapsulated tunnel.
# All checking is delegated to verify_keepalive(), which is inherited (its
# definition is not in this file section), called with self.ipv6_params.
665 def test_keepalive(self):
666 """IPSEC6 NAT Keepalive"""
667 self.verify_keepalive(self.ipv6_params)
670 class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
671 """Ipsec ESP 6 UDP GCM tests"""
673 tun6_input_node = "ipsec6-tun-input"
674 tun6_encrypt_node_name = "esp6-encrypt-tun"
675 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
678 super(TestIpsec6TunIfEspUdpGCM, self).setUp()
680 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
681 p.crypt_algo_vpp_id = (
682 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
684 p.crypt_algo = "AES-GCM"
686 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
690 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
691 """Ipsec ESP - TUN tests"""
693 tun6_encrypt_node_name = "esp6-encrypt-tun"
694 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
# Single-packet IPv4-over-IPv6 tunnel test.
# Sets tun6_encrypt_node_name on the instance (same value as the class
# attribute of TestIpsec6TunIfEsp1), then runs the inherited verify_tun_46()
# with the AF_INET6 entry of self.params and count=1.
696 def test_tun_basic46(self):
697 """ipsec 4o6 tunnel basic test"""
698 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
699 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
# Burst variant of test_tun_basic46: identical steps, but verify_tun_46()
# is called with count=257 instead of 1.
# NOTE(review): 257 presumably exceeds one VPP vector/frame of 256 so that
# more than one frame is exercised - confirm.
701 def test_tun_burst46(self):
702 """ipsec 4o6 tunnel burst test"""
703 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
704 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
707 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
708 """Ipsec ESP 6 Handoff tests"""
710 tun6_encrypt_node_name = "esp6-encrypt-tun"
711 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
713 def test_tun_handoff_66_police(self):
714 """ESP 6o6 tunnel with policer worker hand-off test"""
715 self.vapi.cli("clear errors")
716 self.vapi.cli("clear ipsec sa")
719 p = self.params[socket.AF_INET6]
721 action_tx = PolicerAction(
722 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
724 policer = VppPolicer(
731 conform_action=action_tx,
732 exceed_action=action_tx,
733 violate_action=action_tx,
735 policer.add_vpp_config()
737 # Start policing on tun
738 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
740 for pol_bind in [1, 0]:
741 policer.bind_vpp_config(pol_bind, True)
743 # inject alternately on worker 0 and 1.
744 for worker in [0, 1, 0, 1]:
745 send_pkts = self.gen_encrypt_pkts6(
749 src=p.remote_tun_if_host,
750 dst=self.pg1.remote_ip6,
753 recv_pkts = self.send_and_expect(
754 self.tun_if, send_pkts, self.pg1, worker=worker
756 self.verify_decrypted6(p, recv_pkts)
757 self.logger.debug(self.vapi.cli("show trace max 100"))
759 stats = policer.get_stats()
760 stats0 = policer.get_stats(worker=0)
761 stats1 = policer.get_stats(worker=1)
764 # First pass: Worker 1, should have done all the policing
765 self.assertEqual(stats, stats1)
767 # Worker 0, should have handed everything off
768 self.assertEqual(stats0["conform_packets"], 0)
769 self.assertEqual(stats0["exceed_packets"], 0)
770 self.assertEqual(stats0["violate_packets"], 0)
772 # Second pass: both workers should have policed equal amounts
773 self.assertGreater(stats1["conform_packets"], 0)
774 self.assertEqual(stats1["exceed_packets"], 0)
775 self.assertGreater(stats1["violate_packets"], 0)
777 self.assertGreater(stats0["conform_packets"], 0)
778 self.assertEqual(stats0["exceed_packets"], 0)
779 self.assertGreater(stats0["violate_packets"], 0)
782 stats0["conform_packets"] + stats0["violate_packets"],
783 stats1["conform_packets"] + stats1["violate_packets"],
786 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
787 policer.remove_vpp_config()
790 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
791 """Ipsec ESP 4 Handoff tests"""
793 tun4_encrypt_node_name = "esp4-encrypt-tun"
794 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
796 def test_tun_handoff_44_police(self):
797 """ESP 4o4 tunnel with policer worker hand-off test"""
798 self.vapi.cli("clear errors")
799 self.vapi.cli("clear ipsec sa")
802 p = self.params[socket.AF_INET]
804 action_tx = PolicerAction(
805 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
807 policer = VppPolicer(
814 conform_action=action_tx,
815 exceed_action=action_tx,
816 violate_action=action_tx,
818 policer.add_vpp_config()
820 # Start policing on tun
821 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
823 for pol_bind in [1, 0]:
824 policer.bind_vpp_config(pol_bind, True)
826 # inject alternately on worker 0 and 1.
827 for worker in [0, 1, 0, 1]:
828 send_pkts = self.gen_encrypt_pkts(
832 src=p.remote_tun_if_host,
833 dst=self.pg1.remote_ip4,
836 recv_pkts = self.send_and_expect(
837 self.tun_if, send_pkts, self.pg1, worker=worker
839 self.verify_decrypted(p, recv_pkts)
840 self.logger.debug(self.vapi.cli("show trace max 100"))
842 stats = policer.get_stats()
843 stats0 = policer.get_stats(worker=0)
844 stats1 = policer.get_stats(worker=1)
847 # First pass: Worker 1, should have done all the policing
848 self.assertEqual(stats, stats1)
850 # Worker 0, should have handed everything off
851 self.assertEqual(stats0["conform_packets"], 0)
852 self.assertEqual(stats0["exceed_packets"], 0)
853 self.assertEqual(stats0["violate_packets"], 0)
855 # Second pass: both workers should have policed equal amounts
856 self.assertGreater(stats1["conform_packets"], 0)
857 self.assertEqual(stats1["exceed_packets"], 0)
858 self.assertGreater(stats1["violate_packets"], 0)
860 self.assertGreater(stats0["conform_packets"], 0)
861 self.assertEqual(stats0["exceed_packets"], 0)
862 self.assertGreater(stats0["violate_packets"], 0)
865 stats0["conform_packets"] + stats0["violate_packets"],
866 stats1["conform_packets"] + stats1["violate_packets"],
869 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
870 policer.remove_vpp_config()
873 @tag_fixme_vpp_workers
874 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
875 """IPsec IPv4 Multi Tunnel interface"""
877 encryption_type = ESP
878 tun4_encrypt_node_name = "esp4-encrypt-tun"
879 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
882 super(TestIpsec4MultiTunIfEsp, self).setUp()
884 self.tun_if = self.pg0
886 self.multi_params = []
887 self.pg0.generate_remote_hosts(10)
888 self.pg0.configure_ipv4_neighbors()
891 p = copy.copy(self.ipv4_params)
893 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
894 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
895 p.scapy_tun_spi = p.scapy_tun_spi + ii
896 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
897 p.vpp_tun_spi = p.vpp_tun_spi + ii
899 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
900 p.scapy_tra_spi = p.scapy_tra_spi + ii
901 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
902 p.vpp_tra_spi = p.vpp_tra_spi + ii
903 p.tun_dst = self.pg0.remote_hosts[ii].ip4
905 self.multi_params.append(p)
906 self.config_network(p)
907 self.config_sa_tra(p)
908 self.config_protect(p)
911 super(TestIpsec4MultiTunIfEsp, self).tearDown()
# Exercise every tunnel that setUp() appended to self.multi_params.
# For each parameter set p: run the inherited verify_tun_44() with
# count=127, then assert that p.tun_if.get_rx_stats() and
# p.tun_if.get_tx_stats() both equal 127.
# NOTE(review): the two assertEqual calls reference the loop variable p and
# are read here as part of the loop body - confirm against the indented file.
913 def test_tun_44(self):
914 """Multiple IPSEC tunnel interfaces"""
915 for p in self.multi_params:
916 self.verify_tun_44(p, count=127)
917 self.assertEqual(p.tun_if.get_rx_stats(), 127)
918 self.assertEqual(p.tun_if.get_tx_stats(), 127)
920 def test_tun_rr_44(self):
921 """Round-robin packets across multiple interfaces"""
923 for p in self.multi_params:
924 tx = tx + self.gen_encrypt_pkts(
928 src=p.remote_tun_if_host,
929 dst=self.pg1.remote_ip4,
931 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
933 for rx, p in zip(rxs, self.multi_params):
934 self.verify_decrypted(p, [rx])
937 for p in self.multi_params:
938 tx = tx + self.gen_pkts(
939 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
941 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
943 for rx, p in zip(rxs, self.multi_params):
944 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
947 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
948 """IPsec IPv4 Tunnel interface all Algos"""
950 encryption_type = ESP
951 tun4_encrypt_node_name = "esp4-encrypt-tun"
952 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
955 super(TestIpsec4TunIfEspAll, self).setUp()
957 self.tun_if = self.pg0
960 self.config_network(p)
961 self.config_sa_tra(p)
962 self.config_protect(p)
966 self.unconfig_protect(p)
967 self.unconfig_network(p)
970 super(TestIpsec4TunIfEspAll, self).tearDown()
974 # change the key and the SPI
977 p.crypt_key = b"X" + p.crypt_key[1:]
979 p.scapy_tun_sa_id += 1
982 p.tun_if.local_spi = p.vpp_tun_spi
983 p.tun_if.remote_spi = p.scapy_tun_spi
985 config_tun_params(p, self.encryption_type, p.tun_if)
987 p.tun_sa_out = VppIpsecSA(
995 self.vpp_esp_protocol,
999 p.tun_sa_in = VppIpsecSA(
1005 p.crypt_algo_vpp_id,
1007 self.vpp_esp_protocol,
1011 p.tun_sa_in.add_vpp_config()
1012 p.tun_sa_out.add_vpp_config()
1014 self.config_protect(p)
1015 np.tun_sa_out.remove_vpp_config()
1016 np.tun_sa_in.remove_vpp_config()
1017 self.logger.info(self.vapi.cli("sh ipsec sa"))
1019 def test_tun_44(self):
1020 """IPSEC tunnel all algos"""
1022 # foreach VPP crypto engine
1023 engines = ["ia32", "ipsecmb", "openssl"]
1025 # foreach crypto algorithm
1029 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
1032 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1034 "scapy-crypto": "AES-GCM",
1035 "scapy-integ": "NULL",
1036 "key": b"JPjyOWBeVEQiMe7h",
1041 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
1044 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1046 "scapy-crypto": "AES-GCM",
1047 "scapy-integ": "NULL",
1048 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1053 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
1056 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1058 "scapy-crypto": "AES-GCM",
1059 "scapy-integ": "NULL",
1060 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1065 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
1068 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1070 "scapy-crypto": "AES-CBC",
1071 "scapy-integ": "HMAC-SHA1-96",
1073 "key": b"JPjyOWBeVEQiMe7h",
1077 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
1080 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
1082 "scapy-crypto": "AES-CBC",
1083 "scapy-integ": "SHA2-512-256",
1085 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1089 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
1092 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
1094 "scapy-crypto": "AES-CBC",
1095 "scapy-integ": "SHA2-256-128",
1097 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1101 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1104 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1106 "scapy-crypto": "NULL",
1107 "scapy-integ": "HMAC-SHA1-96",
1109 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1113 for engine in engines:
1114 self.vapi.cli("set crypto handler all %s" % engine)
1117 # loop through each of the algorithms
1120 # with self.subTest(algo=algo['scapy']):
1122 p = self.ipv4_params
1123 p.auth_algo_vpp_id = algo["vpp-integ"]
1124 p.crypt_algo_vpp_id = algo["vpp-crypto"]
1125 p.crypt_algo = algo["scapy-crypto"]
1126 p.auth_algo = algo["scapy-integ"]
1127 p.crypt_key = algo["key"]
1128 p.salt = algo["salt"]
1134 self.verify_tun_44(p, count=127)
1137 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1138 """IPsec IPv4 Tunnel interface no Algos"""
1140 encryption_type = ESP
1141 tun4_encrypt_node_name = "esp4-encrypt-tun"
1142 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1145 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
1147 self.tun_if = self.pg0
1148 p = self.ipv4_params
1149 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1150 p.auth_algo = "NULL"
1153 p.crypt_algo_vpp_id = (
1154 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1156 p.crypt_algo = "NULL"
1160 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
1162 def test_tun_44(self):
1163 """IPSec SA with NULL algos"""
1164 p = self.ipv4_params
1166 self.config_network(p)
1167 self.config_sa_tra(p)
1168 self.config_protect(p)
1170 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host)
1171 self.send_and_assert_no_replies(self.pg1, tx)
1173 self.unconfig_protect(p)
1175 self.unconfig_network(p)
1178 @tag_fixme_vpp_workers
1179 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
1180 """IPsec IPv6 Multi Tunnel interface"""
1182 encryption_type = ESP
1183 tun6_encrypt_node_name = "esp6-encrypt-tun"
1184 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1187 super(TestIpsec6MultiTunIfEsp, self).setUp()
1189 self.tun_if = self.pg0
1191 self.multi_params = []
1192 self.pg0.generate_remote_hosts(10)
1193 self.pg0.configure_ipv6_neighbors()
1195 for ii in range(10):
1196 p = copy.copy(self.ipv6_params)
1198 p.remote_tun_if_host = "1111::%d" % (ii + 1)
1199 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1200 p.scapy_tun_spi = p.scapy_tun_spi + ii
1201 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1202 p.vpp_tun_spi = p.vpp_tun_spi + ii
1204 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1205 p.scapy_tra_spi = p.scapy_tra_spi + ii
1206 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1207 p.vpp_tra_spi = p.vpp_tra_spi + ii
1208 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1210 self.multi_params.append(p)
1211 self.config_network(p)
1212 self.config_sa_tra(p)
1213 self.config_protect(p)
1216 super(TestIpsec6MultiTunIfEsp, self).tearDown()
# Exercise every tunnel that setUp() appended to self.multi_params
# (setUp loops over range(10), so ten parameter sets).
# For each parameter set p: run the inherited verify_tun_66() with
# count=127, then assert that p.tun_if.get_rx_stats() and
# p.tun_if.get_tx_stats() both equal 127.
# NOTE(review): the two assertEqual calls reference the loop variable p and
# are read here as part of the loop body - confirm against the indented file.
1218 def test_tun_66(self):
1219 """Multiple IPSEC tunnel interfaces"""
1220 for p in self.multi_params:
1221 self.verify_tun_66(p, count=127)
1222 self.assertEqual(p.tun_if.get_rx_stats(), 127)
1223 self.assertEqual(p.tun_if.get_tx_stats(), 127)
1226 class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
1227 """Ipsec GRE TEB ESP - TUN tests"""
1229 tun4_encrypt_node_name = "esp4-encrypt-tun"
1230 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1231 encryption_type = ESP
1232 omac = "00:11:22:33:44:55"
1234 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1236 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1238 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1240 / Ether(dst=self.omac)
1241 / IP(src="1.1.1.1", dst="1.1.1.2")
1242 / UDP(sport=1144, dport=2233)
1243 / Raw(b"X" * payload_size)
1245 for i in range(count)
1248 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1250 Ether(dst=self.omac)
1251 / IP(src="1.1.1.1", dst="1.1.1.2")
1252 / UDP(sport=1144, dport=2233)
1253 / Raw(b"X" * payload_size)
1254 for i in range(count)
1257 def verify_decrypted(self, p, rxs):
1259 self.assert_equal(rx[Ether].dst, self.omac)
1260 self.assert_equal(rx[IP].dst, "1.1.1.2")
1262 def verify_encrypted(self, p, sa, rxs):
1265 pkt = sa.decrypt(rx[IP])
1266 if not pkt.haslayer(IP):
1267 pkt = IP(pkt[Raw].load)
1268 self.assert_packet_checksums_valid(pkt)
1269 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1270 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1271 self.assertTrue(pkt.haslayer(GRE))
1273 self.assertEqual(e[Ether].dst, self.omac)
1274 self.assertEqual(e[IP].dst, "1.1.1.2")
1275 except (IndexError, AssertionError):
1276 self.logger.debug(ppp("Unexpected packet:", rx))
1278 self.logger.debug(ppp("Decrypted packet:", pkt))
1284 super(TestIpsecGreTebIfEsp, self).setUp()
1286 self.tun_if = self.pg0
1288 p = self.ipv4_params
1290 bd1 = VppBridgeDomain(self, 1)
1291 bd1.add_vpp_config()
1293 p.tun_sa_out = VppIpsecSA(
1299 p.crypt_algo_vpp_id,
1301 self.vpp_esp_protocol,
1303 self.pg0.remote_ip4,
1305 p.tun_sa_out.add_vpp_config()
1307 p.tun_sa_in = VppIpsecSA(
1313 p.crypt_algo_vpp_id,
1315 self.vpp_esp_protocol,
1316 self.pg0.remote_ip4,
1319 p.tun_sa_in.add_vpp_config()
1321 p.tun_if = VppGreInterface(
1324 self.pg0.remote_ip4,
1325 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1327 p.tun_if.add_vpp_config()
1329 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1331 p.tun_protect.add_vpp_config()
1334 p.tun_if.config_ip4()
1335 config_tun_params(p, self.encryption_type, p.tun_if)
1337 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1338 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1340 self.vapi.cli("clear ipsec sa")
1341 self.vapi.cli("sh adj")
1342 self.vapi.cli("sh ipsec tun")
1345 p = self.ipv4_params
1346 p.tun_if.unconfig_ip4()
1347 super(TestIpsecGreTebIfEsp, self).tearDown()
1350 class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
1351 """Ipsec GRE TEB VLAN ESP - TUN tests"""
1353 tun4_encrypt_node_name = "esp4-encrypt-tun"
1354 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1355 encryption_type = ESP
1356 omac = "00:11:22:33:44:55"
1358 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1360 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1362 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1364 / Ether(dst=self.omac)
1365 / IP(src="1.1.1.1", dst="1.1.1.2")
1366 / UDP(sport=1144, dport=2233)
1367 / Raw(b"X" * payload_size)
1369 for i in range(count)
1372 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1374 Ether(dst=self.omac)
1376 / IP(src="1.1.1.1", dst="1.1.1.2")
1377 / UDP(sport=1144, dport=2233)
1378 / Raw(b"X" * payload_size)
1379 for i in range(count)
1382 def verify_decrypted(self, p, rxs):
1384 self.assert_equal(rx[Ether].dst, self.omac)
1385 self.assert_equal(rx[Dot1Q].vlan, 11)
1386 self.assert_equal(rx[IP].dst, "1.1.1.2")
1388 def verify_encrypted(self, p, sa, rxs):
1391 pkt = sa.decrypt(rx[IP])
1392 if not pkt.haslayer(IP):
1393 pkt = IP(pkt[Raw].load)
1394 self.assert_packet_checksums_valid(pkt)
1395 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1396 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1397 self.assertTrue(pkt.haslayer(GRE))
1399 self.assertEqual(e[Ether].dst, self.omac)
1400 self.assertFalse(e.haslayer(Dot1Q))
1401 self.assertEqual(e[IP].dst, "1.1.1.2")
1402 except (IndexError, AssertionError):
1403 self.logger.debug(ppp("Unexpected packet:", rx))
1405 self.logger.debug(ppp("Decrypted packet:", pkt))
1411 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1413 self.tun_if = self.pg0
1415 p = self.ipv4_params
1417 bd1 = VppBridgeDomain(self, 1)
1418 bd1.add_vpp_config()
1420 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1421 self.vapi.l2_interface_vlan_tag_rewrite(
1422 sw_if_index=self.pg1_11.sw_if_index,
1423 vtr_op=L2_VTR_OP.L2_POP_1,
1426 self.pg1_11.admin_up()
1428 p.tun_sa_out = VppIpsecSA(
1434 p.crypt_algo_vpp_id,
1436 self.vpp_esp_protocol,
1438 self.pg0.remote_ip4,
1440 p.tun_sa_out.add_vpp_config()
1442 p.tun_sa_in = VppIpsecSA(
1448 p.crypt_algo_vpp_id,
1450 self.vpp_esp_protocol,
1451 self.pg0.remote_ip4,
1454 p.tun_sa_in.add_vpp_config()
1456 p.tun_if = VppGreInterface(
1459 self.pg0.remote_ip4,
1460 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1462 p.tun_if.add_vpp_config()
1464 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1466 p.tun_protect.add_vpp_config()
1469 p.tun_if.config_ip4()
1470 config_tun_params(p, self.encryption_type, p.tun_if)
1472 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1473 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1475 self.vapi.cli("clear ipsec sa")
1478 p = self.ipv4_params
1479 p.tun_if.unconfig_ip4()
1480 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1481 self.pg1_11.admin_down()
1482 self.pg1_11.remove_vpp_config()
1485 class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
1486 """Ipsec GRE TEB ESP - Tra tests"""
1488 tun4_encrypt_node_name = "esp4-encrypt-tun"
1489 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1490 encryption_type = ESP
1491 omac = "00:11:22:33:44:55"
1493 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1495 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1497 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1499 / Ether(dst=self.omac)
1500 / IP(src="1.1.1.1", dst="1.1.1.2")
1501 / UDP(sport=1144, dport=2233)
1502 / Raw(b"X" * payload_size)
1504 for i in range(count)
1507 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1509 Ether(dst=self.omac)
1510 / IP(src="1.1.1.1", dst="1.1.1.2")
1511 / UDP(sport=1144, dport=2233)
1512 / Raw(b"X" * payload_size)
1513 for i in range(count)
1516 def verify_decrypted(self, p, rxs):
1518 self.assert_equal(rx[Ether].dst, self.omac)
1519 self.assert_equal(rx[IP].dst, "1.1.1.2")
1521 def verify_encrypted(self, p, sa, rxs):
1524 pkt = sa.decrypt(rx[IP])
1525 if not pkt.haslayer(IP):
1526 pkt = IP(pkt[Raw].load)
1527 self.assert_packet_checksums_valid(pkt)
1528 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1529 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1530 self.assertTrue(pkt.haslayer(GRE))
1532 self.assertEqual(e[Ether].dst, self.omac)
1533 self.assertEqual(e[IP].dst, "1.1.1.2")
1534 except (IndexError, AssertionError):
1535 self.logger.debug(ppp("Unexpected packet:", rx))
1537 self.logger.debug(ppp("Decrypted packet:", pkt))
1543 super(TestIpsecGreTebIfEspTra, self).setUp()
1545 self.tun_if = self.pg0
1547 p = self.ipv4_params
1549 bd1 = VppBridgeDomain(self, 1)
1550 bd1.add_vpp_config()
1552 p.tun_sa_out = VppIpsecSA(
1558 p.crypt_algo_vpp_id,
1560 self.vpp_esp_protocol,
1562 p.tun_sa_out.add_vpp_config()
1564 p.tun_sa_in = VppIpsecSA(
1570 p.crypt_algo_vpp_id,
1572 self.vpp_esp_protocol,
1574 p.tun_sa_in.add_vpp_config()
1576 p.tun_if = VppGreInterface(
1579 self.pg0.remote_ip4,
1580 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1582 p.tun_if.add_vpp_config()
1584 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1586 p.tun_protect.add_vpp_config()
1589 p.tun_if.config_ip4()
1590 config_tra_params(p, self.encryption_type, p.tun_if)
1592 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1593 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1595 self.vapi.cli("clear ipsec sa")
1598 p = self.ipv4_params
1599 p.tun_if.unconfig_ip4()
1600 super(TestIpsecGreTebIfEspTra, self).tearDown()
1603 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
1604 """Ipsec GRE TEB UDP ESP - Tra tests"""
1606 tun4_encrypt_node_name = "esp4-encrypt-tun"
1607 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1608 encryption_type = ESP
1609 omac = "00:11:22:33:44:55"
1611 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1613 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1615 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1617 / Ether(dst=self.omac)
1618 / IP(src="1.1.1.1", dst="1.1.1.2")
1619 / UDP(sport=1144, dport=2233)
1620 / Raw(b"X" * payload_size)
1622 for i in range(count)
1625 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1627 Ether(dst=self.omac)
1628 / IP(src="1.1.1.1", dst="1.1.1.2")
1629 / UDP(sport=1144, dport=2233)
1630 / Raw(b"X" * payload_size)
1631 for i in range(count)
1634 def verify_decrypted(self, p, rxs):
1636 self.assert_equal(rx[Ether].dst, self.omac)
1637 self.assert_equal(rx[IP].dst, "1.1.1.2")
1639 def verify_encrypted(self, p, sa, rxs):
1641 self.assertTrue(rx.haslayer(UDP))
1642 self.assertEqual(rx[UDP].dport, 4545)
1643 self.assertEqual(rx[UDP].sport, 5454)
1645 pkt = sa.decrypt(rx[IP])
1646 if not pkt.haslayer(IP):
1647 pkt = IP(pkt[Raw].load)
1648 self.assert_packet_checksums_valid(pkt)
1649 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1650 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1651 self.assertTrue(pkt.haslayer(GRE))
1653 self.assertEqual(e[Ether].dst, self.omac)
1654 self.assertEqual(e[IP].dst, "1.1.1.2")
1655 except (IndexError, AssertionError):
1656 self.logger.debug(ppp("Unexpected packet:", rx))
1658 self.logger.debug(ppp("Decrypted packet:", pkt))
1664 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1666 self.tun_if = self.pg0
1668 p = self.ipv4_params
1669 p = self.ipv4_params
1670 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
1671 p.nat_header = UDP(sport=5454, dport=4545)
1673 bd1 = VppBridgeDomain(self, 1)
1674 bd1.add_vpp_config()
1676 p.tun_sa_out = VppIpsecSA(
1682 p.crypt_algo_vpp_id,
1684 self.vpp_esp_protocol,
1689 p.tun_sa_out.add_vpp_config()
1691 p.tun_sa_in = VppIpsecSA(
1697 p.crypt_algo_vpp_id,
1699 self.vpp_esp_protocol,
1701 p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
1706 p.tun_sa_in.add_vpp_config()
1708 p.tun_if = VppGreInterface(
1711 self.pg0.remote_ip4,
1712 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1714 p.tun_if.add_vpp_config()
1716 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1718 p.tun_protect.add_vpp_config()
1721 p.tun_if.config_ip4()
1722 config_tra_params(p, self.encryption_type, p.tun_if)
1724 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1725 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1727 self.vapi.cli("clear ipsec sa")
1728 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1731 p = self.ipv4_params
1732 p.tun_if.unconfig_ip4()
1733 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1736 class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
1737 """Ipsec GRE ESP - TUN tests"""
1739 tun4_encrypt_node_name = "esp4-encrypt-tun"
1740 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1741 encryption_type = ESP
1743 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1745 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1747 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1749 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1750 / UDP(sport=1144, dport=2233)
1751 / Raw(b"X" * payload_size)
1753 for i in range(count)
1756 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1758 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1759 / IP(src="1.1.1.1", dst="1.1.1.2")
1760 / UDP(sport=1144, dport=2233)
1761 / Raw(b"X" * payload_size)
1762 for i in range(count)
1765 def verify_decrypted(self, p, rxs):
1767 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1768 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1770 def verify_encrypted(self, p, sa, rxs):
1773 pkt = sa.decrypt(rx[IP])
1774 if not pkt.haslayer(IP):
1775 pkt = IP(pkt[Raw].load)
1776 self.assert_packet_checksums_valid(pkt)
1777 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1778 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1779 self.assertTrue(pkt.haslayer(GRE))
1781 self.assertEqual(e[IP].dst, "1.1.1.2")
1782 except (IndexError, AssertionError):
1783 self.logger.debug(ppp("Unexpected packet:", rx))
1785 self.logger.debug(ppp("Decrypted packet:", pkt))
1791 super(TestIpsecGreIfEsp, self).setUp()
1793 self.tun_if = self.pg0
1795 p = self.ipv4_params
1797 bd1 = VppBridgeDomain(self, 1)
1798 bd1.add_vpp_config()
1800 p.tun_sa_out = VppIpsecSA(
1806 p.crypt_algo_vpp_id,
1808 self.vpp_esp_protocol,
1810 self.pg0.remote_ip4,
1812 p.tun_sa_out.add_vpp_config()
1814 p.tun_sa_in = VppIpsecSA(
1820 p.crypt_algo_vpp_id,
1822 self.vpp_esp_protocol,
1823 self.pg0.remote_ip4,
1826 p.tun_sa_in.add_vpp_config()
1828 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1829 p.tun_if.add_vpp_config()
1831 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1832 p.tun_protect.add_vpp_config()
1835 p.tun_if.config_ip4()
1836 config_tun_params(p, self.encryption_type, p.tun_if)
1839 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1843 p = self.ipv4_params
1844 p.tun_if.unconfig_ip4()
1845 super(TestIpsecGreIfEsp, self).tearDown()
1848 class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
1849 """Ipsec GRE ESP - TRA tests"""
1851 tun4_encrypt_node_name = "esp4-encrypt-tun"
1852 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1853 encryption_type = ESP
1855 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1857 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1859 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1861 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1862 / UDP(sport=1144, dport=2233)
1863 / Raw(b"X" * payload_size)
1865 for i in range(count)
1868 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
1870 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1872 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1874 / UDP(sport=1144, dport=2233)
1875 / Raw(b"X" * payload_size)
1877 for i in range(count)
1880 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1882 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1883 / IP(src="1.1.1.1", dst="1.1.1.2")
1884 / UDP(sport=1144, dport=2233)
1885 / Raw(b"X" * payload_size)
1886 for i in range(count)
1889 def verify_decrypted(self, p, rxs):
1891 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1892 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1894 def verify_encrypted(self, p, sa, rxs):
1897 pkt = sa.decrypt(rx[IP])
1898 if not pkt.haslayer(IP):
1899 pkt = IP(pkt[Raw].load)
1900 self.assert_packet_checksums_valid(pkt)
1901 self.assertTrue(pkt.haslayer(GRE))
1903 self.assertEqual(e[IP].dst, "1.1.1.2")
1904 except (IndexError, AssertionError):
1905 self.logger.debug(ppp("Unexpected packet:", rx))
1907 self.logger.debug(ppp("Decrypted packet:", pkt))
1913 super(TestIpsecGreIfEspTra, self).setUp()
1915 self.tun_if = self.pg0
1917 p = self.ipv4_params
1919 p.tun_sa_out = VppIpsecSA(
1925 p.crypt_algo_vpp_id,
1927 self.vpp_esp_protocol,
1929 p.tun_sa_out.add_vpp_config()
1931 p.tun_sa_in = VppIpsecSA(
1937 p.crypt_algo_vpp_id,
1939 self.vpp_esp_protocol,
1941 p.tun_sa_in.add_vpp_config()
1943 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1944 p.tun_if.add_vpp_config()
1946 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1947 p.tun_protect.add_vpp_config()
1950 p.tun_if.config_ip4()
1951 config_tra_params(p, self.encryption_type, p.tun_if)
1954 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1958 p = self.ipv4_params
1959 p.tun_if.unconfig_ip4()
1960 super(TestIpsecGreIfEspTra, self).tearDown()
1962 def test_gre_non_ip(self):
1963 p = self.ipv4_params
1964 tx = self.gen_encrypt_non_ip_pkts(
1967 src=p.remote_tun_if_host,
1968 dst=self.pg1.remote_ip6,
1970 self.send_and_assert_no_replies(self.tun_if, tx)
1971 node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
1972 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1975 class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
1976 """Ipsec GRE ESP - TRA tests"""
1978 tun6_encrypt_node_name = "esp6-encrypt-tun"
1979 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1980 encryption_type = ESP
1982 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1984 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1986 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
1988 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
1989 / UDP(sport=1144, dport=2233)
1990 / Raw(b"X" * payload_size)
1992 for i in range(count)
1995 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
1997 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1998 / IPv6(src="1::1", dst="1::2")
1999 / UDP(sport=1144, dport=2233)
2000 / Raw(b"X" * payload_size)
2001 for i in range(count)
2004 def verify_decrypted6(self, p, rxs):
2006 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2007 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2009 def verify_encrypted6(self, p, sa, rxs):
2012 pkt = sa.decrypt(rx[IPv6])
2013 if not pkt.haslayer(IPv6):
2014 pkt = IPv6(pkt[Raw].load)
2015 self.assert_packet_checksums_valid(pkt)
2016 self.assertTrue(pkt.haslayer(GRE))
2018 self.assertEqual(e[IPv6].dst, "1::2")
2019 except (IndexError, AssertionError):
2020 self.logger.debug(ppp("Unexpected packet:", rx))
2022 self.logger.debug(ppp("Decrypted packet:", pkt))
2028 super(TestIpsecGre6IfEspTra, self).setUp()
2030 self.tun_if = self.pg0
2032 p = self.ipv6_params
2034 bd1 = VppBridgeDomain(self, 1)
2035 bd1.add_vpp_config()
2037 p.tun_sa_out = VppIpsecSA(
2043 p.crypt_algo_vpp_id,
2045 self.vpp_esp_protocol,
2047 p.tun_sa_out.add_vpp_config()
2049 p.tun_sa_in = VppIpsecSA(
2055 p.crypt_algo_vpp_id,
2057 self.vpp_esp_protocol,
2059 p.tun_sa_in.add_vpp_config()
2061 p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
2062 p.tun_if.add_vpp_config()
2064 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2065 p.tun_protect.add_vpp_config()
2068 p.tun_if.config_ip6()
2069 config_tra_params(p, self.encryption_type, p.tun_if)
2077 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
2084 p = self.ipv6_params
2085 p.tun_if.unconfig_ip6()
2086 super(TestIpsecGre6IfEspTra, self).tearDown()
2089 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
2090 """Ipsec mGRE ESP v4 TRA tests"""
2092 tun4_encrypt_node_name = "esp4-encrypt-tun"
2093 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2094 encryption_type = ESP
2096 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2098 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2100 IP(src=p.tun_dst, dst=self.pg0.local_ip4)
2102 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
2103 / UDP(sport=1144, dport=2233)
2104 / Raw(b"X" * payload_size)
2106 for i in range(count)
2109 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2111 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2112 / IP(src="1.1.1.1", dst=dst)
2113 / UDP(sport=1144, dport=2233)
2114 / Raw(b"X" * payload_size)
2115 for i in range(count)
2118 def verify_decrypted(self, p, rxs):
2120 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2121 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2123 def verify_encrypted(self, p, sa, rxs):
2126 pkt = sa.decrypt(rx[IP])
2127 if not pkt.haslayer(IP):
2128 pkt = IP(pkt[Raw].load)
2129 self.assert_packet_checksums_valid(pkt)
2130 self.assertTrue(pkt.haslayer(GRE))
2132 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2133 except (IndexError, AssertionError):
2134 self.logger.debug(ppp("Unexpected packet:", rx))
2136 self.logger.debug(ppp("Decrypted packet:", pkt))
2142 super(TestIpsecMGreIfEspTra4, self).setUp()
2145 self.tun_if = self.pg0
2146 p = self.ipv4_params
2147 p.tun_if = VppGreInterface(
2151 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2153 p.tun_if.add_vpp_config()
2155 p.tun_if.config_ip4()
2156 p.tun_if.generate_remote_hosts(N_NHS)
2157 self.pg0.generate_remote_hosts(N_NHS)
2158 self.pg0.configure_ipv4_neighbors()
2160 # setup some SAs for several next-hops on the interface
2161 self.multi_params = []
2163 for ii in range(N_NHS):
2164 p = copy.copy(self.ipv4_params)
2166 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2167 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2168 p.scapy_tun_spi = p.scapy_tun_spi + ii
2169 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2170 p.vpp_tun_spi = p.vpp_tun_spi + ii
2172 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2173 p.scapy_tra_spi = p.scapy_tra_spi + ii
2174 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2175 p.vpp_tra_spi = p.vpp_tra_spi + ii
2176 p.tun_sa_out = VppIpsecSA(
2182 p.crypt_algo_vpp_id,
2184 self.vpp_esp_protocol,
2186 p.tun_sa_out.add_vpp_config()
2188 p.tun_sa_in = VppIpsecSA(
2194 p.crypt_algo_vpp_id,
2196 self.vpp_esp_protocol,
2198 p.tun_sa_in.add_vpp_config()
2200 p.tun_protect = VppIpsecTunProtect(
2205 nh=p.tun_if.remote_hosts[ii].ip4,
2207 p.tun_protect.add_vpp_config()
2208 config_tra_params(p, self.encryption_type, p.tun_if)
2209 self.multi_params.append(p)
2213 p.remote_tun_if_host,
2215 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
2218 # in this v4 variant add the teibs after the protect
2222 p.tun_if.remote_hosts[ii].ip4,
2223 self.pg0.remote_hosts[ii].ip4,
2225 p.tun_dst = self.pg0.remote_hosts[ii].ip4
2226 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2229 p = self.ipv4_params
2230 p.tun_if.unconfig_ip4()
2231 super(TestIpsecMGreIfEspTra4, self).tearDown()
2233 def test_tun_44(self):
2236 for p in self.multi_params:
2237 self.verify_tun_44(p, count=N_PKTS)
2238 p.teib.remove_vpp_config()
2239 self.verify_tun_dropped_44(p, count=N_PKTS)
2240 p.teib.add_vpp_config()
2241 self.verify_tun_44(p, count=N_PKTS)
2244 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
2245 """Ipsec mGRE ESP v6 TRA tests"""
2247 tun6_encrypt_node_name = "esp6-encrypt-tun"
2248 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2249 encryption_type = ESP
2251 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2253 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2255 IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
2257 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2258 / UDP(sport=1144, dport=2233)
2259 / Raw(b"X" * payload_size)
2261 for i in range(count)
2264 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2266 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2267 / IPv6(src="1::1", dst=dst)
2268 / UDP(sport=1144, dport=2233)
2269 / Raw(b"X" * payload_size)
2270 for i in range(count)
2273 def verify_decrypted6(self, p, rxs):
2275 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2276 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2278 def verify_encrypted6(self, p, sa, rxs):
2281 pkt = sa.decrypt(rx[IPv6])
2282 if not pkt.haslayer(IPv6):
2283 pkt = IPv6(pkt[Raw].load)
2284 self.assert_packet_checksums_valid(pkt)
2285 self.assertTrue(pkt.haslayer(GRE))
2287 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
2288 except (IndexError, AssertionError):
2289 self.logger.debug(ppp("Unexpected packet:", rx))
2291 self.logger.debug(ppp("Decrypted packet:", pkt))
2297 super(TestIpsecMGreIfEspTra6, self).setUp()
2299 self.vapi.cli("set logging class ipsec level debug")
2302 self.tun_if = self.pg0
2303 p = self.ipv6_params
2304 p.tun_if = VppGreInterface(
2308 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2310 p.tun_if.add_vpp_config()
2312 p.tun_if.config_ip6()
2313 p.tun_if.generate_remote_hosts(N_NHS)
2314 self.pg0.generate_remote_hosts(N_NHS)
2315 self.pg0.configure_ipv6_neighbors()
2317 # setup some SAs for several next-hops on the interface
2318 self.multi_params = []
2320 for ii in range(N_NHS):
2321 p = copy.copy(self.ipv6_params)
2323 p.remote_tun_if_host = "1::%d" % (ii + 1)
2324 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2325 p.scapy_tun_spi = p.scapy_tun_spi + ii
2326 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2327 p.vpp_tun_spi = p.vpp_tun_spi + ii
2329 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2330 p.scapy_tra_spi = p.scapy_tra_spi + ii
2331 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2332 p.vpp_tra_spi = p.vpp_tra_spi + ii
2333 p.tun_sa_out = VppIpsecSA(
2339 p.crypt_algo_vpp_id,
2341 self.vpp_esp_protocol,
2343 p.tun_sa_out.add_vpp_config()
2345 p.tun_sa_in = VppIpsecSA(
2351 p.crypt_algo_vpp_id,
2353 self.vpp_esp_protocol,
2355 p.tun_sa_in.add_vpp_config()
2357 # in this v6 variant add the teibs first then the protection
2358 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2360 self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
2363 p.tun_protect = VppIpsecTunProtect(
2368 nh=p.tun_if.remote_hosts[ii].ip6,
2370 p.tun_protect.add_vpp_config()
2371 config_tra_params(p, self.encryption_type, p.tun_if)
2372 self.multi_params.append(p)
2376 p.remote_tun_if_host,
2378 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
2380 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2382 self.logger.info(self.vapi.cli("sh log"))
2383 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2384 self.logger.info(self.vapi.cli("sh adj 41"))
2387 p = self.ipv6_params
2388 p.tun_if.unconfig_ip6()
2389 super(TestIpsecMGreIfEspTra6, self).tearDown()
2391 def test_tun_66(self):
2393 for p in self.multi_params:
2394 self.verify_tun_66(p, count=63)
2397 @tag_fixme_vpp_workers
2398 class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2399 """IPsec IPv4 Tunnel protect - transport mode"""
2402 super(TestIpsec4TunProtect, self).setUp()
2404 self.tun_if = self.pg0
2407 super(TestIpsec4TunProtect, self).tearDown()
2409 def test_tun_44(self):
2410 """IPSEC tunnel protect"""
2412 p = self.ipv4_params
2414 self.config_network(p)
2415 self.config_sa_tra(p)
2416 self.config_protect(p)
2418 self.verify_tun_44(p, count=127)
2419 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2420 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2422 self.vapi.cli("clear ipsec sa")
2423 self.verify_tun_64(p, count=127)
2424 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2425 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2427 # rekey - create new SAs and update the tunnel protection
2429 np.crypt_key = b"X" + p.crypt_key[1:]
2430 np.scapy_tun_spi += 100
2431 np.scapy_tun_sa_id += 1
2432 np.vpp_tun_spi += 100
2433 np.vpp_tun_sa_id += 1
2434 np.tun_if.local_spi = p.vpp_tun_spi
2435 np.tun_if.remote_spi = p.scapy_tun_spi
2437 self.config_sa_tra(np)
2438 self.config_protect(np)
2441 self.verify_tun_44(np, count=127)
2442 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2443 self.assertEqual(p.tun_if.get_tx_stats(), 381)
2446 self.unconfig_protect(np)
2447 self.unconfig_sa(np)
2448 self.unconfig_network(p)
2451 @tag_fixme_vpp_workers
2452 class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2453 """IPsec IPv4 Tunnel protect - transport mode"""
2456 super(TestIpsec4TunProtectUdp, self).setUp()
2458 self.tun_if = self.pg0
2460 p = self.ipv4_params
2461 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
2462 p.nat_header = UDP(sport=4500, dport=4500)
2463 self.config_network(p)
2464 self.config_sa_tra(p)
2465 self.config_protect(p)
2468 p = self.ipv4_params
2469 self.unconfig_protect(p)
2471 self.unconfig_network(p)
2472 super(TestIpsec4TunProtectUdp, self).tearDown()
2474 def verify_encrypted(self, p, sa, rxs):
        # ensure encrypted packets are received with the default UDP ports
2477 self.assertEqual(rx[UDP].sport, 4500)
2478 self.assertEqual(rx[UDP].dport, 4500)
2479 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2481 def test_tun_44(self):
2482 """IPSEC UDP tunnel protect"""
2484 p = self.ipv4_params
2486 self.verify_tun_44(p, count=127)
2487 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2488 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2490 def test_keepalive(self):
2491 """IPSEC NAT Keepalive"""
2492 self.verify_keepalive(self.ipv4_params)
2495 @tag_fixme_vpp_workers
2496 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2497 """IPsec IPv4 Tunnel protect - tunnel mode"""
2499 encryption_type = ESP
2500 tun4_encrypt_node_name = "esp4-encrypt-tun"
2501 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2504 super(TestIpsec4TunProtectTun, self).setUp()
2506 self.tun_if = self.pg0
2509 super(TestIpsec4TunProtectTun, self).tearDown()
2511 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2513 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2515 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2516 / IP(src=src, dst=dst)
2517 / UDP(sport=1144, dport=2233)
2518 / Raw(b"X" * payload_size)
2520 for i in range(count)
2523 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2525 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2526 / IP(src=src, dst=dst)
2527 / UDP(sport=1144, dport=2233)
2528 / Raw(b"X" * payload_size)
2529 for i in range(count)
2532 def verify_decrypted(self, p, rxs):
2534 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2535 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2536 self.assert_packet_checksums_valid(rx)
2538 def verify_encrypted(self, p, sa, rxs):
2541 pkt = sa.decrypt(rx[IP])
2542 if not pkt.haslayer(IP):
2543 pkt = IP(pkt[Raw].load)
2544 self.assert_packet_checksums_valid(pkt)
2545 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2546 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2547 inner = pkt[IP].payload
2548 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2550 except (IndexError, AssertionError):
2551 self.logger.debug(ppp("Unexpected packet:", rx))
2553 self.logger.debug(ppp("Decrypted packet:", pkt))
2558 def test_tun_44(self):
2559 """IPSEC tunnel protect"""
2561 p = self.ipv4_params
2563 self.config_network(p)
2564 self.config_sa_tun(p)
2565 self.config_protect(p)
        # also add output features on the tunnel and physical interface
2568 # so we test they still work
2569 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2570 a = VppAcl(self, [r_all]).add_vpp_config()
2572 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2573 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2575 self.verify_tun_44(p, count=127)
2577 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2578 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2580 # rekey - create new SAs and update the tunnel protection
2582 np.crypt_key = b"X" + p.crypt_key[1:]
2583 np.scapy_tun_spi += 100
2584 np.scapy_tun_sa_id += 1
2585 np.vpp_tun_spi += 100
2586 np.vpp_tun_sa_id += 1
2587 np.tun_if.local_spi = p.vpp_tun_spi
2588 np.tun_if.remote_spi = p.scapy_tun_spi
2590 self.config_sa_tun(np)
2591 self.config_protect(np)
2594 self.verify_tun_44(np, count=127)
2595 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2596 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2599 self.unconfig_protect(np)
2600 self.unconfig_sa(np)
2601 self.unconfig_network(p)
2604 class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2605 """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2607 encryption_type = ESP
2608 tun4_encrypt_node_name = "esp4-encrypt-tun"
2609 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2612 super(TestIpsec4TunProtectTunDrop, self).setUp()
2614 self.tun_if = self.pg0
2617 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2619 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2621 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2623 IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
2624 / IP(src=src, dst=dst)
2625 / UDP(sport=1144, dport=2233)
2626 / Raw(b"X" * payload_size)
2628 for i in range(count)
2631 def test_tun_drop_44(self):
2632 """IPSEC tunnel protect bogus tunnel header"""
2634 p = self.ipv4_params
2636 self.config_network(p)
2637 self.config_sa_tun(p)
2638 self.config_protect(p)
2640 tx = self.gen_encrypt_pkts(
2644 src=p.remote_tun_if_host,
2645 dst=self.pg1.remote_ip4,
2648 self.send_and_assert_no_replies(self.tun_if, tx)
2651 self.unconfig_protect(p)
2653 self.unconfig_network(p)
2656 @tag_fixme_vpp_workers
2657 class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2658 """IPsec IPv6 Tunnel protect - transport mode"""
2660 encryption_type = ESP
2661 tun6_encrypt_node_name = "esp6-encrypt-tun"
2662 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2665 super(TestIpsec6TunProtect, self).setUp()
2667 self.tun_if = self.pg0
2670 super(TestIpsec6TunProtect, self).tearDown()
2672 def test_tun_66(self):
2673 """IPSEC tunnel protect 6o6"""
2675 p = self.ipv6_params
2677 self.config_network(p)
2678 self.config_sa_tra(p)
2679 self.config_protect(p)
2681 self.verify_tun_66(p, count=127)
2682 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2683 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2685 # rekey - create new SAs and update the tunnel protection
2687 np.crypt_key = b"X" + p.crypt_key[1:]
2688 np.scapy_tun_spi += 100
2689 np.scapy_tun_sa_id += 1
2690 np.vpp_tun_spi += 100
2691 np.vpp_tun_sa_id += 1
2692 np.tun_if.local_spi = p.vpp_tun_spi
2693 np.tun_if.remote_spi = p.scapy_tun_spi
2695 self.config_sa_tra(np)
2696 self.config_protect(np)
2699 self.verify_tun_66(np, count=127)
2700 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2701 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2703 # bounce the interface state
2704 p.tun_if.admin_down()
2705 self.verify_drop_tun_66(np, count=127)
2706 node = "/err/ipsec6-tun-input/disabled"
2707 self.assertEqual(127, self.statistics.get_err_counter(node))
2709 self.verify_tun_66(np, count=127)
2712 # 1) add two input SAs [old, new]
2713 # 2) swap output SA to [new]
2714 # 3) use only [new] input SA
2716 np3.crypt_key = b"Z" + p.crypt_key[1:]
2717 np3.scapy_tun_spi += 100
2718 np3.scapy_tun_sa_id += 1
2719 np3.vpp_tun_spi += 100
2720 np3.vpp_tun_sa_id += 1
2721 np3.tun_if.local_spi = p.vpp_tun_spi
2722 np3.tun_if.remote_spi = p.scapy_tun_spi
2724 self.config_sa_tra(np3)
2727 p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2728 self.verify_tun_66(np, np, count=127)
2729 self.verify_tun_66(np3, np, count=127)
2732 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2733 self.verify_tun_66(np, np3, count=127)
2734 self.verify_tun_66(np3, np3, count=127)
2737 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
2738 self.verify_tun_66(np3, np3, count=127)
2739 self.verify_drop_tun_rx_66(np, count=127)
2741 self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
2742 self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
2743 self.unconfig_sa(np)
2746 self.unconfig_protect(np3)
2747 self.unconfig_sa(np3)
2748 self.unconfig_network(p)
2750 def test_tun_46(self):
2751 """IPSEC tunnel protect 4o6"""
2753 p = self.ipv6_params
2755 self.config_network(p)
2756 self.config_sa_tra(p)
2757 self.config_protect(p)
2759 self.verify_tun_46(p, count=127)
2760 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2761 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2764 self.unconfig_protect(p)
2766 self.unconfig_network(p)
2769 @tag_fixme_vpp_workers
2770 class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2771 """IPsec IPv6 Tunnel protect - tunnel mode"""
2773 encryption_type = ESP
2774 tun6_encrypt_node_name = "esp6-encrypt-tun"
2775 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2778 super(TestIpsec6TunProtectTun, self).setUp()
2780 self.tun_if = self.pg0
2783 super(TestIpsec6TunProtectTun, self).tearDown()
2785 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2787 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2789 IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
2790 / IPv6(src=src, dst=dst)
2791 / UDP(sport=1166, dport=2233)
2792 / Raw(b"X" * payload_size)
2794 for i in range(count)
2797 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2799 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2800 / IPv6(src=src, dst=dst)
2801 / UDP(sport=1166, dport=2233)
2802 / Raw(b"X" * payload_size)
2803 for i in range(count)
2806 def verify_decrypted6(self, p, rxs):
2808 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2809 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2810 self.assert_packet_checksums_valid(rx)
2812 def verify_encrypted6(self, p, sa, rxs):
2815 pkt = sa.decrypt(rx[IPv6])
2816 if not pkt.haslayer(IPv6):
2817 pkt = IPv6(pkt[Raw].load)
2818 self.assert_packet_checksums_valid(pkt)
2819 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2820 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2821 inner = pkt[IPv6].payload
2822 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2824 except (IndexError, AssertionError):
2825 self.logger.debug(ppp("Unexpected packet:", rx))
2827 self.logger.debug(ppp("Decrypted packet:", pkt))
2832 def test_tun_66(self):
2833 """IPSEC tunnel protect"""
2835 p = self.ipv6_params
2837 self.config_network(p)
2838 self.config_sa_tun(p)
2839 self.config_protect(p)
2841 self.verify_tun_66(p, count=127)
2843 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2844 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2846 # rekey - create new SAs and update the tunnel protection
2848 np.crypt_key = b"X" + p.crypt_key[1:]
2849 np.scapy_tun_spi += 100
2850 np.scapy_tun_sa_id += 1
2851 np.vpp_tun_spi += 100
2852 np.vpp_tun_sa_id += 1
2853 np.tun_if.local_spi = p.vpp_tun_spi
2854 np.tun_if.remote_spi = p.scapy_tun_spi
2856 self.config_sa_tun(np)
2857 self.config_protect(np)
2860 self.verify_tun_66(np, count=127)
2861 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2862 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2865 self.unconfig_protect(np)
2866 self.unconfig_sa(np)
2867 self.unconfig_network(p)
2870 class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2871 """IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2873 encryption_type = ESP
2874 tun6_encrypt_node_name = "esp6-encrypt-tun"
2875 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2878 super(TestIpsec6TunProtectTunDrop, self).setUp()
2880 self.tun_if = self.pg0
2883 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2885 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
# the IP destination of the revealed packet does not match
2887 # that assigned to the tunnel
2889 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2891 IPv6(src=sw_intf.remote_ip6, dst="5::5")
2892 / IPv6(src=src, dst=dst)
2893 / UDP(sport=1144, dport=2233)
2894 / Raw(b"X" * payload_size)
2896 for i in range(count)
2899 def test_tun_drop_66(self):
2900 """IPSEC 6 tunnel protect bogus tunnel header"""
2902 p = self.ipv6_params
2904 self.config_network(p)
2905 self.config_sa_tun(p)
2906 self.config_protect(p)
2908 tx = self.gen_encrypt_pkts6(
2912 src=p.remote_tun_if_host,
2913 dst=self.pg1.remote_ip6,
2916 self.send_and_assert_no_replies(self.tun_if, tx)
2918 self.unconfig_protect(p)
2920 self.unconfig_network(p)
2923 class TemplateIpsecItf4(object):
2924 """IPsec Interface IPv4"""
2926 encryption_type = ESP
2927 tun4_encrypt_node_name = "esp4-encrypt-tun"
2928 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2929 tun4_input_node = "ipsec4-tun-input"
2931 def config_sa_tun(self, p, src, dst):
2932 config_tun_params(p, self.encryption_type, None, src, dst)
2934 p.tun_sa_out = VppIpsecSA(
2940 p.crypt_algo_vpp_id,
2942 self.vpp_esp_protocol,
2947 p.tun_sa_out.add_vpp_config()
2949 p.tun_sa_in = VppIpsecSA(
2955 p.crypt_algo_vpp_id,
2957 self.vpp_esp_protocol,
2962 p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
    """Build a VppIpsecTunProtect for p.tun_if and add it to VPP.

    The object is created from p.tun_if, p.tun_sa_out and a one-element
    list holding p.tun_sa_in, and is kept on p.tun_protect so that it can
    be removed again later.
    """
    protect = VppIpsecTunProtect(
        self,
        p.tun_if,
        p.tun_sa_out,
        [p.tun_sa_in],
    )
    p.tun_protect = protect
    protect.add_vpp_config()
2968 def config_network(self, p, instance=0xFFFFFFFF):
2969 p.tun_if = VppIpsecInterface(self, instance=instance)
2971 p.tun_if.add_vpp_config()
2973 p.tun_if.config_ip4()
2974 p.tun_if.config_ip6()
2976 p.route = VppIpRoute(
2978 p.remote_tun_if_host,
2980 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
2982 p.route.add_vpp_config()
2985 p.remote_tun_if_host6,
2989 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
def unconfig_network(self, p):
    """Remove the VPP config of p.route first, then of p.tun_if."""
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the VPP config of the object stored on p.tun_protect."""
    protect = p.tun_protect
    protect.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove the VPP config of p.tun_sa_out, then of p.tun_sa_in."""
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
3007 @tag_fixme_vpp_workers
3008 class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3009 """IPsec Interface IPv4"""
3012 super(TestIpsecItf4, self).setUp()
3014 self.tun_if = self.pg0
3017 super(TestIpsecItf4, self).tearDown()
def test_tun_instance_44(self):
    """IPSEC interface IPv4 created with an explicit instance number"""
    params = self.ipv4_params
    self.config_network(params, instance=3)

    # The interface was requested as instance 3, so the CLI is expected
    # to fail when asked about "ipsec0" ...
    with self.assertRaises(CliFailedCommandError):
        self.vapi.cli("show interface ipsec0")

    # ... while "ipsec3" must be shown without an "unknown" marker.
    shown = self.vapi.cli("show interface ipsec3")
    self.assertTrue("unknown" not in shown)

    self.unconfig_network(params)
3031 def test_tun_44(self):
3032 """IPSEC interface IPv4"""
3035 p = self.ipv4_params
3037 self.config_network(p)
3039 p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
3041 self.verify_tun_dropped_44(p, count=n_pkts)
3042 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3043 self.config_protect(p)
3045 self.verify_tun_44(p, count=n_pkts)
3046 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3047 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3049 p.tun_if.admin_down()
3050 self.verify_tun_dropped_44(p, count=n_pkts)
3052 self.verify_tun_44(p, count=n_pkts)
3054 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3055 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
# it's a v6 packet when it's encrypted
3058 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
3060 self.verify_tun_64(p, count=n_pkts)
3061 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3062 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3064 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
3066 self.vapi.cli("clear interfaces")
3068 # rekey - create new SAs and update the tunnel protection
3070 np.crypt_key = b"X" + p.crypt_key[1:]
3071 np.scapy_tun_spi += 100
3072 np.scapy_tun_sa_id += 1
3073 np.vpp_tun_spi += 100
3074 np.vpp_tun_sa_id += 1
3075 np.tun_if.local_spi = p.vpp_tun_spi
3076 np.tun_if.remote_spi = p.scapy_tun_spi
3078 self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
3079 self.config_protect(np)
3082 self.verify_tun_44(np, count=n_pkts)
3083 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3084 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3087 self.unconfig_protect(np)
3088 self.unconfig_sa(np)
3089 self.unconfig_network(p)
3091 def test_tun_44_null(self):
3092 """IPSEC interface IPv4 NULL auth/crypto"""
3095 p = copy.copy(self.ipv4_params)
3097 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
3098 p.crypt_algo_vpp_id = (
3099 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
3101 p.crypt_algo = "NULL"
3102 p.auth_algo = "NULL"
3104 self.config_network(p)
3105 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3106 self.config_protect(p)
3108 self.logger.info(self.vapi.cli("sh ipsec sa"))
3109 self.verify_tun_44(p, count=n_pkts)
3112 self.unconfig_protect(p)
3114 self.unconfig_network(p)
3116 def test_tun_44_police(self):
3117 """IPSEC interface IPv4 with input policer"""
3119 p = self.ipv4_params
3121 self.config_network(p)
3122 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3123 self.config_protect(p)
3125 action_tx = PolicerAction(
3126 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3128 policer = VppPolicer(
3135 conform_action=action_tx,
3136 exceed_action=action_tx,
3137 violate_action=action_tx,
3139 policer.add_vpp_config()
3141 # Start policing on tun
3142 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3144 self.verify_tun_44(p, count=n_pkts)
3145 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3146 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3148 stats = policer.get_stats()
3150 # Single rate, 2 colour policer - expect conform, violate but no exceed
3151 self.assertGreater(stats["conform_packets"], 0)
3152 self.assertEqual(stats["exceed_packets"], 0)
3153 self.assertGreater(stats["violate_packets"], 0)
3155 # Stop policing on tun
3156 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3157 self.verify_tun_44(p, count=n_pkts)
3159 # No new policer stats
3160 statsnew = policer.get_stats()
3161 self.assertEqual(stats, statsnew)
3164 policer.remove_vpp_config()
3165 self.unconfig_protect(p)
3167 self.unconfig_network(p)
3170 class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3171 """IPsec Interface MPLSoIPv4"""
3173 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
3176 super(TestIpsecItf4MPLS, self).setUp()
3178 self.tun_if = self.pg0
3181 super(TestIpsecItf4MPLS, self).tearDown()
3183 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3185 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3187 MPLS(label=44, ttl=3)
3188 / IP(src=src, dst=dst)
3189 / UDP(sport=1166, dport=2233)
3190 / Raw(b"X" * payload_size)
3192 for i in range(count)
3195 def verify_encrypted(self, p, sa, rxs):
3198 pkt = sa.decrypt(rx[IP])
3199 if not pkt.haslayer(IP):
3200 pkt = IP(pkt[Raw].load)
3201 self.assert_packet_checksums_valid(pkt)
3202 self.assert_equal(pkt[MPLS].label, 44)
3203 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
3204 except (IndexError, AssertionError):
3205 self.logger.debug(ppp("Unexpected packet:", rx))
3207 self.logger.debug(ppp("Decrypted packet:", pkt))
3212 def test_tun_mpls_o_ip4(self):
3213 """IPSEC interface MPLS over IPv4"""
3216 p = self.ipv4_params
3219 tbl = VppMplsTable(self, 0)
3220 tbl.add_vpp_config()
3222 self.config_network(p)
3223 # deag MPLS routes from the tunnel
3225 self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
3230 p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
3234 p.tun_if.enable_mpls()
3236 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3237 self.config_protect(p)
3239 self.verify_tun_44(p, count=n_pkts)
3242 p.tun_if.disable_mpls()
3243 self.unconfig_protect(p)
3245 self.unconfig_network(p)
3248 class TemplateIpsecItf6(object):
3249 """IPsec Interface IPv6"""
3251 encryption_type = ESP
3252 tun6_encrypt_node_name = "esp6-encrypt-tun"
3253 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
3254 tun6_input_node = "ipsec6-tun-input"
3256 def config_sa_tun(self, p, src, dst):
3257 config_tun_params(p, self.encryption_type, None, src, dst)
3259 if not hasattr(p, "tun_flags"):
3261 if not hasattr(p, "hop_limit"):
3264 p.tun_sa_out = VppIpsecSA(
3270 p.crypt_algo_vpp_id,
3272 self.vpp_esp_protocol,
3276 tun_flags=p.tun_flags,
3277 hop_limit=p.hop_limit,
3279 p.tun_sa_out.add_vpp_config()
3281 p.tun_sa_in = VppIpsecSA(
3287 p.crypt_algo_vpp_id,
3289 self.vpp_esp_protocol,
3294 p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
    """Create the tunnel protection for p.tun_if and add it to VPP.

    A VppIpsecTunProtect is constructed from p.tun_if, p.tun_sa_out and
    the list [p.tun_sa_in]; it is remembered on p.tun_protect.
    """
    tun_protect = VppIpsecTunProtect(
        self,
        p.tun_if,
        p.tun_sa_out,
        [p.tun_sa_in],
    )
    p.tun_protect = tun_protect
    tun_protect.add_vpp_config()
3300 def config_network(self, p):
3301 p.tun_if = VppIpsecInterface(self)
3303 p.tun_if.add_vpp_config()
3305 p.tun_if.config_ip4()
3306 p.tun_if.config_ip6()
3310 p.remote_tun_if_host4,
3312 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3316 p.route = VppIpRoute(
3318 p.remote_tun_if_host,
3322 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3326 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Remove p.route from VPP and afterwards the interface p.tun_if."""
    route = p.route
    route.remove_vpp_config()
    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Call remove_vpp_config() on the object held in p.tun_protect."""
    tun_protect = p.tun_protect
    tun_protect.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on p: p.tun_sa_out first, then p.tun_sa_in."""
    sa_out = p.tun_sa_out
    sa_out.remove_vpp_config()
    sa_in = p.tun_sa_in
    sa_in.remove_vpp_config()
3340 @tag_fixme_vpp_workers
3341 class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3342 """IPsec Interface IPv6"""
3345 super(TestIpsecItf6, self).setUp()
3347 self.tun_if = self.pg0
3350 super(TestIpsecItf6, self).tearDown()
3352 def test_tun_66(self):
3353 """IPSEC interface IPv6"""
3355 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3357 p = self.ipv6_params
3358 p.inner_hop_limit = 24
3359 p.outer_hop_limit = 23
3360 p.outer_flow_label = 243224
3361 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3363 self.config_network(p)
3365 p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
3367 self.verify_drop_tun_66(p, count=n_pkts)
3368 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3369 self.config_protect(p)
3371 self.verify_tun_66(p, count=n_pkts)
3372 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3373 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3375 p.tun_if.admin_down()
3376 self.verify_drop_tun_66(p, count=n_pkts)
3378 self.verify_tun_66(p, count=n_pkts)
3380 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3381 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
# it's a v4 packet when it's encrypted
3384 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
3386 self.verify_tun_46(p, count=n_pkts)
3387 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3388 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3390 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
3392 self.vapi.cli("clear interfaces")
3394 # rekey - create new SAs and update the tunnel protection
3396 np.crypt_key = b"X" + p.crypt_key[1:]
3397 np.scapy_tun_spi += 100
3398 np.scapy_tun_sa_id += 1
3399 np.vpp_tun_spi += 100
3400 np.vpp_tun_sa_id += 1
3401 np.tun_if.local_spi = p.vpp_tun_spi
3402 np.tun_if.remote_spi = p.scapy_tun_spi
3403 np.inner_hop_limit = 24
3404 np.outer_hop_limit = 128
3405 np.inner_flow_label = 0xABCDE
3406 np.outer_flow_label = 0xABCDE
3408 np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
3410 self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
3411 self.config_protect(np)
3414 self.verify_tun_66(np, count=n_pkts)
3415 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3416 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3419 self.unconfig_protect(np)
3420 self.unconfig_sa(np)
3421 self.unconfig_network(p)
3423 def test_tun_66_police(self):
3424 """IPSEC interface IPv6 with input policer"""
3425 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3427 p = self.ipv6_params
3428 p.inner_hop_limit = 24
3429 p.outer_hop_limit = 23
3430 p.outer_flow_label = 243224
3431 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3433 self.config_network(p)
3434 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3435 self.config_protect(p)
3437 action_tx = PolicerAction(
3438 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3440 policer = VppPolicer(
3447 conform_action=action_tx,
3448 exceed_action=action_tx,
3449 violate_action=action_tx,
3451 policer.add_vpp_config()
3453 # Start policing on tun
3454 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3456 self.verify_tun_66(p, count=n_pkts)
3457 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3458 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3460 stats = policer.get_stats()
3462 # Single rate, 2 colour policer - expect conform, violate but no exceed
3463 self.assertGreater(stats["conform_packets"], 0)
3464 self.assertEqual(stats["exceed_packets"], 0)
3465 self.assertGreater(stats["violate_packets"], 0)
3467 # Stop policing on tun
3468 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3469 self.verify_tun_66(p, count=n_pkts)
3471 # No new policer stats
3472 statsnew = policer.get_stats()
3473 self.assertEqual(stats, statsnew)
3476 policer.remove_vpp_config()
3477 self.unconfig_protect(p)
3479 self.unconfig_network(p)
3482 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3483 """Ipsec P2MP ESP v4 tests"""
3485 tun4_encrypt_node_name = "esp4-encrypt-tun"
3486 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3487 encryption_type = ESP
3489 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3491 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3493 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
3494 / UDP(sport=1144, dport=2233)
3495 / Raw(b"X" * payload_size)
3497 for i in range(count)
3500 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
3502 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3503 / IP(src="1.1.1.1", dst=dst)
3504 / UDP(sport=1144, dport=2233)
3505 / Raw(b"X" * payload_size)
3506 for i in range(count)
3509 def verify_decrypted(self, p, rxs):
3511 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3512 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3514 def verify_encrypted(self, p, sa, rxs):
3518 rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
3520 self.assertEqual(rx[IP].ttl, p.hop_limit)
3521 pkt = sa.decrypt(rx[IP])
3522 if not pkt.haslayer(IP):
3523 pkt = IP(pkt[Raw].load)
3524 self.assert_packet_checksums_valid(pkt)
3526 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3527 except (IndexError, AssertionError):
3528 self.logger.debug(ppp("Unexpected packet:", rx))
3530 self.logger.debug(ppp("Decrypted packet:", pkt))
3536 super(TestIpsecMIfEsp4, self).setUp()
3539 self.tun_if = self.pg0
3540 p = self.ipv4_params
3541 p.tun_if = VppIpsecInterface(
3542 self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
3544 p.tun_if.add_vpp_config()
3546 p.tun_if.config_ip4()
3547 p.tun_if.unconfig_ip4()
3548 p.tun_if.config_ip4()
3549 p.tun_if.generate_remote_hosts(N_NHS)
3550 self.pg0.generate_remote_hosts(N_NHS)
3551 self.pg0.configure_ipv4_neighbors()
3553 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
3554 a = VppAcl(self, [r_all]).add_vpp_config()
3556 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3557 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3559 # setup some SAs for several next-hops on the interface
3560 self.multi_params = []
3562 for ii in range(N_NHS):
3563 p = copy.copy(self.ipv4_params)
3565 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3566 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3567 p.scapy_tun_spi = p.scapy_tun_spi + ii
3568 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3569 p.vpp_tun_spi = p.vpp_tun_spi + ii
3571 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3572 p.scapy_tra_spi = p.scapy_tra_spi + ii
3573 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3574 p.vpp_tra_spi = p.vpp_tra_spi + ii
3575 p.hop_limit = ii + 10
3576 p.tun_sa_out = VppIpsecSA(
3582 p.crypt_algo_vpp_id,
3584 self.vpp_esp_protocol,
3586 self.pg0.remote_hosts[ii].ip4,
3587 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3588 hop_limit=p.hop_limit,
3590 p.tun_sa_out.add_vpp_config()
3592 p.tun_sa_in = VppIpsecSA(
3598 p.crypt_algo_vpp_id,
3600 self.vpp_esp_protocol,
3601 self.pg0.remote_hosts[ii].ip4,
3603 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3604 hop_limit=p.hop_limit,
3606 p.tun_sa_in.add_vpp_config()
3608 p.tun_protect = VppIpsecTunProtect(
3613 nh=p.tun_if.remote_hosts[ii].ip4,
3615 p.tun_protect.add_vpp_config()
3618 self.encryption_type,
3621 self.pg0.remote_hosts[ii].ip4,
3623 self.multi_params.append(p)
3625 p.via_tun_route = VppIpRoute(
3627 p.remote_tun_if_host,
3629 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
3632 p.tun_dst = self.pg0.remote_hosts[ii].ip4
3635 p = self.ipv4_params
3636 p.tun_if.unconfig_ip4()
3637 super(TestIpsecMIfEsp4, self).tearDown()
3639 def test_tun_44(self):
3642 for p in self.multi_params:
3643 self.verify_tun_44(p, count=N_PKTS)
3645 # remove one tunnel protect, the rest should still work
3646 self.multi_params[0].tun_protect.remove_vpp_config()
3647 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3648 self.multi_params[0].via_tun_route.remove_vpp_config()
3649 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3651 for p in self.multi_params[1:]:
3652 self.verify_tun_44(p, count=N_PKTS)
3654 self.multi_params[0].tun_protect.add_vpp_config()
3655 self.multi_params[0].via_tun_route.add_vpp_config()
3657 for p in self.multi_params:
3658 self.verify_tun_44(p, count=N_PKTS)
3661 class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3662 """IPsec Interface MPLSoIPv6"""
3664 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3667 super(TestIpsecItf6MPLS, self).setUp()
3669 self.tun_if = self.pg0
3672 super(TestIpsecItf6MPLS, self).tearDown()
3674 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3676 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3678 MPLS(label=66, ttl=3)
3679 / IPv6(src=src, dst=dst)
3680 / UDP(sport=1166, dport=2233)
3681 / Raw(b"X" * payload_size)
3683 for i in range(count)
3686 def verify_encrypted6(self, p, sa, rxs):
3689 pkt = sa.decrypt(rx[IPv6])
3690 if not pkt.haslayer(IPv6):
3691 pkt = IP(pkt[Raw].load)
3692 self.assert_packet_checksums_valid(pkt)
3693 self.assert_equal(pkt[MPLS].label, 66)
3694 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3695 except (IndexError, AssertionError):
3696 self.logger.debug(ppp("Unexpected packet:", rx))
3698 self.logger.debug(ppp("Decrypted packet:", pkt))
3703 def test_tun_mpls_o_ip6(self):
3704 """IPSEC interface MPLS over IPv6"""
3707 p = self.ipv6_params
3710 tbl = VppMplsTable(self, 0)
3711 tbl.add_vpp_config()
3713 self.config_network(p)
3714 # deag MPLS routes from the tunnel
3719 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3720 eos_proto=f.FIB_PATH_NH_PROTO_IP6,
3725 p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
3729 p.tun_if.enable_mpls()
3731 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3732 self.config_protect(p)
3734 self.verify_tun_66(p, count=n_pkts)
3737 p.tun_if.disable_mpls()
3738 self.unconfig_protect(p)
3740 self.unconfig_network(p)
# When this module is executed directly, run its tests through unittest
# using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)