5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import (
21 IpsecTun6HandoffTests,
22 IpsecTun4HandoffTests,
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
47 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
48 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
50 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
52 crypt_key = mk_scapy_crypt_key(p)
54 p.tun_dst = tun_if.remote_ip
55 p.tun_src = tun_if.local_ip
61 is_default_port = p.nat_header.dport == 4500
63 is_default_port = True
66 outbound_nat_header = p.nat_header
68 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
69 bind_layers(UDP, ESP, dport=p.nat_header.dport)
71 p.scapy_tun_sa = SecurityAssociation(
74 crypt_algo=p.crypt_algo,
76 auth_algo=p.auth_algo,
78 tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
79 nat_t_header=outbound_nat_header,
82 p.vpp_tun_sa = SecurityAssociation(
85 crypt_algo=p.crypt_algo,
87 auth_algo=p.auth_algo,
89 tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
90 nat_t_header=p.nat_header,
95 def config_tra_params(p, encryption_type, tun_if):
96 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
98 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
100 crypt_key = mk_scapy_crypt_key(p)
101 p.tun_dst = tun_if.remote_ip
102 p.tun_src = tun_if.local_ip
105 is_default_port = p.nat_header.dport == 4500
107 is_default_port = True
110 outbound_nat_header = p.nat_header
112 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
113 bind_layers(UDP, ESP, dport=p.nat_header.dport)
115 p.scapy_tun_sa = SecurityAssociation(
118 crypt_algo=p.crypt_algo,
120 auth_algo=p.auth_algo,
123 nat_t_header=outbound_nat_header,
125 p.vpp_tun_sa = SecurityAssociation(
128 crypt_algo=p.crypt_algo,
130 auth_algo=p.auth_algo,
133 nat_t_header=p.nat_header,
137 class TemplateIpsec4TunProtect(object):
138 """IPsec IPv4 Tunnel protect"""
140 encryption_type = ESP
141 tun4_encrypt_node_name = "esp4-encrypt-tun"
142 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
143 tun4_input_node = "ipsec4-tun-input"
145 def config_sa_tra(self, p):
146 config_tun_params(p, self.encryption_type, p.tun_if)
148 p.tun_sa_out = VppIpsecSA(
156 self.vpp_esp_protocol,
159 p.tun_sa_out.add_vpp_config()
161 p.tun_sa_in = VppIpsecSA(
169 self.vpp_esp_protocol,
172 p.tun_sa_in.add_vpp_config()
174 def config_sa_tun(self, p):
175 config_tun_params(p, self.encryption_type, p.tun_if)
177 p.tun_sa_out = VppIpsecSA(
185 self.vpp_esp_protocol,
186 self.tun_if.local_addr[p.addr_type],
187 self.tun_if.remote_addr[p.addr_type],
190 p.tun_sa_out.add_vpp_config()
192 p.tun_sa_in = VppIpsecSA(
200 self.vpp_esp_protocol,
201 self.tun_if.remote_addr[p.addr_type],
202 self.tun_if.local_addr[p.addr_type],
205 p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
    """Create the tunnel-protect object for ``p`` and add it to VPP.

    A ``VppIpsecTunProtect`` is built from ``p.tun_if``, ``p.tun_sa_out``
    and a one-element list holding ``p.tun_sa_in``.  It is stored on
    ``p.tun_protect`` so that it can be removed again later.
    """
    protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
    p.tun_protect = protect
    protect.add_vpp_config()
211 def config_network(self, p):
212 if hasattr(p, "tun_dst"):
215 tun_dst = self.pg0.remote_ip4
216 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
217 p.tun_if.add_vpp_config()
219 p.tun_if.config_ip4()
220 p.tun_if.config_ip6()
222 p.route = VppIpRoute(
224 p.remote_tun_if_host,
226 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
228 p.route.add_vpp_config()
231 p.remote_tun_if_host6,
235 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
def unconfig_network(self, p):
    """Remove ``p.route`` and then ``p.tun_if`` from the VPP configuration."""
    route = p.route
    route.remove_vpp_config()

    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object stored on ``p.tun_protect`` from VPP."""
    protect = p.tun_protect
    protect.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs held on ``p`` from VPP: ``tun_sa_out`` first, then ``tun_sa_in``."""
    # Look each attribute up only when its turn comes, so the first SA is
    # already removed before the second one is even read.
    for sa_attr in ("tun_sa_out", "tun_sa_in"):
        getattr(p, sa_attr).remove_vpp_config()
253 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
254 """IPsec tunnel interface tests"""
256 encryption_type = ESP
260 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
263 def tearDownClass(cls):
264 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
267 super(TemplateIpsec4TunIfEsp, self).setUp()
269 self.tun_if = self.pg0
273 self.config_network(p)
274 self.config_sa_tra(p)
275 self.config_protect(p)
278 super(TemplateIpsec4TunIfEsp, self).tearDown()
281 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
282 """IPsec UDP tunnel interface tests"""
284 tun4_encrypt_node_name = "esp4-encrypt-tun"
285 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
286 encryption_type = ESP
290 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
293 def tearDownClass(cls):
294 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
296 def verify_encrypted(self, p, sa, rxs):
299 # ensure the UDP ports are correct before we decrypt
301 self.assertTrue(rx.haslayer(UDP))
302 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
303 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
305 pkt = sa.decrypt(rx[IP])
306 if not pkt.haslayer(IP):
307 pkt = IP(pkt[Raw].load)
309 self.assert_packet_checksums_valid(pkt)
310 self.assert_equal(pkt[IP].dst, "1.1.1.1")
311 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
312 except (IndexError, AssertionError):
313 self.logger.debug(ppp("Unexpected packet:", rx))
315 self.logger.debug(ppp("Decrypted packet:", pkt))
320 def config_sa_tra(self, p):
321 config_tun_params(p, self.encryption_type, p.tun_if)
323 p.tun_sa_out = VppIpsecSA(
331 self.vpp_esp_protocol,
333 udp_src=p.nat_header.sport,
334 udp_dst=p.nat_header.dport,
336 p.tun_sa_out.add_vpp_config()
338 p.tun_sa_in = VppIpsecSA(
346 self.vpp_esp_protocol,
348 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
349 udp_src=p.nat_header.sport,
350 udp_dst=p.nat_header.dport,
352 p.tun_sa_in.add_vpp_config()
355 super(TemplateIpsec4TunIfEspUdp, self).setUp()
358 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
359 p.nat_header = UDP(sport=5454, dport=4500)
361 self.tun_if = self.pg0
363 self.config_network(p)
364 self.config_sa_tra(p)
365 self.config_protect(p)
368 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
371 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
372 """Ipsec ESP - TUN tests"""
374 tun4_encrypt_node_name = "esp4-encrypt-tun"
375 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
377 def test_tun_basic64(self):
378 """ipsec 6o4 tunnel basic test"""
379 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
381 self.verify_tun_64(self.params[socket.AF_INET], count=1)
383 def test_tun_burst64(self):
384 """ipsec 6o4 tunnel basic test"""
385 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
387 self.verify_tun_64(self.params[socket.AF_INET], count=257)
389 def test_tun_basic_frag44(self):
390 """ipsec 4o4 tunnel frag basic test"""
391 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
395 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
397 self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
399 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
402 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
403 """Ipsec ESP UDP tests"""
405 tun4_input_node = "ipsec4-tun-input"
408 super(TestIpsec4TunIfEspUdp, self).setUp()
410 def test_keepalive(self):
411 """IPSEC NAT Keepalive"""
412 self.verify_keepalive(self.ipv4_params)
415 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
416 """Ipsec ESP UDP GCM tests"""
418 tun4_input_node = "ipsec4-tun-input"
421 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
423 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
424 p.crypt_algo_vpp_id = (
425 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
427 p.crypt_algo = "AES-GCM"
429 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
433 class TestIpsec4TunIfEspUdpUpdate(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
434 """Ipsec ESP UDP update tests"""
436 tun4_input_node = "ipsec4-tun-input"
439 super(TestIpsec4TunIfEspUdpUpdate, self).setUp()
441 p.nat_header = UDP(sport=6565, dport=7676)
442 config_tun_params(p, self.encryption_type, p.tun_if)
443 p.tun_sa_in.update_vpp_config(
444 udp_src=p.nat_header.dport, udp_dst=p.nat_header.sport
446 p.tun_sa_out.update_vpp_config(
447 udp_src=p.nat_header.sport, udp_dst=p.nat_header.dport
451 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
452 """Ipsec ESP - TCP tests"""
457 class TemplateIpsec6TunProtect(object):
458 """IPsec IPv6 Tunnel protect"""
460 def config_sa_tra(self, p):
461 config_tun_params(p, self.encryption_type, p.tun_if)
463 p.tun_sa_out = VppIpsecSA(
471 self.vpp_esp_protocol,
473 p.tun_sa_out.add_vpp_config()
475 p.tun_sa_in = VppIpsecSA(
483 self.vpp_esp_protocol,
485 p.tun_sa_in.add_vpp_config()
487 def config_sa_tun(self, p):
488 config_tun_params(p, self.encryption_type, p.tun_if)
490 p.tun_sa_out = VppIpsecSA(
498 self.vpp_esp_protocol,
499 self.tun_if.local_addr[p.addr_type],
500 self.tun_if.remote_addr[p.addr_type],
502 p.tun_sa_out.add_vpp_config()
504 p.tun_sa_in = VppIpsecSA(
512 self.vpp_esp_protocol,
513 self.tun_if.remote_addr[p.addr_type],
514 self.tun_if.local_addr[p.addr_type],
516 p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
    """Protect ``p.tun_if`` with the SAs held on ``p`` and push that to VPP.

    The ``VppIpsecTunProtect`` is constructed from ``p.tun_if``,
    ``p.tun_sa_out`` and the single-entry list ``[p.tun_sa_in]``, kept on
    ``p.tun_protect``, and then added to the VPP configuration.
    """
    tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
    p.tun_protect = tun_protect
    tun_protect.add_vpp_config()
522 def config_network(self, p):
523 if hasattr(p, "tun_dst"):
526 tun_dst = self.pg0.remote_ip6
527 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
528 p.tun_if.add_vpp_config()
530 p.tun_if.config_ip6()
531 p.tun_if.config_ip4()
533 p.route = VppIpRoute(
535 p.remote_tun_if_host,
539 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
543 p.route.add_vpp_config()
546 p.remote_tun_if_host4,
548 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
def unconfig_network(self, p):
    """Tear down what is stored on ``p``: the route first, then the tunnel interface."""
    # Attributes are resolved one at a time so the route is gone before the
    # tunnel interface is touched.
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Withdraw the object kept on ``p.tun_protect`` from the VPP configuration."""
    tun_protect = p.tun_protect
    tun_protect.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove ``p.tun_sa_out`` and then ``p.tun_sa_in`` from VPP."""
    outbound = p.tun_sa_out
    outbound.remove_vpp_config()

    inbound = p.tun_sa_in
    inbound.remove_vpp_config()
564 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
565 """IPsec tunnel interface tests"""
567 encryption_type = ESP
570 super(TemplateIpsec6TunIfEsp, self).setUp()
572 self.tun_if = self.pg0
575 self.config_network(p)
576 self.config_sa_tra(p)
577 self.config_protect(p)
580 super(TemplateIpsec6TunIfEsp, self).tearDown()
583 class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
584 """IPsec6 UDP tunnel interface tests"""
586 tun4_encrypt_node_name = "esp6-encrypt-tun"
587 tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
588 encryption_type = ESP
592 super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()
595 def tearDownClass(cls):
596 super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()
598 def verify_encrypted(self, p, sa, rxs):
601 # ensure the UDP ports are correct before we decrypt
603 self.assertTrue(rx.haslayer(UDP))
604 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
605 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
607 pkt = sa.decrypt(rx[IP])
608 if not pkt.haslayer(IP):
609 pkt = IP(pkt[Raw].load)
611 self.assert_packet_checksums_valid(pkt)
613 pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
615 self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
616 except (IndexError, AssertionError):
617 self.logger.debug(ppp("Unexpected packet:", rx))
619 self.logger.debug(ppp("Decrypted packet:", pkt))
624 def config_sa_tra(self, p):
625 config_tun_params(p, self.encryption_type, p.tun_if)
627 p.tun_sa_out = VppIpsecSA(
635 self.vpp_esp_protocol,
637 udp_src=p.nat_header.sport,
638 udp_dst=p.nat_header.dport,
640 p.tun_sa_out.add_vpp_config()
642 p.tun_sa_in = VppIpsecSA(
650 self.vpp_esp_protocol,
652 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
653 udp_src=p.nat_header.sport,
654 udp_dst=p.nat_header.dport,
656 p.tun_sa_in.add_vpp_config()
659 super(TemplateIpsec6TunIfEspUdp, self).setUp()
662 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
663 p.nat_header = UDP(sport=5454, dport=4500)
665 self.tun_if = self.pg0
667 self.config_network(p)
668 self.config_sa_tra(p)
669 self.config_protect(p)
672 super(TemplateIpsec6TunIfEspUdp, self).tearDown()
675 class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
676 """Ipsec ESP 6 UDP tests"""
678 tun6_input_node = "ipsec6-tun-input"
679 tun6_encrypt_node_name = "esp6-encrypt-tun"
680 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
683 super(TestIpsec6TunIfEspUdp, self).setUp()
685 def test_keepalive(self):
686 """IPSEC6 NAT Keepalive"""
687 self.verify_keepalive(self.ipv6_params)
690 class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
691 """Ipsec ESP 6 UDP GCM tests"""
693 tun6_input_node = "ipsec6-tun-input"
694 tun6_encrypt_node_name = "esp6-encrypt-tun"
695 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
698 super(TestIpsec6TunIfEspUdpGCM, self).setUp()
700 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
701 p.crypt_algo_vpp_id = (
702 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
704 p.crypt_algo = "AES-GCM"
706 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
710 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
711 """Ipsec ESP - TUN tests"""
713 tun6_encrypt_node_name = "esp6-encrypt-tun"
714 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
716 def test_tun_basic46(self):
717 """ipsec 4o6 tunnel basic test"""
718 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
719 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
721 def test_tun_burst46(self):
722 """ipsec 4o6 tunnel burst test"""
723 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
724 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
727 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
728 """Ipsec ESP 6 Handoff tests"""
730 tun6_encrypt_node_name = "esp6-encrypt-tun"
731 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
733 def test_tun_handoff_66_police(self):
734 """ESP 6o6 tunnel with policer worker hand-off test"""
735 self.vapi.cli("clear errors")
736 self.vapi.cli("clear ipsec sa")
739 p = self.params[socket.AF_INET6]
741 action_tx = PolicerAction(
742 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
744 policer = VppPolicer(
751 conform_action=action_tx,
752 exceed_action=action_tx,
753 violate_action=action_tx,
755 policer.add_vpp_config()
757 # Start policing on tun
758 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
760 for pol_bind in [1, 0]:
761 policer.bind_vpp_config(pol_bind, True)
763 # inject alternately on worker 0 and 1.
764 for worker in [0, 1, 0, 1]:
765 send_pkts = self.gen_encrypt_pkts6(
769 src=p.remote_tun_if_host,
770 dst=self.pg1.remote_ip6,
773 recv_pkts = self.send_and_expect(
774 self.tun_if, send_pkts, self.pg1, worker=worker
776 self.verify_decrypted6(p, recv_pkts)
777 self.logger.debug(self.vapi.cli("show trace max 100"))
779 stats = policer.get_stats()
780 stats0 = policer.get_stats(worker=0)
781 stats1 = policer.get_stats(worker=1)
784 # First pass: Worker 1, should have done all the policing
785 self.assertEqual(stats, stats1)
787 # Worker 0, should have handed everything off
788 self.assertEqual(stats0["conform_packets"], 0)
789 self.assertEqual(stats0["exceed_packets"], 0)
790 self.assertEqual(stats0["violate_packets"], 0)
792 # Second pass: both workers should have policed equal amounts
793 self.assertGreater(stats1["conform_packets"], 0)
794 self.assertEqual(stats1["exceed_packets"], 0)
795 self.assertGreater(stats1["violate_packets"], 0)
797 self.assertGreater(stats0["conform_packets"], 0)
798 self.assertEqual(stats0["exceed_packets"], 0)
799 self.assertGreater(stats0["violate_packets"], 0)
802 stats0["conform_packets"] + stats0["violate_packets"],
803 stats1["conform_packets"] + stats1["violate_packets"],
806 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
807 policer.remove_vpp_config()
810 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
811 """Ipsec ESP 4 Handoff tests"""
813 tun4_encrypt_node_name = "esp4-encrypt-tun"
814 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
816 def test_tun_handoff_44_police(self):
817 """ESP 4o4 tunnel with policer worker hand-off test"""
818 self.vapi.cli("clear errors")
819 self.vapi.cli("clear ipsec sa")
822 p = self.params[socket.AF_INET]
824 action_tx = PolicerAction(
825 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
827 policer = VppPolicer(
834 conform_action=action_tx,
835 exceed_action=action_tx,
836 violate_action=action_tx,
838 policer.add_vpp_config()
840 # Start policing on tun
841 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
843 for pol_bind in [1, 0]:
844 policer.bind_vpp_config(pol_bind, True)
846 # inject alternately on worker 0 and 1.
847 for worker in [0, 1, 0, 1]:
848 send_pkts = self.gen_encrypt_pkts(
852 src=p.remote_tun_if_host,
853 dst=self.pg1.remote_ip4,
856 recv_pkts = self.send_and_expect(
857 self.tun_if, send_pkts, self.pg1, worker=worker
859 self.verify_decrypted(p, recv_pkts)
860 self.logger.debug(self.vapi.cli("show trace max 100"))
862 stats = policer.get_stats()
863 stats0 = policer.get_stats(worker=0)
864 stats1 = policer.get_stats(worker=1)
867 # First pass: Worker 1, should have done all the policing
868 self.assertEqual(stats, stats1)
870 # Worker 0, should have handed everything off
871 self.assertEqual(stats0["conform_packets"], 0)
872 self.assertEqual(stats0["exceed_packets"], 0)
873 self.assertEqual(stats0["violate_packets"], 0)
875 # Second pass: both workers should have policed equal amounts
876 self.assertGreater(stats1["conform_packets"], 0)
877 self.assertEqual(stats1["exceed_packets"], 0)
878 self.assertGreater(stats1["violate_packets"], 0)
880 self.assertGreater(stats0["conform_packets"], 0)
881 self.assertEqual(stats0["exceed_packets"], 0)
882 self.assertGreater(stats0["violate_packets"], 0)
885 stats0["conform_packets"] + stats0["violate_packets"],
886 stats1["conform_packets"] + stats1["violate_packets"],
889 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
890 policer.remove_vpp_config()
893 @tag_fixme_vpp_workers
894 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
895 """IPsec IPv4 Multi Tunnel interface"""
897 encryption_type = ESP
898 tun4_encrypt_node_name = "esp4-encrypt-tun"
899 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
902 super(TestIpsec4MultiTunIfEsp, self).setUp()
904 self.tun_if = self.pg0
906 self.multi_params = []
907 self.pg0.generate_remote_hosts(10)
908 self.pg0.configure_ipv4_neighbors()
911 p = copy.copy(self.ipv4_params)
913 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
914 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
915 p.scapy_tun_spi = p.scapy_tun_spi + ii
916 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
917 p.vpp_tun_spi = p.vpp_tun_spi + ii
919 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
920 p.scapy_tra_spi = p.scapy_tra_spi + ii
921 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
922 p.vpp_tra_spi = p.vpp_tra_spi + ii
923 p.tun_dst = self.pg0.remote_hosts[ii].ip4
925 self.multi_params.append(p)
926 self.config_network(p)
927 self.config_sa_tra(p)
928 self.config_protect(p)
931 super(TestIpsec4MultiTunIfEsp, self).tearDown()
933 def test_tun_44(self):
934 """Multiple IPSEC tunnel interfaces"""
935 for p in self.multi_params:
936 self.verify_tun_44(p, count=127)
937 self.assertEqual(p.tun_if.get_rx_stats(), 127)
938 self.assertEqual(p.tun_if.get_tx_stats(), 127)
940 def test_tun_rr_44(self):
941 """Round-robin packets across multiple interfaces"""
943 for p in self.multi_params:
944 tx = tx + self.gen_encrypt_pkts(
948 src=p.remote_tun_if_host,
949 dst=self.pg1.remote_ip4,
951 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
953 for rx, p in zip(rxs, self.multi_params):
954 self.verify_decrypted(p, [rx])
957 for p in self.multi_params:
958 tx = tx + self.gen_pkts(
959 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
961 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
963 for rx, p in zip(rxs, self.multi_params):
964 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
967 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
968 """IPsec IPv4 Tunnel interface all Algos"""
970 encryption_type = ESP
971 tun4_encrypt_node_name = "esp4-encrypt-tun"
972 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
975 super(TestIpsec4TunIfEspAll, self).setUp()
977 self.tun_if = self.pg0
980 self.config_network(p)
981 self.config_sa_tra(p)
982 self.config_protect(p)
986 self.unconfig_protect(p)
987 self.unconfig_network(p)
990 super(TestIpsec4TunIfEspAll, self).tearDown()
994 # change the key and the SPI
997 p.crypt_key = b"X" + p.crypt_key[1:]
999 p.scapy_tun_sa_id += 1
1001 p.vpp_tun_sa_id += 1
1002 p.tun_if.local_spi = p.vpp_tun_spi
1003 p.tun_if.remote_spi = p.scapy_tun_spi
1005 config_tun_params(p, self.encryption_type, p.tun_if)
1007 p.tun_sa_out = VppIpsecSA(
1013 p.crypt_algo_vpp_id,
1015 self.vpp_esp_protocol,
1019 p.tun_sa_in = VppIpsecSA(
1025 p.crypt_algo_vpp_id,
1027 self.vpp_esp_protocol,
1031 p.tun_sa_in.add_vpp_config()
1032 p.tun_sa_out.add_vpp_config()
1034 self.config_protect(p)
1035 np.tun_sa_out.remove_vpp_config()
1036 np.tun_sa_in.remove_vpp_config()
1037 self.logger.info(self.vapi.cli("sh ipsec sa"))
1039 def test_tun_44(self):
1040 """IPSEC tunnel all algos"""
1042 # foreach VPP crypto engine
1043 engines = ["ia32", "ipsecmb", "openssl"]
1045 # foreach crypto algorithm
1049 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
1052 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1054 "scapy-crypto": "AES-GCM",
1055 "scapy-integ": "NULL",
1056 "key": b"JPjyOWBeVEQiMe7h",
1061 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
1064 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1066 "scapy-crypto": "AES-GCM",
1067 "scapy-integ": "NULL",
1068 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1073 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
1076 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1078 "scapy-crypto": "AES-GCM",
1079 "scapy-integ": "NULL",
1080 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1085 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
1088 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1090 "scapy-crypto": "AES-CBC",
1091 "scapy-integ": "HMAC-SHA1-96",
1093 "key": b"JPjyOWBeVEQiMe7h",
1097 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
1100 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
1102 "scapy-crypto": "AES-CBC",
1103 "scapy-integ": "SHA2-512-256",
1105 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1109 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
1112 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
1114 "scapy-crypto": "AES-CBC",
1115 "scapy-integ": "SHA2-256-128",
1117 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1121 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1124 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1126 "scapy-crypto": "NULL",
1127 "scapy-integ": "HMAC-SHA1-96",
1129 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1133 for engine in engines:
1134 self.vapi.cli("set crypto handler all %s" % engine)
1137 # loop through each of the algorithms
1140 # with self.subTest(algo=algo['scapy']):
1142 p = self.ipv4_params
1143 p.auth_algo_vpp_id = algo["vpp-integ"]
1144 p.crypt_algo_vpp_id = algo["vpp-crypto"]
1145 p.crypt_algo = algo["scapy-crypto"]
1146 p.auth_algo = algo["scapy-integ"]
1147 p.crypt_key = algo["key"]
1148 p.salt = algo["salt"]
1154 self.verify_tun_44(p, count=127)
1157 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1158 """IPsec IPv4 Tunnel interface no Algos"""
1160 encryption_type = ESP
1161 tun4_encrypt_node_name = "esp4-encrypt-tun"
1162 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1165 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
1167 self.tun_if = self.pg0
1168 p = self.ipv4_params
1169 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1170 p.auth_algo = "NULL"
1173 p.crypt_algo_vpp_id = (
1174 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1176 p.crypt_algo = "NULL"
1180 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
1182 def test_tun_44(self):
1183 """IPSec SA with NULL algos"""
1184 p = self.ipv4_params
1186 self.config_network(p)
1187 self.config_sa_tra(p)
1188 self.config_protect(p)
1190 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host)
1191 self.send_and_assert_no_replies(self.pg1, tx)
1193 self.unconfig_protect(p)
1195 self.unconfig_network(p)
1198 @tag_fixme_vpp_workers
1199 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
1200 """IPsec IPv6 Multi Tunnel interface"""
1202 encryption_type = ESP
1203 tun6_encrypt_node_name = "esp6-encrypt-tun"
1204 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1207 super(TestIpsec6MultiTunIfEsp, self).setUp()
1209 self.tun_if = self.pg0
1211 self.multi_params = []
1212 self.pg0.generate_remote_hosts(10)
1213 self.pg0.configure_ipv6_neighbors()
1215 for ii in range(10):
1216 p = copy.copy(self.ipv6_params)
1218 p.remote_tun_if_host = "1111::%d" % (ii + 1)
1219 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1220 p.scapy_tun_spi = p.scapy_tun_spi + ii
1221 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1222 p.vpp_tun_spi = p.vpp_tun_spi + ii
1224 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1225 p.scapy_tra_spi = p.scapy_tra_spi + ii
1226 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1227 p.vpp_tra_spi = p.vpp_tra_spi + ii
1228 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1230 self.multi_params.append(p)
1231 self.config_network(p)
1232 self.config_sa_tra(p)
1233 self.config_protect(p)
1236 super(TestIpsec6MultiTunIfEsp, self).tearDown()
1238 def test_tun_66(self):
1239 """Multiple IPSEC tunnel interfaces"""
1240 for p in self.multi_params:
1241 self.verify_tun_66(p, count=127)
1242 self.assertEqual(p.tun_if.get_rx_stats(), 127)
1243 self.assertEqual(p.tun_if.get_tx_stats(), 127)
1246 class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
1247 """Ipsec GRE TEB ESP - TUN tests"""
1249 tun4_encrypt_node_name = "esp4-encrypt-tun"
1250 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1251 encryption_type = ESP
1252 omac = "00:11:22:33:44:55"
1254 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1256 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1258 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1260 / Ether(dst=self.omac)
1261 / IP(src="1.1.1.1", dst="1.1.1.2")
1262 / UDP(sport=1144, dport=2233)
1263 / Raw(b"X" * payload_size)
1265 for i in range(count)
1268 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1270 Ether(dst=self.omac)
1271 / IP(src="1.1.1.1", dst="1.1.1.2")
1272 / UDP(sport=1144, dport=2233)
1273 / Raw(b"X" * payload_size)
1274 for i in range(count)
1277 def verify_decrypted(self, p, rxs):
1279 self.assert_equal(rx[Ether].dst, self.omac)
1280 self.assert_equal(rx[IP].dst, "1.1.1.2")
1282 def verify_encrypted(self, p, sa, rxs):
1285 pkt = sa.decrypt(rx[IP])
1286 if not pkt.haslayer(IP):
1287 pkt = IP(pkt[Raw].load)
1288 self.assert_packet_checksums_valid(pkt)
1289 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1290 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1291 self.assertTrue(pkt.haslayer(GRE))
1293 self.assertEqual(e[Ether].dst, self.omac)
1294 self.assertEqual(e[IP].dst, "1.1.1.2")
1295 except (IndexError, AssertionError):
1296 self.logger.debug(ppp("Unexpected packet:", rx))
1298 self.logger.debug(ppp("Decrypted packet:", pkt))
1304 super(TestIpsecGreTebIfEsp, self).setUp()
1306 self.tun_if = self.pg0
1308 p = self.ipv4_params
1310 bd1 = VppBridgeDomain(self, 1)
1311 bd1.add_vpp_config()
1313 p.tun_sa_out = VppIpsecSA(
1319 p.crypt_algo_vpp_id,
1321 self.vpp_esp_protocol,
1323 self.pg0.remote_ip4,
1325 p.tun_sa_out.add_vpp_config()
1327 p.tun_sa_in = VppIpsecSA(
1333 p.crypt_algo_vpp_id,
1335 self.vpp_esp_protocol,
1336 self.pg0.remote_ip4,
1339 p.tun_sa_in.add_vpp_config()
1341 p.tun_if = VppGreInterface(
1344 self.pg0.remote_ip4,
1345 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1347 p.tun_if.add_vpp_config()
1349 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1351 p.tun_protect.add_vpp_config()
1354 p.tun_if.config_ip4()
1355 config_tun_params(p, self.encryption_type, p.tun_if)
1357 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1358 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1360 self.vapi.cli("clear ipsec sa")
1361 self.vapi.cli("sh adj")
1362 self.vapi.cli("sh ipsec tun")
1365 p = self.ipv4_params
1366 p.tun_if.unconfig_ip4()
1367 super(TestIpsecGreTebIfEsp, self).tearDown()
1370 class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
1371 """Ipsec GRE TEB ESP - TUN tests"""
1373 tun4_encrypt_node_name = "esp4-encrypt-tun"
1374 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1375 encryption_type = ESP
1376 omac = "00:11:22:33:44:55"
1378 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1380 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1382 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1384 / Ether(dst=self.omac)
1385 / IP(src="1.1.1.1", dst="1.1.1.2")
1386 / UDP(sport=1144, dport=2233)
1387 / Raw(b"X" * payload_size)
1389 for i in range(count)
1392 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1394 Ether(dst=self.omac)
1396 / IP(src="1.1.1.1", dst="1.1.1.2")
1397 / UDP(sport=1144, dport=2233)
1398 / Raw(b"X" * payload_size)
1399 for i in range(count)
1402 def verify_decrypted(self, p, rxs):
1404 self.assert_equal(rx[Ether].dst, self.omac)
1405 self.assert_equal(rx[Dot1Q].vlan, 11)
1406 self.assert_equal(rx[IP].dst, "1.1.1.2")
1408 def verify_encrypted(self, p, sa, rxs):
1411 pkt = sa.decrypt(rx[IP])
1412 if not pkt.haslayer(IP):
1413 pkt = IP(pkt[Raw].load)
1414 self.assert_packet_checksums_valid(pkt)
1415 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1416 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1417 self.assertTrue(pkt.haslayer(GRE))
1419 self.assertEqual(e[Ether].dst, self.omac)
1420 self.assertFalse(e.haslayer(Dot1Q))
1421 self.assertEqual(e[IP].dst, "1.1.1.2")
1422 except (IndexError, AssertionError):
1423 self.logger.debug(ppp("Unexpected packet:", rx))
1425 self.logger.debug(ppp("Decrypted packet:", pkt))
1431 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1433 self.tun_if = self.pg0
1435 p = self.ipv4_params
1437 bd1 = VppBridgeDomain(self, 1)
1438 bd1.add_vpp_config()
1440 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1441 self.vapi.l2_interface_vlan_tag_rewrite(
1442 sw_if_index=self.pg1_11.sw_if_index,
1443 vtr_op=L2_VTR_OP.L2_POP_1,
1446 self.pg1_11.admin_up()
1448 p.tun_sa_out = VppIpsecSA(
1454 p.crypt_algo_vpp_id,
1456 self.vpp_esp_protocol,
1458 self.pg0.remote_ip4,
1460 p.tun_sa_out.add_vpp_config()
1462 p.tun_sa_in = VppIpsecSA(
1468 p.crypt_algo_vpp_id,
1470 self.vpp_esp_protocol,
1471 self.pg0.remote_ip4,
1474 p.tun_sa_in.add_vpp_config()
1476 p.tun_if = VppGreInterface(
1479 self.pg0.remote_ip4,
1480 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1482 p.tun_if.add_vpp_config()
1484 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1486 p.tun_protect.add_vpp_config()
1489 p.tun_if.config_ip4()
1490 config_tun_params(p, self.encryption_type, p.tun_if)
1492 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1493 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1495 self.vapi.cli("clear ipsec sa")
1498 p = self.ipv4_params
1499 p.tun_if.unconfig_ip4()
1500 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1501 self.pg1_11.admin_down()
1502 self.pg1_11.remove_vpp_config()
1505 class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
1506 """Ipsec GRE TEB ESP - Tra tests"""
1508 tun4_encrypt_node_name = "esp4-encrypt-tun"
1509 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1510 encryption_type = ESP
1511 omac = "00:11:22:33:44:55"
1513 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1515 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1517 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1519 / Ether(dst=self.omac)
1520 / IP(src="1.1.1.1", dst="1.1.1.2")
1521 / UDP(sport=1144, dport=2233)
1522 / Raw(b"X" * payload_size)
1524 for i in range(count)
1527 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1529 Ether(dst=self.omac)
1530 / IP(src="1.1.1.1", dst="1.1.1.2")
1531 / UDP(sport=1144, dport=2233)
1532 / Raw(b"X" * payload_size)
1533 for i in range(count)
1536 def verify_decrypted(self, p, rxs):
1538 self.assert_equal(rx[Ether].dst, self.omac)
1539 self.assert_equal(rx[IP].dst, "1.1.1.2")
1541 def verify_encrypted(self, p, sa, rxs):
1544 pkt = sa.decrypt(rx[IP])
1545 if not pkt.haslayer(IP):
1546 pkt = IP(pkt[Raw].load)
1547 self.assert_packet_checksums_valid(pkt)
1548 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1549 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1550 self.assertTrue(pkt.haslayer(GRE))
1552 self.assertEqual(e[Ether].dst, self.omac)
1553 self.assertEqual(e[IP].dst, "1.1.1.2")
1554 except (IndexError, AssertionError):
1555 self.logger.debug(ppp("Unexpected packet:", rx))
1557 self.logger.debug(ppp("Decrypted packet:", pkt))
1563 super(TestIpsecGreTebIfEspTra, self).setUp()
1565 self.tun_if = self.pg0
1567 p = self.ipv4_params
1569 bd1 = VppBridgeDomain(self, 1)
1570 bd1.add_vpp_config()
1572 p.tun_sa_out = VppIpsecSA(
1578 p.crypt_algo_vpp_id,
1580 self.vpp_esp_protocol,
1582 p.tun_sa_out.add_vpp_config()
1584 p.tun_sa_in = VppIpsecSA(
1590 p.crypt_algo_vpp_id,
1592 self.vpp_esp_protocol,
1594 p.tun_sa_in.add_vpp_config()
1596 p.tun_if = VppGreInterface(
1599 self.pg0.remote_ip4,
1600 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1602 p.tun_if.add_vpp_config()
1604 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1606 p.tun_protect.add_vpp_config()
1609 p.tun_if.config_ip4()
1610 config_tra_params(p, self.encryption_type, p.tun_if)
1612 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1613 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1615 self.vapi.cli("clear ipsec sa")
1618 p = self.ipv4_params
1619 p.tun_if.unconfig_ip4()
1620 super(TestIpsecGreTebIfEspTra, self).tearDown()
1623 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
1624 """Ipsec GRE TEB UDP ESP - Tra tests"""
1626 tun4_encrypt_node_name = "esp4-encrypt-tun"
1627 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1628 encryption_type = ESP
1629 omac = "00:11:22:33:44:55"
1631 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1633 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1635 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1637 / Ether(dst=self.omac)
1638 / IP(src="1.1.1.1", dst="1.1.1.2")
1639 / UDP(sport=1144, dport=2233)
1640 / Raw(b"X" * payload_size)
1642 for i in range(count)
1645 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1647 Ether(dst=self.omac)
1648 / IP(src="1.1.1.1", dst="1.1.1.2")
1649 / UDP(sport=1144, dport=2233)
1650 / Raw(b"X" * payload_size)
1651 for i in range(count)
1654 def verify_decrypted(self, p, rxs):
1656 self.assert_equal(rx[Ether].dst, self.omac)
1657 self.assert_equal(rx[IP].dst, "1.1.1.2")
1659 def verify_encrypted(self, p, sa, rxs):
1661 self.assertTrue(rx.haslayer(UDP))
1662 self.assertEqual(rx[UDP].dport, 4545)
1663 self.assertEqual(rx[UDP].sport, 5454)
1665 pkt = sa.decrypt(rx[IP])
1666 if not pkt.haslayer(IP):
1667 pkt = IP(pkt[Raw].load)
1668 self.assert_packet_checksums_valid(pkt)
1669 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1670 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1671 self.assertTrue(pkt.haslayer(GRE))
1673 self.assertEqual(e[Ether].dst, self.omac)
1674 self.assertEqual(e[IP].dst, "1.1.1.2")
1675 except (IndexError, AssertionError):
1676 self.logger.debug(ppp("Unexpected packet:", rx))
1678 self.logger.debug(ppp("Decrypted packet:", pkt))
1684 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1686 self.tun_if = self.pg0
1688 p = self.ipv4_params
1689 p = self.ipv4_params
1690 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
1691 p.nat_header = UDP(sport=5454, dport=4545)
1693 bd1 = VppBridgeDomain(self, 1)
1694 bd1.add_vpp_config()
1696 p.tun_sa_out = VppIpsecSA(
1702 p.crypt_algo_vpp_id,
1704 self.vpp_esp_protocol,
1709 p.tun_sa_out.add_vpp_config()
1711 p.tun_sa_in = VppIpsecSA(
1717 p.crypt_algo_vpp_id,
1719 self.vpp_esp_protocol,
1721 p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
1726 p.tun_sa_in.add_vpp_config()
1728 p.tun_if = VppGreInterface(
1731 self.pg0.remote_ip4,
1732 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1734 p.tun_if.add_vpp_config()
1736 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1738 p.tun_protect.add_vpp_config()
1741 p.tun_if.config_ip4()
1742 config_tra_params(p, self.encryption_type, p.tun_if)
1744 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1745 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1747 self.vapi.cli("clear ipsec sa")
1748 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1751 p = self.ipv4_params
1752 p.tun_if.unconfig_ip4()
1753 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1756 class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
1757 """Ipsec GRE ESP - TUN tests"""
1759 tun4_encrypt_node_name = "esp4-encrypt-tun"
1760 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1761 encryption_type = ESP
1763 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1765 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1767 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1769 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1770 / UDP(sport=1144, dport=2233)
1771 / Raw(b"X" * payload_size)
1773 for i in range(count)
1776 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1778 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1779 / IP(src="1.1.1.1", dst="1.1.1.2")
1780 / UDP(sport=1144, dport=2233)
1781 / Raw(b"X" * payload_size)
1782 for i in range(count)
1785 def verify_decrypted(self, p, rxs):
1787 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1788 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1790 def verify_encrypted(self, p, sa, rxs):
1793 pkt = sa.decrypt(rx[IP])
1794 if not pkt.haslayer(IP):
1795 pkt = IP(pkt[Raw].load)
1796 self.assert_packet_checksums_valid(pkt)
1797 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1798 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1799 self.assertTrue(pkt.haslayer(GRE))
1801 self.assertEqual(e[IP].dst, "1.1.1.2")
1802 except (IndexError, AssertionError):
1803 self.logger.debug(ppp("Unexpected packet:", rx))
1805 self.logger.debug(ppp("Decrypted packet:", pkt))
1811 super(TestIpsecGreIfEsp, self).setUp()
1813 self.tun_if = self.pg0
1815 p = self.ipv4_params
1817 bd1 = VppBridgeDomain(self, 1)
1818 bd1.add_vpp_config()
1820 p.tun_sa_out = VppIpsecSA(
1826 p.crypt_algo_vpp_id,
1828 self.vpp_esp_protocol,
1830 self.pg0.remote_ip4,
1832 p.tun_sa_out.add_vpp_config()
1834 p.tun_sa_in = VppIpsecSA(
1840 p.crypt_algo_vpp_id,
1842 self.vpp_esp_protocol,
1843 self.pg0.remote_ip4,
1846 p.tun_sa_in.add_vpp_config()
1848 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1849 p.tun_if.add_vpp_config()
1851 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1852 p.tun_protect.add_vpp_config()
1855 p.tun_if.config_ip4()
1856 config_tun_params(p, self.encryption_type, p.tun_if)
1859 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1863 p = self.ipv4_params
1864 p.tun_if.unconfig_ip4()
1865 super(TestIpsecGreIfEsp, self).tearDown()
1868 class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
1869 """Ipsec GRE ESP - TRA tests"""
1871 tun4_encrypt_node_name = "esp4-encrypt-tun"
1872 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1873 encryption_type = ESP
1875 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1877 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1879 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1881 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1882 / UDP(sport=1144, dport=2233)
1883 / Raw(b"X" * payload_size)
1885 for i in range(count)
1888 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
1890 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1892 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1894 / UDP(sport=1144, dport=2233)
1895 / Raw(b"X" * payload_size)
1897 for i in range(count)
1900 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1902 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1903 / IP(src="1.1.1.1", dst="1.1.1.2")
1904 / UDP(sport=1144, dport=2233)
1905 / Raw(b"X" * payload_size)
1906 for i in range(count)
1909 def verify_decrypted(self, p, rxs):
1911 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1912 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1914 def verify_encrypted(self, p, sa, rxs):
1917 pkt = sa.decrypt(rx[IP])
1918 if not pkt.haslayer(IP):
1919 pkt = IP(pkt[Raw].load)
1920 self.assert_packet_checksums_valid(pkt)
1921 self.assertTrue(pkt.haslayer(GRE))
1923 self.assertEqual(e[IP].dst, "1.1.1.2")
1924 except (IndexError, AssertionError):
1925 self.logger.debug(ppp("Unexpected packet:", rx))
1927 self.logger.debug(ppp("Decrypted packet:", pkt))
1933 super(TestIpsecGreIfEspTra, self).setUp()
1935 self.tun_if = self.pg0
1937 p = self.ipv4_params
1939 p.tun_sa_out = VppIpsecSA(
1945 p.crypt_algo_vpp_id,
1947 self.vpp_esp_protocol,
1949 p.tun_sa_out.add_vpp_config()
1951 p.tun_sa_in = VppIpsecSA(
1957 p.crypt_algo_vpp_id,
1959 self.vpp_esp_protocol,
1961 p.tun_sa_in.add_vpp_config()
1963 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1964 p.tun_if.add_vpp_config()
1966 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1967 p.tun_protect.add_vpp_config()
1970 p.tun_if.config_ip4()
1971 config_tra_params(p, self.encryption_type, p.tun_if)
1974 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1978 p = self.ipv4_params
1979 p.tun_if.unconfig_ip4()
1980 super(TestIpsecGreIfEspTra, self).tearDown()
1982 def test_gre_non_ip(self):
1983 p = self.ipv4_params
1984 tx = self.gen_encrypt_non_ip_pkts(
1987 src=p.remote_tun_if_host,
1988 dst=self.pg1.remote_ip6,
1990 self.send_and_assert_no_replies(self.tun_if, tx)
1991 node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
1992 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1995 class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
1996 """Ipsec GRE ESP - TRA tests"""
1998 tun6_encrypt_node_name = "esp6-encrypt-tun"
1999 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2000 encryption_type = ESP
2002 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2004 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2006 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2008 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2009 / UDP(sport=1144, dport=2233)
2010 / Raw(b"X" * payload_size)
2012 for i in range(count)
2015 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2017 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2018 / IPv6(src="1::1", dst="1::2")
2019 / UDP(sport=1144, dport=2233)
2020 / Raw(b"X" * payload_size)
2021 for i in range(count)
2024 def verify_decrypted6(self, p, rxs):
2026 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2027 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2029 def verify_encrypted6(self, p, sa, rxs):
2032 pkt = sa.decrypt(rx[IPv6])
2033 if not pkt.haslayer(IPv6):
2034 pkt = IPv6(pkt[Raw].load)
2035 self.assert_packet_checksums_valid(pkt)
2036 self.assertTrue(pkt.haslayer(GRE))
2038 self.assertEqual(e[IPv6].dst, "1::2")
2039 except (IndexError, AssertionError):
2040 self.logger.debug(ppp("Unexpected packet:", rx))
2042 self.logger.debug(ppp("Decrypted packet:", pkt))
2048 super(TestIpsecGre6IfEspTra, self).setUp()
2050 self.tun_if = self.pg0
2052 p = self.ipv6_params
2054 bd1 = VppBridgeDomain(self, 1)
2055 bd1.add_vpp_config()
2057 p.tun_sa_out = VppIpsecSA(
2063 p.crypt_algo_vpp_id,
2065 self.vpp_esp_protocol,
2067 p.tun_sa_out.add_vpp_config()
2069 p.tun_sa_in = VppIpsecSA(
2075 p.crypt_algo_vpp_id,
2077 self.vpp_esp_protocol,
2079 p.tun_sa_in.add_vpp_config()
2081 p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
2082 p.tun_if.add_vpp_config()
2084 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2085 p.tun_protect.add_vpp_config()
2088 p.tun_if.config_ip6()
2089 config_tra_params(p, self.encryption_type, p.tun_if)
2097 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
2104 p = self.ipv6_params
2105 p.tun_if.unconfig_ip6()
2106 super(TestIpsecGre6IfEspTra, self).tearDown()
2109 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
2110 """Ipsec mGRE ESP v4 TRA tests"""
2112 tun4_encrypt_node_name = "esp4-encrypt-tun"
2113 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2114 encryption_type = ESP
2116 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2118 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2120 IP(src=p.tun_dst, dst=self.pg0.local_ip4)
2122 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
2123 / UDP(sport=1144, dport=2233)
2124 / Raw(b"X" * payload_size)
2126 for i in range(count)
2129 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2131 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2132 / IP(src="1.1.1.1", dst=dst)
2133 / UDP(sport=1144, dport=2233)
2134 / Raw(b"X" * payload_size)
2135 for i in range(count)
2138 def verify_decrypted(self, p, rxs):
2140 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2141 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2143 def verify_encrypted(self, p, sa, rxs):
2146 pkt = sa.decrypt(rx[IP])
2147 if not pkt.haslayer(IP):
2148 pkt = IP(pkt[Raw].load)
2149 self.assert_packet_checksums_valid(pkt)
2150 self.assertTrue(pkt.haslayer(GRE))
2152 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2153 except (IndexError, AssertionError):
2154 self.logger.debug(ppp("Unexpected packet:", rx))
2156 self.logger.debug(ppp("Decrypted packet:", pkt))
2162 super(TestIpsecMGreIfEspTra4, self).setUp()
2165 self.tun_if = self.pg0
2166 p = self.ipv4_params
2167 p.tun_if = VppGreInterface(
2171 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2173 p.tun_if.add_vpp_config()
2175 p.tun_if.config_ip4()
2176 p.tun_if.generate_remote_hosts(N_NHS)
2177 self.pg0.generate_remote_hosts(N_NHS)
2178 self.pg0.configure_ipv4_neighbors()
2180 # set up some SAs for several next-hops on the interface
2181 self.multi_params = []
2183 for ii in range(N_NHS):
2184 p = copy.copy(self.ipv4_params)
2186 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2187 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2188 p.scapy_tun_spi = p.scapy_tun_spi + ii
2189 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2190 p.vpp_tun_spi = p.vpp_tun_spi + ii
2192 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2193 p.scapy_tra_spi = p.scapy_tra_spi + ii
2194 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2195 p.vpp_tra_spi = p.vpp_tra_spi + ii
2196 p.tun_sa_out = VppIpsecSA(
2202 p.crypt_algo_vpp_id,
2204 self.vpp_esp_protocol,
2206 p.tun_sa_out.add_vpp_config()
2208 p.tun_sa_in = VppIpsecSA(
2214 p.crypt_algo_vpp_id,
2216 self.vpp_esp_protocol,
2218 p.tun_sa_in.add_vpp_config()
2220 p.tun_protect = VppIpsecTunProtect(
2225 nh=p.tun_if.remote_hosts[ii].ip4,
2227 p.tun_protect.add_vpp_config()
2228 config_tra_params(p, self.encryption_type, p.tun_if)
2229 self.multi_params.append(p)
2233 p.remote_tun_if_host,
2235 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
2238 # in this v4 variant add the teibs after the protect
2242 p.tun_if.remote_hosts[ii].ip4,
2243 self.pg0.remote_hosts[ii].ip4,
2245 p.tun_dst = self.pg0.remote_hosts[ii].ip4
2246 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2249 p = self.ipv4_params
2250 p.tun_if.unconfig_ip4()
2251 super(TestIpsecMGreIfEspTra4, self).tearDown()
2253 def test_tun_44(self):
2256 for p in self.multi_params:
2257 self.verify_tun_44(p, count=N_PKTS)
2258 p.teib.remove_vpp_config()
2259 self.verify_tun_dropped_44(p, count=N_PKTS)
2260 p.teib.add_vpp_config()
2261 self.verify_tun_44(p, count=N_PKTS)
2264 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
2265 """Ipsec mGRE ESP v6 TRA tests"""
2267 tun6_encrypt_node_name = "esp6-encrypt-tun"
2268 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2269 encryption_type = ESP
2271 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2273 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2275 IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
2277 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2278 / UDP(sport=1144, dport=2233)
2279 / Raw(b"X" * payload_size)
2281 for i in range(count)
2284 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2286 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2287 / IPv6(src="1::1", dst=dst)
2288 / UDP(sport=1144, dport=2233)
2289 / Raw(b"X" * payload_size)
2290 for i in range(count)
2293 def verify_decrypted6(self, p, rxs):
2295 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2296 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2298 def verify_encrypted6(self, p, sa, rxs):
2301 pkt = sa.decrypt(rx[IPv6])
2302 if not pkt.haslayer(IPv6):
2303 pkt = IPv6(pkt[Raw].load)
2304 self.assert_packet_checksums_valid(pkt)
2305 self.assertTrue(pkt.haslayer(GRE))
2307 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
2308 except (IndexError, AssertionError):
2309 self.logger.debug(ppp("Unexpected packet:", rx))
2311 self.logger.debug(ppp("Decrypted packet:", pkt))
2317 super(TestIpsecMGreIfEspTra6, self).setUp()
2319 self.vapi.cli("set logging class ipsec level debug")
2322 self.tun_if = self.pg0
2323 p = self.ipv6_params
2324 p.tun_if = VppGreInterface(
2328 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2330 p.tun_if.add_vpp_config()
2332 p.tun_if.config_ip6()
2333 p.tun_if.generate_remote_hosts(N_NHS)
2334 self.pg0.generate_remote_hosts(N_NHS)
2335 self.pg0.configure_ipv6_neighbors()
2337 # set up some SAs for several next-hops on the interface
2338 self.multi_params = []
2340 for ii in range(N_NHS):
2341 p = copy.copy(self.ipv6_params)
2343 p.remote_tun_if_host = "1::%d" % (ii + 1)
2344 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2345 p.scapy_tun_spi = p.scapy_tun_spi + ii
2346 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2347 p.vpp_tun_spi = p.vpp_tun_spi + ii
2349 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2350 p.scapy_tra_spi = p.scapy_tra_spi + ii
2351 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2352 p.vpp_tra_spi = p.vpp_tra_spi + ii
2353 p.tun_sa_out = VppIpsecSA(
2359 p.crypt_algo_vpp_id,
2361 self.vpp_esp_protocol,
2363 p.tun_sa_out.add_vpp_config()
2365 p.tun_sa_in = VppIpsecSA(
2371 p.crypt_algo_vpp_id,
2373 self.vpp_esp_protocol,
2375 p.tun_sa_in.add_vpp_config()
2377 # in this v6 variant add the teibs first then the protection
2378 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2380 self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
2383 p.tun_protect = VppIpsecTunProtect(
2388 nh=p.tun_if.remote_hosts[ii].ip6,
2390 p.tun_protect.add_vpp_config()
2391 config_tra_params(p, self.encryption_type, p.tun_if)
2392 self.multi_params.append(p)
2396 p.remote_tun_if_host,
2398 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
2400 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2402 self.logger.info(self.vapi.cli("sh log"))
2403 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2404 self.logger.info(self.vapi.cli("sh adj 41"))
2407 p = self.ipv6_params
2408 p.tun_if.unconfig_ip6()
2409 super(TestIpsecMGreIfEspTra6, self).tearDown()
2411 def test_tun_66(self):
2413 for p in self.multi_params:
2414 self.verify_tun_66(p, count=63)
2417 @tag_fixme_vpp_workers
2418 class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2419 """IPsec IPv4 Tunnel protect - transport mode"""
2422 super(TestIpsec4TunProtect, self).setUp()
2424 self.tun_if = self.pg0
2427 super(TestIpsec4TunProtect, self).tearDown()
2429 def test_tun_44(self):
2430 """IPSEC tunnel protect"""
2432 p = self.ipv4_params
2434 self.config_network(p)
2435 self.config_sa_tra(p)
2436 self.config_protect(p)
2438 self.verify_tun_44(p, count=127)
2439 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2440 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2442 self.vapi.cli("clear ipsec sa")
2443 self.verify_tun_64(p, count=127)
2444 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2445 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2447 # rekey - create new SAs and update the tunnel protection
2449 np.crypt_key = b"X" + p.crypt_key[1:]
2450 np.scapy_tun_spi += 100
2451 np.scapy_tun_sa_id += 1
2452 np.vpp_tun_spi += 100
2453 np.vpp_tun_sa_id += 1
2454 np.tun_if.local_spi = p.vpp_tun_spi
2455 np.tun_if.remote_spi = p.scapy_tun_spi
2457 self.config_sa_tra(np)
2458 self.config_protect(np)
2461 self.verify_tun_44(np, count=127)
2462 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2463 self.assertEqual(p.tun_if.get_tx_stats(), 381)
2466 self.unconfig_protect(np)
2467 self.unconfig_sa(np)
2468 self.unconfig_network(p)
2471 @tag_fixme_vpp_workers
2472 class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2473 """IPsec IPv4 Tunnel protect - transport mode"""
2476 super(TestIpsec4TunProtectUdp, self).setUp()
2478 self.tun_if = self.pg0
2480 p = self.ipv4_params
2481 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
2482 p.nat_header = UDP(sport=4500, dport=4500)
2483 self.config_network(p)
2484 self.config_sa_tra(p)
2485 self.config_protect(p)
2488 p = self.ipv4_params
2489 self.unconfig_protect(p)
2491 self.unconfig_network(p)
2492 super(TestIpsec4TunProtectUdp, self).tearDown()
2494 def verify_encrypted(self, p, sa, rxs):
2495 # ensure encrypted packets are received with the default UDP ports
2497 self.assertEqual(rx[UDP].sport, 4500)
2498 self.assertEqual(rx[UDP].dport, 4500)
2499 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2501 def test_tun_44(self):
2502 """IPSEC UDP tunnel protect"""
2504 p = self.ipv4_params
2506 self.verify_tun_44(p, count=127)
2507 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2508 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2510 def test_keepalive(self):
2511 """IPSEC NAT Keepalive"""
2512 self.verify_keepalive(self.ipv4_params)
2515 @tag_fixme_vpp_workers
2516 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2517 """IPsec IPv4 Tunnel protect - tunnel mode"""
2519 encryption_type = ESP
2520 tun4_encrypt_node_name = "esp4-encrypt-tun"
2521 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2524 super(TestIpsec4TunProtectTun, self).setUp()
2526 self.tun_if = self.pg0
2529 super(TestIpsec4TunProtectTun, self).tearDown()
2531 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2533 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2535 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2536 / IP(src=src, dst=dst)
2537 / UDP(sport=1144, dport=2233)
2538 / Raw(b"X" * payload_size)
2540 for i in range(count)
2543 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2545 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2546 / IP(src=src, dst=dst)
2547 / UDP(sport=1144, dport=2233)
2548 / Raw(b"X" * payload_size)
2549 for i in range(count)
2552 def verify_decrypted(self, p, rxs):
2554 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2555 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2556 self.assert_packet_checksums_valid(rx)
2558 def verify_encrypted(self, p, sa, rxs):
2561 pkt = sa.decrypt(rx[IP])
2562 if not pkt.haslayer(IP):
2563 pkt = IP(pkt[Raw].load)
2564 self.assert_packet_checksums_valid(pkt)
2565 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2566 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2567 inner = pkt[IP].payload
2568 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2570 except (IndexError, AssertionError):
2571 self.logger.debug(ppp("Unexpected packet:", rx))
2573 self.logger.debug(ppp("Decrypted packet:", pkt))
2578 def test_tun_44(self):
2579 """IPSEC tunnel protect"""
2581 p = self.ipv4_params
2583 self.config_network(p)
2584 self.config_sa_tun(p)
2585 self.config_protect(p)
2587 # also add output features on the tunnel and physical interface
2588 # so we test they still work
2589 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2590 a = VppAcl(self, [r_all]).add_vpp_config()
2592 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2593 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2595 self.verify_tun_44(p, count=127)
2597 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2598 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2600 # rekey - create new SAs and update the tunnel protection
2602 np.crypt_key = b"X" + p.crypt_key[1:]
2603 np.scapy_tun_spi += 100
2604 np.scapy_tun_sa_id += 1
2605 np.vpp_tun_spi += 100
2606 np.vpp_tun_sa_id += 1
2607 np.tun_if.local_spi = p.vpp_tun_spi
2608 np.tun_if.remote_spi = p.scapy_tun_spi
2610 self.config_sa_tun(np)
2611 self.config_protect(np)
2614 self.verify_tun_44(np, count=127)
2615 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2616 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2619 self.unconfig_protect(np)
2620 self.unconfig_sa(np)
2621 self.unconfig_network(p)
2624 class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2625 """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2627 encryption_type = ESP
2628 tun4_encrypt_node_name = "esp4-encrypt-tun"
2629 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2632 super(TestIpsec4TunProtectTunDrop, self).setUp()
2634 self.tun_if = self.pg0
2637 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2639 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2641 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2643 IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
2644 / IP(src=src, dst=dst)
2645 / UDP(sport=1144, dport=2233)
2646 / Raw(b"X" * payload_size)
2648 for i in range(count)
2651 def test_tun_drop_44(self):
2652 """IPSEC tunnel protect bogus tunnel header"""
2654 p = self.ipv4_params
2656 self.config_network(p)
2657 self.config_sa_tun(p)
2658 self.config_protect(p)
2660 tx = self.gen_encrypt_pkts(
2664 src=p.remote_tun_if_host,
2665 dst=self.pg1.remote_ip4,
2668 self.send_and_assert_no_replies(self.tun_if, tx)
2671 self.unconfig_protect(p)
2673 self.unconfig_network(p)
2676 @tag_fixme_vpp_workers
2677 class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2678 """IPsec IPv6 Tunnel protect - transport mode"""
2680 encryption_type = ESP
2681 tun6_encrypt_node_name = "esp6-encrypt-tun"
2682 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2685 super(TestIpsec6TunProtect, self).setUp()
2687 self.tun_if = self.pg0
2690 super(TestIpsec6TunProtect, self).tearDown()
2692 def test_tun_66(self):
2693 """IPSEC tunnel protect 6o6"""
2695 p = self.ipv6_params
2697 self.config_network(p)
2698 self.config_sa_tra(p)
2699 self.config_protect(p)
2701 self.verify_tun_66(p, count=127)
2702 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2703 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2705 # rekey - create new SAs and update the tunnel protection
2707 np.crypt_key = b"X" + p.crypt_key[1:]
2708 np.scapy_tun_spi += 100
2709 np.scapy_tun_sa_id += 1
2710 np.vpp_tun_spi += 100
2711 np.vpp_tun_sa_id += 1
2712 np.tun_if.local_spi = p.vpp_tun_spi
2713 np.tun_if.remote_spi = p.scapy_tun_spi
2715 self.config_sa_tra(np)
2716 self.config_protect(np)
2719 self.verify_tun_66(np, count=127)
2720 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2721 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2723 # bounce the interface state
2724 p.tun_if.admin_down()
2725 self.verify_drop_tun_66(np, count=127)
2726 node = "/err/ipsec6-tun-input/disabled"
2727 self.assertEqual(127, self.statistics.get_err_counter(node))
2729 self.verify_tun_66(np, count=127)
2732 # 1) add two input SAs [old, new]
2733 # 2) swap output SA to [new]
2734 # 3) use only [new] input SA
2736 np3.crypt_key = b"Z" + p.crypt_key[1:]
2737 np3.scapy_tun_spi += 100
2738 np3.scapy_tun_sa_id += 1
2739 np3.vpp_tun_spi += 100
2740 np3.vpp_tun_sa_id += 1
2741 np3.tun_if.local_spi = p.vpp_tun_spi
2742 np3.tun_if.remote_spi = p.scapy_tun_spi
2744 self.config_sa_tra(np3)
2747 p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2748 self.verify_tun_66(np, np, count=127)
2749 self.verify_tun_66(np3, np, count=127)
2752 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2753 self.verify_tun_66(np, np3, count=127)
2754 self.verify_tun_66(np3, np3, count=127)
2757 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
2758 self.verify_tun_66(np3, np3, count=127)
2759 self.verify_drop_tun_rx_66(np, count=127)
2761 self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
2762 self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
2763 self.unconfig_sa(np)
2766 self.unconfig_protect(np3)
2767 self.unconfig_sa(np3)
2768 self.unconfig_network(p)
2770 def test_tun_46(self):
2771 """IPSEC tunnel protect 4o6"""
2773 p = self.ipv6_params
2775 self.config_network(p)
2776 self.config_sa_tra(p)
2777 self.config_protect(p)
2779 self.verify_tun_46(p, count=127)
2780 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2781 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2784 self.unconfig_protect(p)
2786 self.unconfig_network(p)
2789 @tag_fixme_vpp_workers
2790 class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2791 """IPsec IPv6 Tunnel protect - tunnel mode"""
2793 encryption_type = ESP
2794 tun6_encrypt_node_name = "esp6-encrypt-tun"
2795 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2798 super(TestIpsec6TunProtectTun, self).setUp()
2800 self.tun_if = self.pg0
2803 super(TestIpsec6TunProtectTun, self).tearDown()
2805 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2807 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2809 IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
2810 / IPv6(src=src, dst=dst)
2811 / UDP(sport=1166, dport=2233)
2812 / Raw(b"X" * payload_size)
2814 for i in range(count)
2817 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2819 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2820 / IPv6(src=src, dst=dst)
2821 / UDP(sport=1166, dport=2233)
2822 / Raw(b"X" * payload_size)
2823 for i in range(count)
2826 def verify_decrypted6(self, p, rxs):
2828 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2829 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2830 self.assert_packet_checksums_valid(rx)
2832 def verify_encrypted6(self, p, sa, rxs):
2835 pkt = sa.decrypt(rx[IPv6])
2836 if not pkt.haslayer(IPv6):
2837 pkt = IPv6(pkt[Raw].load)
2838 self.assert_packet_checksums_valid(pkt)
2839 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2840 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2841 inner = pkt[IPv6].payload
2842 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2844 except (IndexError, AssertionError):
2845 self.logger.debug(ppp("Unexpected packet:", rx))
2847 self.logger.debug(ppp("Decrypted packet:", pkt))
2852 def test_tun_66(self):
2853 """IPSEC tunnel protect"""
2855 p = self.ipv6_params
2857 self.config_network(p)
2858 self.config_sa_tun(p)
2859 self.config_protect(p)
2861 self.verify_tun_66(p, count=127)
2863 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2864 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2866 # rekey - create new SAs and update the tunnel protection
2868 np.crypt_key = b"X" + p.crypt_key[1:]
2869 np.scapy_tun_spi += 100
2870 np.scapy_tun_sa_id += 1
2871 np.vpp_tun_spi += 100
2872 np.vpp_tun_sa_id += 1
2873 np.tun_if.local_spi = p.vpp_tun_spi
2874 np.tun_if.remote_spi = p.scapy_tun_spi
2876 self.config_sa_tun(np)
2877 self.config_protect(np)
2880 self.verify_tun_66(np, count=127)
2881 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2882 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2885 self.unconfig_protect(np)
2886 self.unconfig_sa(np)
2887 self.unconfig_network(p)
2890 class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2891 """IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2893 encryption_type = ESP
2894 tun6_encrypt_node_name = "esp6-encrypt-tun"
2895 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2898 super(TestIpsec6TunProtectTunDrop, self).setUp()
2900 self.tun_if = self.pg0
2903 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2905 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2906 # the IP destination of the revealed packet does not match
2907 # that assigned to the tunnel
2909 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2911 IPv6(src=sw_intf.remote_ip6, dst="5::5")
2912 / IPv6(src=src, dst=dst)
2913 / UDP(sport=1144, dport=2233)
2914 / Raw(b"X" * payload_size)
2916 for i in range(count)
2919 def test_tun_drop_66(self):
2920 """IPSEC 6 tunnel protect bogus tunnel header"""
2922 p = self.ipv6_params
2924 self.config_network(p)
2925 self.config_sa_tun(p)
2926 self.config_protect(p)
2928 tx = self.gen_encrypt_pkts6(
2932 src=p.remote_tun_if_host,
2933 dst=self.pg1.remote_ip6,
2936 self.send_and_assert_no_replies(self.tun_if, tx)
2938 self.unconfig_protect(p)
2940 self.unconfig_network(p)
2943 class TemplateIpsecItf4(object):
2944 """IPsec Interface IPv4"""
2946 encryption_type = ESP
2947 tun4_encrypt_node_name = "esp4-encrypt-tun"
2948 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2949 tun4_input_node = "ipsec4-tun-input"
2951 def config_sa_tun(self, p, src, dst):
2952 config_tun_params(p, self.encryption_type, None, src, dst)
2954 p.tun_sa_out = VppIpsecSA(
2960 p.crypt_algo_vpp_id,
2962 self.vpp_esp_protocol,
2967 p.tun_sa_out.add_vpp_config()
2969 p.tun_sa_in = VppIpsecSA(
2975 p.crypt_algo_vpp_id,
2977 self.vpp_esp_protocol,
2981 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
2983 p.tun_sa_in.add_vpp_config()
2985 def config_protect(self, p):
2986 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2987 p.tun_protect.add_vpp_config()
2989 def config_network(self, p, instance=0xFFFFFFFF):
2990 p.tun_if = VppIpsecInterface(self, instance=instance)
2992 p.tun_if.add_vpp_config()
2994 p.tun_if.config_ip4()
2995 p.tun_if.config_ip6()
2997 p.route = VppIpRoute(
2999 p.remote_tun_if_host,
3001 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3003 p.route.add_vpp_config()
3006 p.remote_tun_if_host6,
3010 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3016 def unconfig_network(self, p):
3017 p.route.remove_vpp_config()
3018 p.tun_if.remove_vpp_config()
3020 def unconfig_protect(self, p):
3021 p.tun_protect.remove_vpp_config()
3023 def unconfig_sa(self, p):
3024 p.tun_sa_out.remove_vpp_config()
3025 p.tun_sa_in.remove_vpp_config()
3028 @tag_fixme_vpp_workers
3029 class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3030 """IPsec Interface IPv4"""
3033 super(TestIpsecItf4, self).setUp()
3035 self.tun_if = self.pg0
3038 super(TestIpsecItf4, self).tearDown()
3040 def test_tun_instance_44(self):
3041 p = self.ipv4_params
3042 self.config_network(p, instance=3)
3044 with self.assertRaises(CliFailedCommandError):
3045 self.vapi.cli("show interface ipsec0")
3047 output = self.vapi.cli("show interface ipsec3")
3048 self.assertTrue("unknown" not in output)
3050 self.unconfig_network(p)
3052 def test_tun_44(self):
3053 """IPSEC interface IPv4"""
3056 p = self.ipv4_params
3058 self.config_network(p)
3060 p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
3062 self.verify_tun_dropped_44(p, count=n_pkts)
3063 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3064 self.config_protect(p)
3066 self.verify_tun_44(p, count=n_pkts)
3067 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3068 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3070 p.tun_if.admin_down()
3071 self.verify_tun_dropped_44(p, count=n_pkts)
3073 self.verify_tun_44(p, count=n_pkts)
3075 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3076 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3078 # it's a v6 packet when it's encrypted
3079 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
3081 self.verify_tun_64(p, count=n_pkts)
3082 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3083 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3085 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
3087 # update the SA tunnel
3089 p, self.encryption_type, None, self.pg2.local_ip4, self.pg2.remote_ip4
3091 p.tun_sa_in.update_vpp_config(
3092 is_tun=True, tun_src=self.pg2.remote_ip4, tun_dst=self.pg2.local_ip4
3094 p.tun_sa_out.update_vpp_config(
3095 is_tun=True, tun_src=self.pg2.local_ip4, tun_dst=self.pg2.remote_ip4
3097 self.verify_tun_44(p, count=n_pkts)
3098 self.assertEqual(p.tun_if.get_rx_stats(), 5 * n_pkts)
3099 self.assertEqual(p.tun_if.get_tx_stats(), 4 * n_pkts)
3101 self.vapi.cli("clear interfaces")
3103 # rekey - create new SAs and update the tunnel protection
3105 np.crypt_key = b"X" + p.crypt_key[1:]
3106 np.scapy_tun_spi += 100
3107 np.scapy_tun_sa_id += 1
3108 np.vpp_tun_spi += 100
3109 np.vpp_tun_sa_id += 1
3110 np.tun_if.local_spi = p.vpp_tun_spi
3111 np.tun_if.remote_spi = p.scapy_tun_spi
3113 self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
3114 self.config_protect(np)
3117 self.verify_tun_44(np, count=n_pkts)
3118 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3119 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3122 self.unconfig_protect(np)
3123 self.unconfig_sa(np)
3124 self.unconfig_network(p)
3126 def test_tun_44_null(self):
3127 """IPSEC interface IPv4 NULL auth/crypto"""
3130 p = copy.copy(self.ipv4_params)
3132 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
3133 p.crypt_algo_vpp_id = (
3134 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
3136 p.crypt_algo = "NULL"
3137 p.auth_algo = "NULL"
3139 self.config_network(p)
3140 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3141 self.config_protect(p)
3143 self.logger.info(self.vapi.cli("sh ipsec sa"))
3144 self.verify_tun_44(p, count=n_pkts)
3147 self.unconfig_protect(p)
3149 self.unconfig_network(p)
3151 def test_tun_44_police(self):
3152 """IPSEC interface IPv4 with input policer"""
3154 p = self.ipv4_params
3156 self.config_network(p)
3157 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3158 self.config_protect(p)
3160 action_tx = PolicerAction(
3161 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3163 policer = VppPolicer(
3170 conform_action=action_tx,
3171 exceed_action=action_tx,
3172 violate_action=action_tx,
3174 policer.add_vpp_config()
3176 # Start policing on tun
3177 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3179 self.verify_tun_44(p, count=n_pkts)
3180 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3181 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3183 stats = policer.get_stats()
3185 # Single rate, 2 colour policer - expect conform, violate but no exceed
3186 self.assertGreater(stats["conform_packets"], 0)
3187 self.assertEqual(stats["exceed_packets"], 0)
3188 self.assertGreater(stats["violate_packets"], 0)
3190 # Stop policing on tun
3191 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3192 self.verify_tun_44(p, count=n_pkts)
3194 # No new policer stats
3195 statsnew = policer.get_stats()
3196 self.assertEqual(stats, statsnew)
3199 policer.remove_vpp_config()
3200 self.unconfig_protect(p)
3202 self.unconfig_network(p)
3205 class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3206 """IPsec Interface MPLSoIPv4"""
3208 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
3211 super(TestIpsecItf4MPLS, self).setUp()
3213 self.tun_if = self.pg0
3216 super(TestIpsecItf4MPLS, self).tearDown()
3218 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3220 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3222 MPLS(label=44, ttl=3)
3223 / IP(src=src, dst=dst)
3224 / UDP(sport=1166, dport=2233)
3225 / Raw(b"X" * payload_size)
3227 for i in range(count)
3230 def verify_encrypted(self, p, sa, rxs):
3233 pkt = sa.decrypt(rx[IP])
3234 if not pkt.haslayer(IP):
3235 pkt = IP(pkt[Raw].load)
3236 self.assert_packet_checksums_valid(pkt)
3237 self.assert_equal(pkt[MPLS].label, 44)
3238 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
3239 except (IndexError, AssertionError):
3240 self.logger.debug(ppp("Unexpected packet:", rx))
3242 self.logger.debug(ppp("Decrypted packet:", pkt))
3247 def test_tun_mpls_o_ip4(self):
3248 """IPSEC interface MPLS over IPv4"""
3251 p = self.ipv4_params
3254 tbl = VppMplsTable(self, 0)
3255 tbl.add_vpp_config()
3257 self.config_network(p)
3258 # deag MPLS routes from the tunnel
3260 self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
3265 p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
3269 p.tun_if.enable_mpls()
3271 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3272 self.config_protect(p)
3274 self.verify_tun_44(p, count=n_pkts)
3277 p.tun_if.disable_mpls()
3278 self.unconfig_protect(p)
3280 self.unconfig_network(p)
3283 class TemplateIpsecItf6(object):
3284 """IPsec Interface IPv6"""
3286 encryption_type = ESP
3287 tun6_encrypt_node_name = "esp6-encrypt-tun"
3288 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
3289 tun6_input_node = "ipsec6-tun-input"
3291 def config_sa_tun(self, p, src, dst):
3292 config_tun_params(p, self.encryption_type, None, src, dst)
3294 if not hasattr(p, "tun_flags"):
3296 if not hasattr(p, "hop_limit"):
3299 p.tun_sa_out = VppIpsecSA(
3305 p.crypt_algo_vpp_id,
3307 self.vpp_esp_protocol,
3311 tun_flags=p.tun_flags,
3312 hop_limit=p.hop_limit,
3314 p.tun_sa_out.add_vpp_config()
3316 p.tun_sa_in = VppIpsecSA(
3322 p.crypt_algo_vpp_id,
3324 self.vpp_esp_protocol,
3329 p.tun_sa_in.add_vpp_config()
3331 def config_protect(self, p):
3332 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
3333 p.tun_protect.add_vpp_config()
3335 def config_network(self, p):
3336 p.tun_if = VppIpsecInterface(self)
3338 p.tun_if.add_vpp_config()
3340 p.tun_if.config_ip4()
3341 p.tun_if.config_ip6()
3345 p.remote_tun_if_host4,
3347 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3351 p.route = VppIpRoute(
3353 p.remote_tun_if_host,
3357 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3361 p.route.add_vpp_config()
3363 def unconfig_network(self, p):
3364 p.route.remove_vpp_config()
3365 p.tun_if.remove_vpp_config()
3367 def unconfig_protect(self, p):
3368 p.tun_protect.remove_vpp_config()
3370 def unconfig_sa(self, p):
3371 p.tun_sa_out.remove_vpp_config()
3372 p.tun_sa_in.remove_vpp_config()
3375 @tag_fixme_vpp_workers
3376 class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3377 """IPsec Interface IPv6"""
3380 super(TestIpsecItf6, self).setUp()
3382 self.tun_if = self.pg0
3385 super(TestIpsecItf6, self).tearDown()
3387 def test_tun_66(self):
3388 """IPSEC interface IPv6"""
3390 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3392 p = self.ipv6_params
3393 p.inner_hop_limit = 24
3394 p.outer_hop_limit = 23
3395 p.outer_flow_label = 243224
3396 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3398 self.config_network(p)
3400 p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
3402 self.verify_drop_tun_66(p, count=n_pkts)
3403 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3404 self.config_protect(p)
3406 self.verify_tun_66(p, count=n_pkts)
3407 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3408 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3410 p.tun_if.admin_down()
3411 self.verify_drop_tun_66(p, count=n_pkts)
3413 self.verify_tun_66(p, count=n_pkts)
3415 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3416 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3418 # it's a v4 packet when it's encrypted
3419 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
3421 self.verify_tun_46(p, count=n_pkts)
3422 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3423 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3425 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
3427 self.vapi.cli("clear interfaces")
3429 # rekey - create new SAs and update the tunnel protection
3431 np.crypt_key = b"X" + p.crypt_key[1:]
3432 np.scapy_tun_spi += 100
3433 np.scapy_tun_sa_id += 1
3434 np.vpp_tun_spi += 100
3435 np.vpp_tun_sa_id += 1
3436 np.tun_if.local_spi = p.vpp_tun_spi
3437 np.tun_if.remote_spi = p.scapy_tun_spi
3438 np.inner_hop_limit = 24
3439 np.outer_hop_limit = 128
3440 np.inner_flow_label = 0xABCDE
3441 np.outer_flow_label = 0xABCDE
3443 np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
3445 self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
3446 self.config_protect(np)
3449 self.verify_tun_66(np, count=n_pkts)
3450 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3451 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3454 self.unconfig_protect(np)
3455 self.unconfig_sa(np)
3456 self.unconfig_network(p)
3458 def test_tun_66_police(self):
3459 """IPSEC interface IPv6 with input policer"""
3460 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3462 p = self.ipv6_params
3463 p.inner_hop_limit = 24
3464 p.outer_hop_limit = 23
3465 p.outer_flow_label = 243224
3466 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3468 self.config_network(p)
3469 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3470 self.config_protect(p)
3472 action_tx = PolicerAction(
3473 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3475 policer = VppPolicer(
3482 conform_action=action_tx,
3483 exceed_action=action_tx,
3484 violate_action=action_tx,
3486 policer.add_vpp_config()
3488 # Start policing on tun
3489 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3491 self.verify_tun_66(p, count=n_pkts)
3492 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3493 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3495 stats = policer.get_stats()
3497 # Single rate, 2 colour policer - expect conform, violate but no exceed
3498 self.assertGreater(stats["conform_packets"], 0)
3499 self.assertEqual(stats["exceed_packets"], 0)
3500 self.assertGreater(stats["violate_packets"], 0)
3502 # Stop policing on tun
3503 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3504 self.verify_tun_66(p, count=n_pkts)
3506 # No new policer stats
3507 statsnew = policer.get_stats()
3508 self.assertEqual(stats, statsnew)
3511 policer.remove_vpp_config()
3512 self.unconfig_protect(p)
3514 self.unconfig_network(p)
3517 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3518 """Ipsec P2MP ESP v4 tests"""
3520 tun4_encrypt_node_name = "esp4-encrypt-tun"
3521 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3522 encryption_type = ESP
3524 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3526 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3528 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
3529 / UDP(sport=1144, dport=2233)
3530 / Raw(b"X" * payload_size)
3532 for i in range(count)
3535 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
3537 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3538 / IP(src="1.1.1.1", dst=dst)
3539 / UDP(sport=1144, dport=2233)
3540 / Raw(b"X" * payload_size)
3541 for i in range(count)
3544 def verify_decrypted(self, p, rxs):
3546 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3547 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3549 def verify_encrypted(self, p, sa, rxs):
3553 rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
3555 self.assertEqual(rx[IP].ttl, p.hop_limit)
3556 pkt = sa.decrypt(rx[IP])
3557 if not pkt.haslayer(IP):
3558 pkt = IP(pkt[Raw].load)
3559 self.assert_packet_checksums_valid(pkt)
3561 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3562 except (IndexError, AssertionError):
3563 self.logger.debug(ppp("Unexpected packet:", rx))
3565 self.logger.debug(ppp("Decrypted packet:", pkt))
3571 super(TestIpsecMIfEsp4, self).setUp()
3574 self.tun_if = self.pg0
3575 p = self.ipv4_params
3576 p.tun_if = VppIpsecInterface(
3577 self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
3579 p.tun_if.add_vpp_config()
3581 p.tun_if.config_ip4()
3582 p.tun_if.unconfig_ip4()
3583 p.tun_if.config_ip4()
3584 p.tun_if.generate_remote_hosts(N_NHS)
3585 self.pg0.generate_remote_hosts(N_NHS)
3586 self.pg0.configure_ipv4_neighbors()
3588 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
3589 a = VppAcl(self, [r_all]).add_vpp_config()
3591 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3592 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3594 # setup some SAs for several next-hops on the interface
3595 self.multi_params = []
3597 for ii in range(N_NHS):
3598 p = copy.copy(self.ipv4_params)
3600 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3601 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3602 p.scapy_tun_spi = p.scapy_tun_spi + ii
3603 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3604 p.vpp_tun_spi = p.vpp_tun_spi + ii
3606 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3607 p.scapy_tra_spi = p.scapy_tra_spi + ii
3608 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3609 p.vpp_tra_spi = p.vpp_tra_spi + ii
3610 p.hop_limit = ii + 10
3611 p.tun_sa_out = VppIpsecSA(
3617 p.crypt_algo_vpp_id,
3619 self.vpp_esp_protocol,
3621 self.pg0.remote_hosts[ii].ip4,
3622 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3623 hop_limit=p.hop_limit,
3625 p.tun_sa_out.add_vpp_config()
3627 p.tun_sa_in = VppIpsecSA(
3633 p.crypt_algo_vpp_id,
3635 self.vpp_esp_protocol,
3636 self.pg0.remote_hosts[ii].ip4,
3638 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3639 hop_limit=p.hop_limit,
3641 p.tun_sa_in.add_vpp_config()
3643 p.tun_protect = VppIpsecTunProtect(
3648 nh=p.tun_if.remote_hosts[ii].ip4,
3650 p.tun_protect.add_vpp_config()
3653 self.encryption_type,
3656 self.pg0.remote_hosts[ii].ip4,
3658 self.multi_params.append(p)
3660 p.via_tun_route = VppIpRoute(
3662 p.remote_tun_if_host,
3664 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
3667 p.tun_dst = self.pg0.remote_hosts[ii].ip4
3670 p = self.ipv4_params
3671 p.tun_if.unconfig_ip4()
3672 super(TestIpsecMIfEsp4, self).tearDown()
3674 def test_tun_44(self):
3677 for p in self.multi_params:
3678 self.verify_tun_44(p, count=N_PKTS)
3680 # remove one tunnel protect, the rest should still work
3681 self.multi_params[0].tun_protect.remove_vpp_config()
3682 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3683 self.multi_params[0].via_tun_route.remove_vpp_config()
3684 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3686 for p in self.multi_params[1:]:
3687 self.verify_tun_44(p, count=N_PKTS)
3689 self.multi_params[0].tun_protect.add_vpp_config()
3690 self.multi_params[0].via_tun_route.add_vpp_config()
3692 for p in self.multi_params:
3693 self.verify_tun_44(p, count=N_PKTS)
3696 class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3697 """IPsec Interface MPLSoIPv6"""
3699 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3702 super(TestIpsecItf6MPLS, self).setUp()
3704 self.tun_if = self.pg0
3707 super(TestIpsecItf6MPLS, self).tearDown()
3709 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3711 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3713 MPLS(label=66, ttl=3)
3714 / IPv6(src=src, dst=dst)
3715 / UDP(sport=1166, dport=2233)
3716 / Raw(b"X" * payload_size)
3718 for i in range(count)
3721 def verify_encrypted6(self, p, sa, rxs):  # checks received ESP-over-IPv6 packets decrypt to an MPLS(label 66) payload destined to p.remote_tun_if_host
3724 pkt = sa.decrypt(rx[IPv6])  # decrypt starting at the outer IPv6 header using the supplied scapy SA
3725 if not pkt.haslayer(IPv6):  # fallback: the decrypted payload was not dissected as IPv6
3726 pkt = IP(pkt[Raw].load)  # NOTE(review): re-parses as IPv4 (IP) although this is the IPv6 variant; the other verify_encrypted6 in this file uses IPv6(pkt[Raw].load) - confirm whether IPv6 was intended
3727 self.assert_packet_checksums_valid(pkt)
3728 self.assert_equal(pkt[MPLS].label, 66)  # 66 is the label used by gen_encrypt_pkts6 and the VppMplsLabel(66) route path in this class
3729 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3730 except (IndexError, AssertionError):  # missing layer (IndexError) or a failed assertion: log the packets below for debugging
3731 self.logger.debug(ppp("Unexpected packet:", rx))
3733 self.logger.debug(ppp("Decrypted packet:", pkt))
3738 def test_tun_mpls_o_ip6(self):
3739 """IPSEC interface MPLS over IPv6"""
3742 p = self.ipv6_params
3745 tbl = VppMplsTable(self, 0)
3746 tbl.add_vpp_config()
3748 self.config_network(p)
3749 # deag MPLS routes from the tunnel
3754 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3755 eos_proto=f.FIB_PATH_NH_PROTO_IP6,
3760 p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
3764 p.tun_if.enable_mpls()
3766 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3767 self.config_protect(p)
3769 self.verify_tun_66(p, count=n_pkts)
3772 p.tun_if.disable_mpls()
3773 self.unconfig_protect(p)
3775 self.unconfig_network(p)
3778 if __name__ == "__main__":
3779 unittest.main(testRunner=VppTestRunner)