5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import (
21 IpsecTun6HandoffTests,
22 IpsecTun4HandoffTests,
25 from vpp_gre_interface import VppGreInterface
26 from vpp_ipip_tun_interface import VppIpIpTunInterface
27 from vpp_ip_route import (
36 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
37 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
38 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
39 from vpp_teib import VppTeib
41 from vpp_papi import VppEnum
42 from vpp_papi_provider import CliFailedCommandError
43 from vpp_acl import AclRule, VppAcl, VppAclInterface
44 from vpp_policer import PolicerAction, VppPolicer, Dir
47 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
48 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
50 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
52 crypt_key = mk_scapy_crypt_key(p)
54 p.tun_dst = tun_if.remote_ip
55 p.tun_src = tun_if.local_ip
61 is_default_port = p.nat_header.dport == 4500
63 is_default_port = True
66 outbound_nat_header = p.nat_header
68 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
69 bind_layers(UDP, ESP, dport=p.nat_header.dport)
71 p.scapy_tun_sa = SecurityAssociation(
74 crypt_algo=p.crypt_algo,
76 auth_algo=p.auth_algo,
78 tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
79 nat_t_header=outbound_nat_header,
82 p.vpp_tun_sa = SecurityAssociation(
85 crypt_algo=p.crypt_algo,
87 auth_algo=p.auth_algo,
89 tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
90 nat_t_header=p.nat_header,
95 def config_tra_params(p, encryption_type, tun_if):
96 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
98 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
100 crypt_key = mk_scapy_crypt_key(p)
101 p.tun_dst = tun_if.remote_ip
102 p.tun_src = tun_if.local_ip
105 is_default_port = p.nat_header.dport == 4500
107 is_default_port = True
110 outbound_nat_header = p.nat_header
112 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
113 bind_layers(UDP, ESP, dport=p.nat_header.dport)
115 p.scapy_tun_sa = SecurityAssociation(
118 crypt_algo=p.crypt_algo,
120 auth_algo=p.auth_algo,
123 nat_t_header=outbound_nat_header,
125 p.vpp_tun_sa = SecurityAssociation(
128 crypt_algo=p.crypt_algo,
130 auth_algo=p.auth_algo,
133 nat_t_header=p.nat_header,
137 class TemplateIpsec4TunProtect(object):
138 """IPsec IPv4 Tunnel protect"""
140 encryption_type = ESP
141 tun4_encrypt_node_name = "esp4-encrypt-tun"
142 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
143 tun4_input_node = "ipsec4-tun-input"
145 def config_sa_tra(self, p):
146 config_tun_params(p, self.encryption_type, p.tun_if)
148 p.tun_sa_out = VppIpsecSA(
156 self.vpp_esp_protocol,
159 p.tun_sa_out.add_vpp_config()
161 p.tun_sa_in = VppIpsecSA(
169 self.vpp_esp_protocol,
172 p.tun_sa_in.add_vpp_config()
174 def config_sa_tun(self, p):
175 config_tun_params(p, self.encryption_type, p.tun_if)
177 p.tun_sa_out = VppIpsecSA(
185 self.vpp_esp_protocol,
186 self.tun_if.local_addr[p.addr_type],
187 self.tun_if.remote_addr[p.addr_type],
190 p.tun_sa_out.add_vpp_config()
192 p.tun_sa_in = VppIpsecSA(
200 self.vpp_esp_protocol,
201 self.tun_if.remote_addr[p.addr_type],
202 self.tun_if.local_addr[p.addr_type],
205 p.tun_sa_in.add_vpp_config()
207 def config_protect(self, p):
208 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
209 p.tun_protect.add_vpp_config()
211 def config_network(self, p):
212 if hasattr(p, "tun_dst"):
215 tun_dst = self.pg0.remote_ip4
216 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
217 p.tun_if.add_vpp_config()
219 p.tun_if.config_ip4()
220 p.tun_if.config_ip6()
222 p.route = VppIpRoute(
224 p.remote_tun_if_host,
226 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
228 p.route.add_vpp_config()
231 p.remote_tun_if_host6,
235 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
241 def unconfig_network(self, p):
242 p.route.remove_vpp_config()
243 p.tun_if.remove_vpp_config()
245 def unconfig_protect(self, p):
246 p.tun_protect.remove_vpp_config()
248 def unconfig_sa(self, p):
249 p.tun_sa_out.remove_vpp_config()
250 p.tun_sa_in.remove_vpp_config()
253 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
254 """IPsec tunnel interface tests"""
256 encryption_type = ESP
260 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
263 def tearDownClass(cls):
264 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
267 super(TemplateIpsec4TunIfEsp, self).setUp()
269 self.tun_if = self.pg0
273 self.config_network(p)
274 self.config_sa_tra(p)
275 self.config_protect(p)
278 super(TemplateIpsec4TunIfEsp, self).tearDown()
281 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
282 """IPsec UDP tunnel interface tests"""
284 tun4_encrypt_node_name = "esp4-encrypt-tun"
285 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
286 encryption_type = ESP
290 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
293 def tearDownClass(cls):
294 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
296 def verify_encrypted(self, p, sa, rxs):
299 # ensure the UDP ports are correct before we decrypt
301 self.assertTrue(rx.haslayer(UDP))
302 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
303 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
305 pkt = sa.decrypt(rx[IP])
306 if not pkt.haslayer(IP):
307 pkt = IP(pkt[Raw].load)
309 self.assert_packet_checksums_valid(pkt)
310 self.assert_equal(pkt[IP].dst, "1.1.1.1")
311 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
312 except (IndexError, AssertionError):
313 self.logger.debug(ppp("Unexpected packet:", rx))
315 self.logger.debug(ppp("Decrypted packet:", pkt))
320 def config_sa_tra(self, p):
321 config_tun_params(p, self.encryption_type, p.tun_if)
323 p.tun_sa_out = VppIpsecSA(
331 self.vpp_esp_protocol,
333 udp_src=p.nat_header.sport,
334 udp_dst=p.nat_header.dport,
336 p.tun_sa_out.add_vpp_config()
338 p.tun_sa_in = VppIpsecSA(
346 self.vpp_esp_protocol,
348 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
349 udp_src=p.nat_header.sport,
350 udp_dst=p.nat_header.dport,
352 p.tun_sa_in.add_vpp_config()
355 super(TemplateIpsec4TunIfEspUdp, self).setUp()
358 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
359 p.nat_header = UDP(sport=5454, dport=4500)
361 self.tun_if = self.pg0
363 self.config_network(p)
364 self.config_sa_tra(p)
365 self.config_protect(p)
368 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
371 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
372 """Ipsec ESP - TUN tests"""
374 tun4_encrypt_node_name = "esp4-encrypt-tun"
375 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
377 def test_tun_basic64(self):
378 """ipsec 6o4 tunnel basic test"""
379 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
381 self.verify_tun_64(self.params[socket.AF_INET], count=1)
383 def test_tun_burst64(self):
384 """ipsec 6o4 tunnel burst test"""
385 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
387 self.verify_tun_64(self.params[socket.AF_INET], count=257)
389 def test_tun_basic_frag44(self):
390 """ipsec 4o4 tunnel frag basic test"""
391 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
395 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
397 self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
399 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
402 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
403 """Ipsec ESP UDP tests"""
405 tun4_input_node = "ipsec4-tun-input"
408 super(TestIpsec4TunIfEspUdp, self).setUp()
410 def test_keepalive(self):
411 """IPSEC NAT Keepalive"""
412 self.verify_keepalive(self.ipv4_params)
415 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
416 """Ipsec ESP UDP GCM tests"""
418 tun4_input_node = "ipsec4-tun-input"
421 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
423 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
424 p.crypt_algo_vpp_id = (
425 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
427 p.crypt_algo = "AES-GCM"
429 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
433 class TestIpsec4TunIfEspUdpUpdate(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
434 """Ipsec ESP UDP update tests"""
436 tun4_input_node = "ipsec4-tun-input"
439 super(TestIpsec4TunIfEspUdpUpdate, self).setUp()
441 p.nat_header = UDP(sport=6565, dport=7676)
442 config_tun_params(p, self.encryption_type, p.tun_if)
443 p.tun_sa_in.update_vpp_config(
444 udp_src=p.nat_header.dport, udp_dst=p.nat_header.sport
446 p.tun_sa_out.update_vpp_config(
447 udp_src=p.nat_header.sport, udp_dst=p.nat_header.dport
451 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
452 """Ipsec ESP - TCP tests"""
457 class TemplateIpsec6TunProtect(object):
458 """IPsec IPv6 Tunnel protect"""
460 def config_sa_tra(self, p):
461 config_tun_params(p, self.encryption_type, p.tun_if)
463 p.tun_sa_out = VppIpsecSA(
471 self.vpp_esp_protocol,
473 p.tun_sa_out.add_vpp_config()
475 p.tun_sa_in = VppIpsecSA(
483 self.vpp_esp_protocol,
485 p.tun_sa_in.add_vpp_config()
487 def config_sa_tun(self, p):
488 config_tun_params(p, self.encryption_type, p.tun_if)
490 p.tun_sa_out = VppIpsecSA(
498 self.vpp_esp_protocol,
499 self.tun_if.local_addr[p.addr_type],
500 self.tun_if.remote_addr[p.addr_type],
502 p.tun_sa_out.add_vpp_config()
504 p.tun_sa_in = VppIpsecSA(
512 self.vpp_esp_protocol,
513 self.tun_if.remote_addr[p.addr_type],
514 self.tun_if.local_addr[p.addr_type],
516 p.tun_sa_in.add_vpp_config()
518 def config_protect(self, p):
519 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
520 p.tun_protect.add_vpp_config()
522 def config_network(self, p):
523 if hasattr(p, "tun_dst"):
526 tun_dst = self.pg0.remote_ip6
527 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
528 p.tun_if.add_vpp_config()
530 p.tun_if.config_ip6()
531 p.tun_if.config_ip4()
533 p.route = VppIpRoute(
535 p.remote_tun_if_host,
539 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
543 p.route.add_vpp_config()
546 p.remote_tun_if_host4,
548 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
552 def unconfig_network(self, p):
553 p.route.remove_vpp_config()
554 p.tun_if.remove_vpp_config()
556 def unconfig_protect(self, p):
557 p.tun_protect.remove_vpp_config()
559 def unconfig_sa(self, p):
560 p.tun_sa_out.remove_vpp_config()
561 p.tun_sa_in.remove_vpp_config()
564 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
565 """IPsec tunnel interface tests"""
567 encryption_type = ESP
570 super(TemplateIpsec6TunIfEsp, self).setUp()
572 self.tun_if = self.pg0
575 self.config_network(p)
576 self.config_sa_tra(p)
577 self.config_protect(p)
580 super(TemplateIpsec6TunIfEsp, self).tearDown()
583 class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
584 """IPsec6 UDP tunnel interface tests"""
586 tun4_encrypt_node_name = "esp6-encrypt-tun"
587 tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
588 encryption_type = ESP
592 super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()
595 def tearDownClass(cls):
596 super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()
598 def verify_encrypted(self, p, sa, rxs):
601 # ensure the UDP ports are correct before we decrypt
603 self.assertTrue(rx.haslayer(UDP))
604 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
605 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
607 pkt = sa.decrypt(rx[IP])
608 if not pkt.haslayer(IP):
609 pkt = IP(pkt[Raw].load)
611 self.assert_packet_checksums_valid(pkt)
613 pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
615 self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
616 except (IndexError, AssertionError):
617 self.logger.debug(ppp("Unexpected packet:", rx))
619 self.logger.debug(ppp("Decrypted packet:", pkt))
624 def config_sa_tra(self, p):
625 config_tun_params(p, self.encryption_type, p.tun_if)
627 p.tun_sa_out = VppIpsecSA(
635 self.vpp_esp_protocol,
637 udp_src=p.nat_header.sport,
638 udp_dst=p.nat_header.dport,
640 p.tun_sa_out.add_vpp_config()
642 p.tun_sa_in = VppIpsecSA(
650 self.vpp_esp_protocol,
652 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
653 udp_src=p.nat_header.sport,
654 udp_dst=p.nat_header.dport,
656 p.tun_sa_in.add_vpp_config()
659 super(TemplateIpsec6TunIfEspUdp, self).setUp()
662 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
663 p.nat_header = UDP(sport=5454, dport=4500)
665 self.tun_if = self.pg0
667 self.config_network(p)
668 self.config_sa_tra(p)
669 self.config_protect(p)
672 super(TemplateIpsec6TunIfEspUdp, self).tearDown()
675 class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
676 """Ipsec ESP 6 UDP tests"""
678 tun6_input_node = "ipsec6-tun-input"
679 tun6_encrypt_node_name = "esp6-encrypt-tun"
680 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
683 super(TestIpsec6TunIfEspUdp, self).setUp()
685 def test_keepalive(self):
686 """IPSEC6 NAT Keepalive"""
687 self.verify_keepalive(self.ipv6_params)
690 class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
691 """Ipsec ESP 6 UDP GCM tests"""
693 tun6_input_node = "ipsec6-tun-input"
694 tun6_encrypt_node_name = "esp6-encrypt-tun"
695 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
698 super(TestIpsec6TunIfEspUdpGCM, self).setUp()
700 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
701 p.crypt_algo_vpp_id = (
702 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
704 p.crypt_algo = "AES-GCM"
706 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
710 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
711 """Ipsec ESP - TUN tests"""
713 tun6_encrypt_node_name = "esp6-encrypt-tun"
714 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
716 def test_tun_basic46(self):
717 """ipsec 4o6 tunnel basic test"""
718 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
719 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
721 def test_tun_burst46(self):
722 """ipsec 4o6 tunnel burst test"""
723 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
724 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
727 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
728 """Ipsec ESP 6 Handoff tests"""
730 tun6_encrypt_node_name = "esp6-encrypt-tun"
731 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
733 def test_tun_handoff_66_police(self):
734 """ESP 6o6 tunnel with policer worker hand-off test"""
735 self.vapi.cli("clear errors")
736 self.vapi.cli("clear ipsec sa")
739 p = self.params[socket.AF_INET6]
741 action_tx = PolicerAction(
742 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
744 policer = VppPolicer(
751 conform_action=action_tx,
752 exceed_action=action_tx,
753 violate_action=action_tx,
755 policer.add_vpp_config()
757 # Start policing on tun
758 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
760 for pol_bind in [1, 0]:
761 policer.bind_vpp_config(pol_bind, True)
763 # inject alternately on worker 0 and 1.
764 for worker in [0, 1, 0, 1]:
765 send_pkts = self.gen_encrypt_pkts6(
769 src=p.remote_tun_if_host,
770 dst=self.pg1.remote_ip6,
773 recv_pkts = self.send_and_expect(
774 self.tun_if, send_pkts, self.pg1, worker=worker
776 self.verify_decrypted6(p, recv_pkts)
777 self.logger.debug(self.vapi.cli("show trace max 100"))
779 stats = policer.get_stats()
780 stats0 = policer.get_stats(worker=0)
781 stats1 = policer.get_stats(worker=1)
784 # First pass: Worker 1, should have done all the policing
785 self.assertEqual(stats, stats1)
787 # Worker 0, should have handed everything off
788 self.assertEqual(stats0["conform_packets"], 0)
789 self.assertEqual(stats0["exceed_packets"], 0)
790 self.assertEqual(stats0["violate_packets"], 0)
792 # Second pass: both workers should have policed equal amounts
793 self.assertGreater(stats1["conform_packets"], 0)
794 self.assertEqual(stats1["exceed_packets"], 0)
795 self.assertGreater(stats1["violate_packets"], 0)
797 self.assertGreater(stats0["conform_packets"], 0)
798 self.assertEqual(stats0["exceed_packets"], 0)
799 self.assertGreater(stats0["violate_packets"], 0)
802 stats0["conform_packets"] + stats0["violate_packets"],
803 stats1["conform_packets"] + stats1["violate_packets"],
806 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
807 policer.remove_vpp_config()
810 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
811 """Ipsec ESP 4 Handoff tests"""
813 tun4_encrypt_node_name = "esp4-encrypt-tun"
814 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
816 def test_tun_handoff_44_police(self):
817 """ESP 4o4 tunnel with policer worker hand-off test"""
818 self.vapi.cli("clear errors")
819 self.vapi.cli("clear ipsec sa")
822 p = self.params[socket.AF_INET]
824 action_tx = PolicerAction(
825 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
827 policer = VppPolicer(
834 conform_action=action_tx,
835 exceed_action=action_tx,
836 violate_action=action_tx,
838 policer.add_vpp_config()
840 # Start policing on tun
841 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
843 for pol_bind in [1, 0]:
844 policer.bind_vpp_config(pol_bind, True)
846 # inject alternately on worker 0 and 1.
847 for worker in [0, 1, 0, 1]:
848 send_pkts = self.gen_encrypt_pkts(
852 src=p.remote_tun_if_host,
853 dst=self.pg1.remote_ip4,
856 recv_pkts = self.send_and_expect(
857 self.tun_if, send_pkts, self.pg1, worker=worker
859 self.verify_decrypted(p, recv_pkts)
860 self.logger.debug(self.vapi.cli("show trace max 100"))
862 stats = policer.get_stats()
863 stats0 = policer.get_stats(worker=0)
864 stats1 = policer.get_stats(worker=1)
867 # First pass: Worker 1, should have done all the policing
868 self.assertEqual(stats, stats1)
870 # Worker 0, should have handed everything off
871 self.assertEqual(stats0["conform_packets"], 0)
872 self.assertEqual(stats0["exceed_packets"], 0)
873 self.assertEqual(stats0["violate_packets"], 0)
875 # Second pass: both workers should have policed equal amounts
876 self.assertGreater(stats1["conform_packets"], 0)
877 self.assertEqual(stats1["exceed_packets"], 0)
878 self.assertGreater(stats1["violate_packets"], 0)
880 self.assertGreater(stats0["conform_packets"], 0)
881 self.assertEqual(stats0["exceed_packets"], 0)
882 self.assertGreater(stats0["violate_packets"], 0)
885 stats0["conform_packets"] + stats0["violate_packets"],
886 stats1["conform_packets"] + stats1["violate_packets"],
889 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
890 policer.remove_vpp_config()
893 @tag_fixme_vpp_workers
894 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
895 """IPsec IPv4 Multi Tunnel interface"""
897 encryption_type = ESP
898 tun4_encrypt_node_name = "esp4-encrypt-tun"
899 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
902 super(TestIpsec4MultiTunIfEsp, self).setUp()
904 self.tun_if = self.pg0
906 self.multi_params = []
907 self.pg0.generate_remote_hosts(10)
908 self.pg0.configure_ipv4_neighbors()
911 p = copy.copy(self.ipv4_params)
913 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
914 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
915 p.scapy_tun_spi = p.scapy_tun_spi + ii
916 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
917 p.vpp_tun_spi = p.vpp_tun_spi + ii
919 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
920 p.scapy_tra_spi = p.scapy_tra_spi + ii
921 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
922 p.vpp_tra_spi = p.vpp_tra_spi + ii
923 p.tun_dst = self.pg0.remote_hosts[ii].ip4
925 self.multi_params.append(p)
926 self.config_network(p)
927 self.config_sa_tra(p)
928 self.config_protect(p)
931 super(TestIpsec4MultiTunIfEsp, self).tearDown()
933 def test_tun_44(self):
934 """Multiple IPSEC tunnel interfaces"""
935 for p in self.multi_params:
936 self.verify_tun_44(p, count=127)
937 self.assertEqual(p.tun_if.get_rx_stats(), 127)
938 self.assertEqual(p.tun_if.get_tx_stats(), 127)
940 def test_tun_rr_44(self):
941 """Round-robin packets across multiple interfaces"""
943 for p in self.multi_params:
944 tx = tx + self.gen_encrypt_pkts(
948 src=p.remote_tun_if_host,
949 dst=self.pg1.remote_ip4,
951 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
953 for rx, p in zip(rxs, self.multi_params):
954 self.verify_decrypted(p, [rx])
957 for p in self.multi_params:
958 tx = tx + self.gen_pkts(
959 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
961 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
963 for rx, p in zip(rxs, self.multi_params):
964 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
967 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
968 """IPsec IPv4 Tunnel interface all Algos"""
970 encryption_type = ESP
971 tun4_encrypt_node_name = "esp4-encrypt-tun"
972 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
975 super(TestIpsec4TunIfEspAll, self).setUp()
977 self.tun_if = self.pg0
980 self.config_network(p)
981 self.config_sa_tra(p)
982 self.config_protect(p)
986 self.unconfig_protect(p)
987 self.unconfig_network(p)
990 super(TestIpsec4TunIfEspAll, self).tearDown()
994 # change the key and the SPI
997 p.crypt_key = b"X" + p.crypt_key[1:]
999 p.scapy_tun_sa_id += 1
1001 p.vpp_tun_sa_id += 1
1002 p.tun_if.local_spi = p.vpp_tun_spi
1003 p.tun_if.remote_spi = p.scapy_tun_spi
1005 config_tun_params(p, self.encryption_type, p.tun_if)
1007 p.tun_sa_out = VppIpsecSA(
1013 p.crypt_algo_vpp_id,
1015 self.vpp_esp_protocol,
1019 p.tun_sa_in = VppIpsecSA(
1025 p.crypt_algo_vpp_id,
1027 self.vpp_esp_protocol,
1031 p.tun_sa_in.add_vpp_config()
1032 p.tun_sa_out.add_vpp_config()
1034 self.config_protect(p)
1035 np.tun_sa_out.remove_vpp_config()
1036 np.tun_sa_in.remove_vpp_config()
1037 self.logger.info(self.vapi.cli("sh ipsec sa"))
1039 def test_tun_44(self):
1040 """IPSEC tunnel all algos"""
1042 # foreach VPP crypto engine
1043 engines = ["ia32", "ipsecmb", "openssl"]
1045 # foreach crypto algorithm
1049 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
1052 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1054 "scapy-crypto": "AES-GCM",
1055 "scapy-integ": "NULL",
1056 "key": b"JPjyOWBeVEQiMe7h",
1061 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
1064 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1066 "scapy-crypto": "AES-GCM",
1067 "scapy-integ": "NULL",
1068 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1073 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
1076 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1078 "scapy-crypto": "AES-GCM",
1079 "scapy-integ": "NULL",
1080 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1085 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
1088 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1090 "scapy-crypto": "AES-CBC",
1091 "scapy-integ": "HMAC-SHA1-96",
1093 "key": b"JPjyOWBeVEQiMe7h",
1097 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
1100 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
1102 "scapy-crypto": "AES-CBC",
1103 "scapy-integ": "SHA2-512-256",
1105 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1109 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
1112 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
1114 "scapy-crypto": "AES-CBC",
1115 "scapy-integ": "SHA2-256-128",
1117 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1121 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1124 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1126 "scapy-crypto": "NULL",
1127 "scapy-integ": "HMAC-SHA1-96",
1129 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1133 for engine in engines:
1134 self.vapi.cli("set crypto handler all %s" % engine)
1137 # loop through each of the algorithms
1140 # with self.subTest(algo=algo['scapy']):
1142 p = self.ipv4_params
1143 p.auth_algo_vpp_id = algo["vpp-integ"]
1144 p.crypt_algo_vpp_id = algo["vpp-crypto"]
1145 p.crypt_algo = algo["scapy-crypto"]
1146 p.auth_algo = algo["scapy-integ"]
1147 p.crypt_key = algo["key"]
1148 p.salt = algo["salt"]
1154 self.verify_tun_44(p, count=127)
1157 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1158 """IPsec IPv4 Tunnel interface no Algos"""
1160 encryption_type = ESP
1161 tun4_encrypt_node_name = "esp4-encrypt-tun"
1162 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1165 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
1167 self.tun_if = self.pg0
1168 p = self.ipv4_params
1169 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1170 p.auth_algo = "NULL"
1173 p.crypt_algo_vpp_id = (
1174 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1176 p.crypt_algo = "NULL"
1180 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
1182 def test_tun_44(self):
1183 """IPSec SA with NULL algos"""
1184 p = self.ipv4_params
1186 self.config_network(p)
1187 self.config_sa_tra(p)
1188 self.config_protect(p)
1190 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host)
1191 self.send_and_assert_no_replies(self.pg1, tx)
1193 self.unconfig_protect(p)
1195 self.unconfig_network(p)
1198 @tag_fixme_vpp_workers
1199 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
1200 """IPsec IPv6 Multi Tunnel interface"""
1202 encryption_type = ESP
1203 tun6_encrypt_node_name = "esp6-encrypt-tun"
1204 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1207 super(TestIpsec6MultiTunIfEsp, self).setUp()
1209 self.tun_if = self.pg0
1211 self.multi_params = []
1212 self.pg0.generate_remote_hosts(10)
1213 self.pg0.configure_ipv6_neighbors()
1215 for ii in range(10):
1216 p = copy.copy(self.ipv6_params)
1218 p.remote_tun_if_host = "1111::%d" % (ii + 1)
1219 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1220 p.scapy_tun_spi = p.scapy_tun_spi + ii
1221 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1222 p.vpp_tun_spi = p.vpp_tun_spi + ii
1224 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1225 p.scapy_tra_spi = p.scapy_tra_spi + ii
1226 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1227 p.vpp_tra_spi = p.vpp_tra_spi + ii
1228 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1230 self.multi_params.append(p)
1231 self.config_network(p)
1232 self.config_sa_tra(p)
1233 self.config_protect(p)
1236 super(TestIpsec6MultiTunIfEsp, self).tearDown()
1238 def test_tun_66(self):
1239 """Multiple IPSEC tunnel interfaces"""
1240 for p in self.multi_params:
1241 self.verify_tun_66(p, count=127)
1242 self.assertEqual(p.tun_if.get_rx_stats(), 127)
1243 self.assertEqual(p.tun_if.get_tx_stats(), 127)
1246 class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
1247 """Ipsec GRE TEB ESP - TUN tests"""
1249 tun4_encrypt_node_name = "esp4-encrypt-tun"
1250 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1251 encryption_type = ESP
1252 omac = "00:11:22:33:44:55"
1254 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1256 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1258 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1260 / Ether(dst=self.omac)
1261 / IP(src="1.1.1.1", dst="1.1.1.2")
1262 / UDP(sport=1144, dport=2233)
1263 / Raw(b"X" * payload_size)
1265 for i in range(count)
1268 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1270 Ether(dst=self.omac)
1271 / IP(src="1.1.1.1", dst="1.1.1.2")
1272 / UDP(sport=1144, dport=2233)
1273 / Raw(b"X" * payload_size)
1274 for i in range(count)
1277 def verify_decrypted(self, p, rxs):
1279 self.assert_equal(rx[Ether].dst, self.omac)
1280 self.assert_equal(rx[IP].dst, "1.1.1.2")
1282 def verify_encrypted(self, p, sa, rxs):
1285 pkt = sa.decrypt(rx[IP])
1286 if not pkt.haslayer(IP):
1287 pkt = IP(pkt[Raw].load)
1288 self.assert_packet_checksums_valid(pkt)
1289 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1290 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1291 self.assertTrue(pkt.haslayer(GRE))
1293 self.assertEqual(e[Ether].dst, self.omac)
1294 self.assertEqual(e[IP].dst, "1.1.1.2")
1295 except (IndexError, AssertionError):
1296 self.logger.debug(ppp("Unexpected packet:", rx))
1298 self.logger.debug(ppp("Decrypted packet:", pkt))
1304 super(TestIpsecGreTebIfEsp, self).setUp()
1306 self.tun_if = self.pg0
1308 p = self.ipv4_params
1310 bd1 = VppBridgeDomain(self, 1)
1311 bd1.add_vpp_config()
1313 p.tun_sa_out = VppIpsecSA(
1319 p.crypt_algo_vpp_id,
1321 self.vpp_esp_protocol,
1323 self.pg0.remote_ip4,
1325 p.tun_sa_out.add_vpp_config()
1327 p.tun_sa_in = VppIpsecSA(
1333 p.crypt_algo_vpp_id,
1335 self.vpp_esp_protocol,
1336 self.pg0.remote_ip4,
1339 p.tun_sa_in.add_vpp_config()
1341 p.tun_if = VppGreInterface(
1344 self.pg0.remote_ip4,
1345 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1347 p.tun_if.add_vpp_config()
1349 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1351 p.tun_protect.add_vpp_config()
1354 p.tun_if.config_ip4()
1355 config_tun_params(p, self.encryption_type, p.tun_if)
1357 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1358 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1360 self.vapi.cli("clear ipsec sa")
1361 self.vapi.cli("sh adj")
1362 self.vapi.cli("sh ipsec tun")
1365 p = self.ipv4_params
1366 p.tun_if.unconfig_ip4()
1367 super(TestIpsecGreTebIfEsp, self).tearDown()
1370 class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
1371 """Ipsec GRE TEB VLAN ESP - TUN tests"""
1373 tun4_encrypt_node_name = "esp4-encrypt-tun"
1374 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1375 encryption_type = ESP
1376 omac = "00:11:22:33:44:55"
1378 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1380 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1382 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1384 / Ether(dst=self.omac)
1385 / IP(src="1.1.1.1", dst="1.1.1.2")
1386 / UDP(sport=1144, dport=2233)
1387 / Raw(b"X" * payload_size)
1389 for i in range(count)
1392 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1394 Ether(dst=self.omac)
1396 / IP(src="1.1.1.1", dst="1.1.1.2")
1397 / UDP(sport=1144, dport=2233)
1398 / Raw(b"X" * payload_size)
1399 for i in range(count)
1402 def verify_decrypted(self, p, rxs):
1404 self.assert_equal(rx[Ether].dst, self.omac)
1405 self.assert_equal(rx[Dot1Q].vlan, 11)
1406 self.assert_equal(rx[IP].dst, "1.1.1.2")
1408 def verify_encrypted(self, p, sa, rxs):
1411 pkt = sa.decrypt(rx[IP])
1412 if not pkt.haslayer(IP):
1413 pkt = IP(pkt[Raw].load)
1414 self.assert_packet_checksums_valid(pkt)
1415 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1416 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1417 self.assertTrue(pkt.haslayer(GRE))
1419 self.assertEqual(e[Ether].dst, self.omac)
1420 self.assertFalse(e.haslayer(Dot1Q))
1421 self.assertEqual(e[IP].dst, "1.1.1.2")
1422 except (IndexError, AssertionError):
1423 self.logger.debug(ppp("Unexpected packet:", rx))
1425 self.logger.debug(ppp("Decrypted packet:", pkt))
1431 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1433 self.tun_if = self.pg0
1435 p = self.ipv4_params
1437 bd1 = VppBridgeDomain(self, 1)
1438 bd1.add_vpp_config()
1440 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1441 self.vapi.l2_interface_vlan_tag_rewrite(
1442 sw_if_index=self.pg1_11.sw_if_index,
1443 vtr_op=L2_VTR_OP.L2_POP_1,
1446 self.pg1_11.admin_up()
1448 p.tun_sa_out = VppIpsecSA(
1454 p.crypt_algo_vpp_id,
1456 self.vpp_esp_protocol,
1458 self.pg0.remote_ip4,
1460 p.tun_sa_out.add_vpp_config()
1462 p.tun_sa_in = VppIpsecSA(
1468 p.crypt_algo_vpp_id,
1470 self.vpp_esp_protocol,
1471 self.pg0.remote_ip4,
1474 p.tun_sa_in.add_vpp_config()
1476 p.tun_if = VppGreInterface(
1479 self.pg0.remote_ip4,
1480 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1482 p.tun_if.add_vpp_config()
1484 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1486 p.tun_protect.add_vpp_config()
1489 p.tun_if.config_ip4()
1490 config_tun_params(p, self.encryption_type, p.tun_if)
1492 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1493 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1495 self.vapi.cli("clear ipsec sa")
1498 p = self.ipv4_params
1499 p.tun_if.unconfig_ip4()
1500 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1501 self.pg1_11.admin_down()
1502 self.pg1_11.remove_vpp_config()
# ESP-protected GRE TEB (L2) tunnel over pg0, bridged in bridge-domain 1
# directly with pg1; scapy SAs are built with config_tra_params.
1505 class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
1506 """Ipsec GRE TEB ESP - Tra tests"""
# Graph node names and ESP protocol selection consumed by the shared
# IpsecTun4Tests machinery (presumably for counter checks - confirm).
1508 tun4_encrypt_node_name = "esp4-encrypt-tun"
1509 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1510 encryption_type = ESP
# Destination MAC used for the inner (bridged) Ethernet frames.
1511 omac = "00:11:22:33:44:55"
# Build `count` frames for the tunnel-facing side: outer Ether/IP between
# pg0's remote and local addresses, inner Ether/IP/UDP/Raw payload.
1513 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1515 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1517 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1519 / Ether(dst=self.omac)
1520 / IP(src="1.1.1.1", dst="1.1.1.2")
1521 / UDP(sport=1144, dport=2233)
1522 / Raw(b"X" * payload_size)
1524 for i in range(count)
# Build `count` plain-text frames addressed to `omac` for the bridged side.
1527 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1529 Ether(dst=self.omac)
1530 / IP(src="1.1.1.1", dst="1.1.1.2")
1531 / UDP(sport=1144, dport=2233)
1532 / Raw(b"X" * payload_size)
1533 for i in range(count)
# Decrypted frames must keep the inner MAC and IP destination.
1536 def verify_decrypted(self, p, rxs):
1538 self.assert_equal(rx[Ether].dst, self.omac)
1539 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Decrypt each received packet with `sa` and check the outer addresses,
# the presence of GRE, and the inner frame's destinations.
1541 def verify_encrypted(self, p, sa, rxs):
1544 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its raw bytes as IP.
1545 if not pkt.haslayer(IP):
1546 pkt = IP(pkt[Raw].load)
1547 self.assert_packet_checksums_valid(pkt)
1548 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1549 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1550 self.assertTrue(pkt.haslayer(GRE))
# NOTE(review): `e` is presumably the GRE payload of `pkt` - confirm.
1552 self.assertEqual(e[Ether].dst, self.omac)
1553 self.assertEqual(e[IP].dst, "1.1.1.2")
# On a failed check, log the offending packets for debugging.
1554 except (IndexError, AssertionError):
1555 self.logger.debug(ppp("Unexpected packet:", rx))
1557 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture setup: bridge domain, SAs, GRE TEB tunnel, tunnel protection,
# then bridge-domain membership.
1563 super(TestIpsecGreTebIfEspTra, self).setUp()
1565 self.tun_if = self.pg0
1567 p = self.ipv4_params
1569 bd1 = VppBridgeDomain(self, 1)
1570 bd1.add_vpp_config()
# Outbound and inbound SAs for the tunnel.
1572 p.tun_sa_out = VppIpsecSA(
1578 p.crypt_algo_vpp_id,
1580 self.vpp_esp_protocol,
1582 p.tun_sa_out.add_vpp_config()
1584 p.tun_sa_in = VppIpsecSA(
1590 p.crypt_algo_vpp_id,
1592 self.vpp_esp_protocol,
1594 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB (transparent Ethernet bridging).
1596 p.tun_if = VppGreInterface(
1599 self.pg0.remote_ip4,
1600 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1602 p.tun_if.add_vpp_config()
# Protect the GRE tunnel with the SA pair created above.
1604 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1606 p.tun_protect.add_vpp_config()
1609 p.tun_if.config_ip4()
# Build the matching scapy-side SAs via the transport-params helper.
1610 config_tra_params(p, self.encryption_type, p.tun_if)
# Put both the tunnel and pg1 into bridge-domain 1.
1612 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1613 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1615 self.vapi.cli("clear ipsec sa")
# Fixture teardown: unconfigure the tunnel address, then the base teardown.
1618 p = self.ipv4_params
1619 p.tun_if.unconfig_ip4()
1620 super(TestIpsecGreTebIfEspTra, self).tearDown()
# ESP-protected GRE TEB (L2) tunnel with UDP encapsulation on non-default
# ports (sport 5454, dport 4545), bridged in bridge-domain 1 with pg1.
1623 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
1624 """Ipsec GRE TEB UDP ESP - Tra tests"""
# Graph node names and ESP protocol selection consumed by the shared
# IpsecTun4Tests machinery (presumably for counter checks - confirm).
1626 tun4_encrypt_node_name = "esp4-encrypt-tun"
1627 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1628 encryption_type = ESP
# Destination MAC used for the inner (bridged) Ethernet frames.
1629 omac = "00:11:22:33:44:55"
# Build `count` frames for the tunnel-facing side: outer Ether/IP between
# pg0's remote and local addresses, inner Ether/IP/UDP/Raw payload.
1631 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1633 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1635 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1637 / Ether(dst=self.omac)
1638 / IP(src="1.1.1.1", dst="1.1.1.2")
1639 / UDP(sport=1144, dport=2233)
1640 / Raw(b"X" * payload_size)
1642 for i in range(count)
# Build `count` plain-text frames addressed to `omac` for the bridged side.
1645 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1647 Ether(dst=self.omac)
1648 / IP(src="1.1.1.1", dst="1.1.1.2")
1649 / UDP(sport=1144, dport=2233)
1650 / Raw(b"X" * payload_size)
1651 for i in range(count)
# Decrypted frames must keep the inner MAC and IP destination.
1654 def verify_decrypted(self, p, rxs):
1656 self.assert_equal(rx[Ether].dst, self.omac)
1657 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Check the UDP encapsulation ports first, then decrypt with `sa` and
# verify the outer addresses, GRE presence and inner destinations.
1659 def verify_encrypted(self, p, sa, rxs):
# Ports must match the NAT header configured in setUp.
1661 self.assertTrue(rx.haslayer(UDP))
1662 self.assertEqual(rx[UDP].dport, 4545)
1663 self.assertEqual(rx[UDP].sport, 5454)
1665 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its raw bytes as IP.
1666 if not pkt.haslayer(IP):
1667 pkt = IP(pkt[Raw].load)
1668 self.assert_packet_checksums_valid(pkt)
1669 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1670 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1671 self.assertTrue(pkt.haslayer(GRE))
# NOTE(review): `e` is presumably the GRE payload of `pkt` - confirm.
1673 self.assertEqual(e[Ether].dst, self.omac)
1674 self.assertEqual(e[IP].dst, "1.1.1.2")
# On a failed check, log the offending packets for debugging.
1675 except (IndexError, AssertionError):
1676 self.logger.debug(ppp("Unexpected packet:", rx))
1678 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture setup: enable UDP encap on the params, then create the bridge
# domain, SAs, GRE TEB tunnel, protection and bridge-domain membership.
1684 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1686 self.tun_if = self.pg0
1688 p = self.ipv4_params
# NOTE(review): repeats the assignment on the previous line; looks redundant.
1689 p = self.ipv4_params
# Request UDP encapsulation and set the NAT-T header ports.
1690 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
1691 p.nat_header = UDP(sport=5454, dport=4545)
1693 bd1 = VppBridgeDomain(self, 1)
1694 bd1.add_vpp_config()
# Outbound SA.
1696 p.tun_sa_out = VppIpsecSA(
1702 p.crypt_algo_vpp_id,
1704 self.vpp_esp_protocol,
1709 p.tun_sa_out.add_vpp_config()
# Inbound SA: same flags as outbound plus the IS_INBOUND flag.
1711 p.tun_sa_in = VppIpsecSA(
1717 p.crypt_algo_vpp_id,
1719 self.vpp_esp_protocol,
1721 p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
1726 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB (transparent Ethernet bridging).
1728 p.tun_if = VppGreInterface(
1731 self.pg0.remote_ip4,
1732 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1734 p.tun_if.add_vpp_config()
# Protect the GRE tunnel with the SA pair created above.
1736 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1738 p.tun_protect.add_vpp_config()
1741 p.tun_if.config_ip4()
# Build the matching scapy-side SAs via the transport-params helper.
1742 config_tra_params(p, self.encryption_type, p.tun_if)
# Put both the tunnel and pg1 into bridge-domain 1.
1744 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1745 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1747 self.vapi.cli("clear ipsec sa")
# Log the state of SA index 0 for debugging.
1748 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
# Fixture teardown: unconfigure the tunnel address, then the base teardown.
1751 p = self.ipv4_params
1752 p.tun_if.unconfig_ip4()
1753 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
# ESP-protected L3 GRE tunnel between pg0's local and remote IPv4
# addresses; scapy SAs are built with config_tun_params.
1756 class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
1757 """Ipsec GRE ESP - TUN tests"""
# Graph node names and ESP protocol selection consumed by the shared
# IpsecTun4Tests machinery (presumably for counter checks - confirm).
1759 tun4_encrypt_node_name = "esp4-encrypt-tun"
1760 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1761 encryption_type = ESP
# Build `count` frames for the tunnel-facing side: outer IP between pg0's
# remote and local addresses, inner IP from pg1's local to remote address.
1763 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1765 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1767 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1769 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1770 / UDP(sport=1144, dport=2233)
1771 / Raw(b"X" * payload_size)
1773 for i in range(count)
# Build `count` plain-text packets destined to 1.1.1.2 (the address
# routed via the tunnel in setUp).
1776 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1778 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1779 / IP(src="1.1.1.1", dst="1.1.1.2")
1780 / UDP(sport=1144, dport=2233)
1781 / Raw(b"X" * payload_size)
1782 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv4 address.
1785 def verify_decrypted(self, p, rxs):
1787 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1788 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypt each received packet with `sa` and check the outer addresses,
# the presence of GRE, and the inner IP destination.
1790 def verify_encrypted(self, p, sa, rxs):
1793 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its raw bytes as IP.
1794 if not pkt.haslayer(IP):
1795 pkt = IP(pkt[Raw].load)
1796 self.assert_packet_checksums_valid(pkt)
1797 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1798 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1799 self.assertTrue(pkt.haslayer(GRE))
# NOTE(review): `e` is presumably the GRE payload of `pkt` - confirm.
1801 self.assertEqual(e[IP].dst, "1.1.1.2")
# On a failed check, log the offending packets for debugging.
1802 except (IndexError, AssertionError):
1803 self.logger.debug(ppp("Unexpected packet:", rx))
1805 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture setup: SAs, GRE tunnel, tunnel protection and a host route.
1811 super(TestIpsecGreIfEsp, self).setUp()
1813 self.tun_if = self.pg0
1815 p = self.ipv4_params
# NOTE(review): the bridge domain is created but no ports are added to it
# in the visible code - confirm whether it is needed for this L3 test.
1817 bd1 = VppBridgeDomain(self, 1)
1818 bd1.add_vpp_config()
# Outbound and inbound SAs for the tunnel.
1820 p.tun_sa_out = VppIpsecSA(
1826 p.crypt_algo_vpp_id,
1828 self.vpp_esp_protocol,
1830 self.pg0.remote_ip4,
1832 p.tun_sa_out.add_vpp_config()
1834 p.tun_sa_in = VppIpsecSA(
1840 p.crypt_algo_vpp_id,
1842 self.vpp_esp_protocol,
1843 self.pg0.remote_ip4,
1846 p.tun_sa_in.add_vpp_config()
# GRE tunnel from pg0's local address to its remote address.
1848 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1849 p.tun_if.add_vpp_config()
# Protect the GRE tunnel with the SA pair created above.
1851 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1852 p.tun_protect.add_vpp_config()
1855 p.tun_if.config_ip4()
# Build the matching scapy-side SAs (p.scapy_tun_sa / p.vpp_tun_sa).
1856 config_tun_params(p, self.encryption_type, p.tun_if)
# Host route for 1.1.1.2/32 via the tunnel's remote address.
1859 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
# Fixture teardown: unconfigure the tunnel address, then the base teardown.
1863 p = self.ipv4_params
1864 p.tun_if.unconfig_ip4()
1865 super(TestIpsecGreIfEsp, self).tearDown()
# ESP-protected L3 GRE tunnel between pg0's local and remote IPv4
# addresses; scapy SAs are built with config_tra_params. Also covers
# rejection of a non-GRE payload (test_gre_non_ip).
1868 class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
1869 """Ipsec GRE ESP - TRA tests"""
# Graph node names and ESP protocol selection consumed by the shared
# IpsecTun4Tests machinery; the first decrypt node name is also used by
# test_gre_non_ip to build an error-counter path.
1871 tun4_encrypt_node_name = "esp4-encrypt-tun"
1872 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1873 encryption_type = ESP
# Build `count` frames for the tunnel-facing side: outer IP between pg0's
# remote and local addresses, inner IP from pg1's local to remote address.
1875 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1877 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1879 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1881 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1882 / UDP(sport=1144, dport=2233)
1883 / Raw(b"X" * payload_size)
1885 for i in range(count)
# Like gen_encrypt_pkts, but the visible payload is UDP/Raw directly
# under the outer IP header, with no inner IP header.
1888 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
1890 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1892 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1894 / UDP(sport=1144, dport=2233)
1895 / Raw(b"X" * payload_size)
1897 for i in range(count)
# Build `count` plain-text packets destined to 1.1.1.2 (the address
# routed via the tunnel in setUp).
1900 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1902 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1903 / IP(src="1.1.1.1", dst="1.1.1.2")
1904 / UDP(sport=1144, dport=2233)
1905 / Raw(b"X" * payload_size)
1906 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv4 address.
1909 def verify_decrypted(self, p, rxs):
1911 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1912 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypt each received packet with `sa` and check checksums, the
# presence of GRE, and the inner IP destination.
1914 def verify_encrypted(self, p, sa, rxs):
1917 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its raw bytes as IP.
1918 if not pkt.haslayer(IP):
1919 pkt = IP(pkt[Raw].load)
1920 self.assert_packet_checksums_valid(pkt)
1921 self.assertTrue(pkt.haslayer(GRE))
# NOTE(review): `e` is presumably the GRE payload of `pkt` - confirm.
1923 self.assertEqual(e[IP].dst, "1.1.1.2")
# On a failed check, log the offending packets for debugging.
1924 except (IndexError, AssertionError):
1925 self.logger.debug(ppp("Unexpected packet:", rx))
1927 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture setup: SAs, GRE tunnel, tunnel protection and a host route.
1933 super(TestIpsecGreIfEspTra, self).setUp()
1935 self.tun_if = self.pg0
1937 p = self.ipv4_params
# Outbound and inbound SAs for the tunnel.
1939 p.tun_sa_out = VppIpsecSA(
1945 p.crypt_algo_vpp_id,
1947 self.vpp_esp_protocol,
1949 p.tun_sa_out.add_vpp_config()
1951 p.tun_sa_in = VppIpsecSA(
1957 p.crypt_algo_vpp_id,
1959 self.vpp_esp_protocol,
1961 p.tun_sa_in.add_vpp_config()
# GRE tunnel from pg0's local address to its remote address.
1963 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1964 p.tun_if.add_vpp_config()
# Protect the GRE tunnel with the SA pair created above.
1966 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1967 p.tun_protect.add_vpp_config()
1970 p.tun_if.config_ip4()
# Build the matching scapy-side SAs via the transport-params helper.
1971 config_tra_params(p, self.encryption_type, p.tun_if)
# Host route for 1.1.1.2/32 via the tunnel's remote address.
1974 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
# Fixture teardown: unconfigure the tunnel address, then the base teardown.
1978 p = self.ipv4_params
1979 p.tun_if.unconfig_ip4()
1980 super(TestIpsecGreIfEspTra, self).tearDown()
# Send packets without GRE inside the ESP payload and expect them to be
# dropped, with the unsup_payload error counted once on the decrypt node
# and once on the inbound SA.
1982 def test_gre_non_ip(self):
1983 p = self.ipv4_params
1984 tx = self.gen_encrypt_non_ip_pkts(
1987 src=p.remote_tun_if_host,
# NOTE(review): an IPv6 address is passed as `dst` in this IPv4 test; the
# visible body of gen_encrypt_non_ip_pkts does not use `dst` - confirm.
1988 dst=self.pg1.remote_ip6,
1990 self.send_and_assert_no_replies(self.tun_if, tx)
1991 node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
1992 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1993 err = p.tun_sa_in.get_err("unsup_payload")
1994 self.assertEqual(err, 1)
# IPv6 variant: ESP-protected L3 GRE tunnel between pg0's local and
# remote IPv6 addresses; scapy SAs are built with config_tra_params.
1997 class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
1998 """Ipsec GRE ESP - TRA tests"""
# Graph node names and ESP protocol selection consumed by the shared
# IpsecTun6Tests machinery (presumably for counter checks - confirm).
2000 tun6_encrypt_node_name = "esp6-encrypt-tun"
2001 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2002 encryption_type = ESP
# Build `count` frames for the tunnel-facing side: outer IPv6 between
# pg0's remote and local addresses, inner IPv6 from pg1's local to remote.
2004 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2006 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2008 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2010 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2011 / UDP(sport=1144, dport=2233)
2012 / Raw(b"X" * payload_size)
2014 for i in range(count)
# Build `count` plain-text IPv6 packets from 1::1 to 1::2.
2017 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2019 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2020 / IPv6(src="1::1", dst="1::2")
2021 / UDP(sport=1144, dport=2233)
2022 / Raw(b"X" * payload_size)
2023 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv6 address.
2026 def verify_decrypted6(self, p, rxs):
2028 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2029 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
# Decrypt each received packet with `sa` and check checksums, the
# presence of GRE, and the inner IPv6 destination.
2031 def verify_encrypted6(self, p, sa, rxs):
2034 pkt = sa.decrypt(rx[IPv6])
# If the decrypted result has no IPv6 layer, re-parse its raw bytes as IPv6.
2035 if not pkt.haslayer(IPv6):
2036 pkt = IPv6(pkt[Raw].load)
2037 self.assert_packet_checksums_valid(pkt)
2038 self.assertTrue(pkt.haslayer(GRE))
# NOTE(review): `e` is presumably the GRE payload of `pkt` - confirm.
2040 self.assertEqual(e[IPv6].dst, "1::2")
# On a failed check, log the offending packets for debugging.
2041 except (IndexError, AssertionError):
2042 self.logger.debug(ppp("Unexpected packet:", rx))
2044 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture setup: SAs, GRE tunnel, tunnel protection and an IPv6 route.
2050 super(TestIpsecGre6IfEspTra, self).setUp()
2052 self.tun_if = self.pg0
2054 p = self.ipv6_params
# NOTE(review): the bridge domain is created but no ports are added to it
# in the visible code - confirm whether it is needed for this L3 test.
2056 bd1 = VppBridgeDomain(self, 1)
2057 bd1.add_vpp_config()
# Outbound and inbound SAs for the tunnel.
2059 p.tun_sa_out = VppIpsecSA(
2065 p.crypt_algo_vpp_id,
2067 self.vpp_esp_protocol,
2069 p.tun_sa_out.add_vpp_config()
2071 p.tun_sa_in = VppIpsecSA(
2077 p.crypt_algo_vpp_id,
2079 self.vpp_esp_protocol,
2081 p.tun_sa_in.add_vpp_config()
# GRE tunnel from pg0's local IPv6 address to its remote IPv6 address.
2083 p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
2084 p.tun_if.add_vpp_config()
# Protect the GRE tunnel with the SA pair created above.
2086 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2087 p.tun_protect.add_vpp_config()
2090 p.tun_if.config_ip6()
# Build the matching scapy-side SAs via the transport-params helper.
2091 config_tra_params(p, self.encryption_type, p.tun_if)
# Route path via the tunnel's remote IPv6 address.
2099 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
# Fixture teardown: unconfigure the tunnel address, then the base teardown.
2106 p = self.ipv6_params
2107 p.tun_if.unconfig_ip6()
2108 super(TestIpsecGre6IfEspTra, self).tearDown()
# Multipoint GRE (mGRE) over IPv4 protected by ESP: one SA pair, one
# tunnel-protect entry, one route and one TEIB entry per next-hop
# (N_NHS next-hops in total).
2111 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
2112 """Ipsec mGRE ESP v4 TRA tests"""
# Graph node names and ESP protocol selection consumed by the shared
# IpsecTun4 machinery (presumably for counter checks - confirm).
2114 tun4_encrypt_node_name = "esp4-encrypt-tun"
2115 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2116 encryption_type = ESP
# Build `count` frames for the tunnel-facing side; the outer source is the
# per-next-hop tunnel destination `p.tun_dst`.
2118 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2120 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2122 IP(src=p.tun_dst, dst=self.pg0.local_ip4)
2124 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
2125 / UDP(sport=1144, dport=2233)
2126 / Raw(b"X" * payload_size)
2128 for i in range(count)
# Build `count` plain-text packets from 1.1.1.1 to the caller-supplied `dst`.
2131 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2133 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2134 / IP(src="1.1.1.1", dst=dst)
2135 / UDP(sport=1144, dport=2233)
2136 / Raw(b"X" * payload_size)
2137 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv4 address.
2140 def verify_decrypted(self, p, rxs):
2142 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2143 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypt each received packet with `sa` and check checksums, the presence
# of GRE, and that the inner destination is this next-hop's host address.
2145 def verify_encrypted(self, p, sa, rxs):
2148 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its raw bytes as IP.
2149 if not pkt.haslayer(IP):
2150 pkt = IP(pkt[Raw].load)
2151 self.assert_packet_checksums_valid(pkt)
2152 self.assertTrue(pkt.haslayer(GRE))
# NOTE(review): `e` is presumably the GRE payload of `pkt` - confirm.
2154 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
# On a failed check, log the offending packets for debugging.
2155 except (IndexError, AssertionError):
2156 self.logger.debug(ppp("Unexpected packet:", rx))
2158 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture setup: one multipoint GRE interface, then per-next-hop state.
2164 super(TestIpsecMGreIfEspTra4, self).setUp()
2167 self.tun_if = self.pg0
2168 p = self.ipv4_params
# GRE interface in multipoint mode.
2169 p.tun_if = VppGreInterface(
2173 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2175 p.tun_if.add_vpp_config()
2177 p.tun_if.config_ip4()
# Generate N_NHS remote hosts on both the tunnel and the underlay (pg0),
# and install IPv4 neighbor entries for pg0's remote hosts.
2178 p.tun_if.generate_remote_hosts(N_NHS)
2179 self.pg0.generate_remote_hosts(N_NHS)
2180 self.pg0.configure_ipv4_neighbors()
2182 # setup some SAs for several next-hops on the interface
2183 self.multi_params = []
2185 for ii in range(N_NHS):
# Shallow copy of the base params; each next-hop gets its own host address
# and SA ids/SPIs offset by the loop index.
2186 p = copy.copy(self.ipv4_params)
2188 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2189 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2190 p.scapy_tun_spi = p.scapy_tun_spi + ii
2191 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2192 p.vpp_tun_spi = p.vpp_tun_spi + ii
2194 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2195 p.scapy_tra_spi = p.scapy_tra_spi + ii
2196 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2197 p.vpp_tra_spi = p.vpp_tra_spi + ii
# Outbound and inbound SAs for this next-hop.
2198 p.tun_sa_out = VppIpsecSA(
2204 p.crypt_algo_vpp_id,
2206 self.vpp_esp_protocol,
2208 p.tun_sa_out.add_vpp_config()
2210 p.tun_sa_in = VppIpsecSA(
2216 p.crypt_algo_vpp_id,
2218 self.vpp_esp_protocol,
2220 p.tun_sa_in.add_vpp_config()
# Tunnel protection keyed on this next-hop's tunnel address.
2222 p.tun_protect = VppIpsecTunProtect(
2227 nh=p.tun_if.remote_hosts[ii].ip4,
2229 p.tun_protect.add_vpp_config()
2230 config_tra_params(p, self.encryption_type, p.tun_if)
2231 self.multi_params.append(p)
# Route to this next-hop's host address via its tunnel address.
2235 p.remote_tun_if_host,
2237 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
2240 # in this v4 variant add the teibs after the protect
2244 p.tun_if.remote_hosts[ii].ip4,
2245 self.pg0.remote_hosts[ii].ip4,
# Underlay address used as the outer source in gen_encrypt_pkts.
2247 p.tun_dst = self.pg0.remote_hosts[ii].ip4
2248 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
# Fixture teardown: unconfigure the tunnel address, then the base teardown.
2251 p = self.ipv4_params
2252 p.tun_if.unconfig_ip4()
2253 super(TestIpsecMGreIfEspTra4, self).tearDown()
# For each next-hop: traffic passes, is dropped once the TEIB entry is
# removed, and passes again after the entry is re-added.
2255 def test_tun_44(self):
2258 for p in self.multi_params:
2259 self.verify_tun_44(p, count=N_PKTS)
2260 p.teib.remove_vpp_config()
2261 self.verify_tun_dropped_44(p, count=N_PKTS)
2262 p.teib.add_vpp_config()
2263 self.verify_tun_44(p, count=N_PKTS)
# Multipoint GRE (mGRE) over IPv6 protected by ESP: one SA pair, one TEIB
# entry, one tunnel-protect entry and one route per next-hop
# (N_NHS next-hops in total).
2266 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
2267 """Ipsec mGRE ESP v6 TRA tests"""
# Graph node names and ESP protocol selection consumed by the shared
# IpsecTun6 machinery (presumably for counter checks - confirm).
2269 tun6_encrypt_node_name = "esp6-encrypt-tun"
2270 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2271 encryption_type = ESP
# Build `count` frames for the tunnel-facing side; the outer source is the
# per-next-hop tunnel destination `p.tun_dst`.
2273 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2275 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2277 IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
2279 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2280 / UDP(sport=1144, dport=2233)
2281 / Raw(b"X" * payload_size)
2283 for i in range(count)
# Build `count` plain-text IPv6 packets from 1::1 to the supplied `dst`.
2286 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2288 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2289 / IPv6(src="1::1", dst=dst)
2290 / UDP(sport=1144, dport=2233)
2291 / Raw(b"X" * payload_size)
2292 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv6 address.
2295 def verify_decrypted6(self, p, rxs):
2297 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2298 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
# Decrypt each received packet with `sa` and check checksums, the presence
# of GRE, and that the inner destination is this next-hop's host address.
2300 def verify_encrypted6(self, p, sa, rxs):
2303 pkt = sa.decrypt(rx[IPv6])
# If the decrypted result has no IPv6 layer, re-parse its raw bytes as IPv6.
2304 if not pkt.haslayer(IPv6):
2305 pkt = IPv6(pkt[Raw].load)
2306 self.assert_packet_checksums_valid(pkt)
2307 self.assertTrue(pkt.haslayer(GRE))
# NOTE(review): `e` is presumably the GRE payload of `pkt` - confirm.
2309 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
# On a failed check, log the offending packets for debugging.
2310 except (IndexError, AssertionError):
2311 self.logger.debug(ppp("Unexpected packet:", rx))
2313 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture setup: one multipoint GRE interface, then per-next-hop state.
2319 super(TestIpsecMGreIfEspTra6, self).setUp()
# Raise the ipsec log class to debug level for this test.
2321 self.vapi.cli("set logging class ipsec level debug")
2324 self.tun_if = self.pg0
2325 p = self.ipv6_params
# GRE interface in multipoint mode.
2326 p.tun_if = VppGreInterface(
2330 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2332 p.tun_if.add_vpp_config()
2334 p.tun_if.config_ip6()
# Generate N_NHS remote hosts on both the tunnel and the underlay (pg0),
# and install IPv6 neighbor entries for pg0's remote hosts.
2335 p.tun_if.generate_remote_hosts(N_NHS)
2336 self.pg0.generate_remote_hosts(N_NHS)
2337 self.pg0.configure_ipv6_neighbors()
2339 # setup some SAs for several next-hops on the interface
2340 self.multi_params = []
2342 for ii in range(N_NHS):
# Shallow copy of the base params; each next-hop gets its own host address
# and SA ids/SPIs offset by the loop index.
2343 p = copy.copy(self.ipv6_params)
2345 p.remote_tun_if_host = "1::%d" % (ii + 1)
2346 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2347 p.scapy_tun_spi = p.scapy_tun_spi + ii
2348 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2349 p.vpp_tun_spi = p.vpp_tun_spi + ii
2351 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2352 p.scapy_tra_spi = p.scapy_tra_spi + ii
2353 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2354 p.vpp_tra_spi = p.vpp_tra_spi + ii
# Outbound and inbound SAs for this next-hop.
2355 p.tun_sa_out = VppIpsecSA(
2361 p.crypt_algo_vpp_id,
2363 self.vpp_esp_protocol,
2365 p.tun_sa_out.add_vpp_config()
2367 p.tun_sa_in = VppIpsecSA(
2373 p.crypt_algo_vpp_id,
2375 self.vpp_esp_protocol,
2377 p.tun_sa_in.add_vpp_config()
2379 # in this v6 variant add the teibs first then the protection
2380 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2382 self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
# Tunnel protection keyed on this next-hop's tunnel address.
2385 p.tun_protect = VppIpsecTunProtect(
2390 nh=p.tun_if.remote_hosts[ii].ip6,
2392 p.tun_protect.add_vpp_config()
2393 config_tra_params(p, self.encryption_type, p.tun_if)
2394 self.multi_params.append(p)
# Route to this next-hop's host address via its tunnel address.
2398 p.remote_tun_if_host,
2400 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
# NOTE(review): same expression was already assigned to p.tun_dst earlier
# in this iteration; looks redundant - confirm.
2402 p.tun_dst = self.pg0.remote_hosts[ii].ip6
# Dump debugging state to the test log.
2404 self.logger.info(self.vapi.cli("sh log"))
2405 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
# NOTE(review): hard-coded adjacency index 41 looks like leftover
# debugging output - confirm it is still meaningful.
2406 self.logger.info(self.vapi.cli("sh adj 41"))
# Fixture teardown: unconfigure the tunnel address, then the base teardown.
2409 p = self.ipv6_params
2410 p.tun_if.unconfig_ip6()
2411 super(TestIpsecMGreIfEspTra6, self).tearDown()
# Verify 6o6 traffic (63 packets) for every configured next-hop.
2413 def test_tun_66(self):
2415 for p in self.multi_params:
2416 self.verify_tun_66(p, count=63)
# IPv4 tunnel-protect test using SAs configured with config_sa_tra,
# including a rekey step that swaps in a new SA pair.
2419 @tag_fixme_vpp_workers
2420 class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2421 """IPsec IPv4 Tunnel protect - transport mode"""
# Fixture setup: base setup, then use pg0 as the tunnel-facing interface.
2424 super(TestIpsec4TunProtect, self).setUp()
2426 self.tun_if = self.pg0
# Fixture teardown delegates to the base class.
2429 super(TestIpsec4TunProtect, self).tearDown()
# Configure network, SAs and protection; verify traffic and interface
# counters; rekey; verify again; then unconfigure.
2431 def test_tun_44(self):
2432 """IPSEC tunnel protect"""
2434 p = self.ipv4_params
2436 self.config_network(p)
2437 self.config_sa_tra(p)
2438 self.config_protect(p)
# First batch: 127 packets, interface counters must match.
2440 self.verify_tun_44(p, count=127)
2441 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2442 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# Reset the SA state in VPP before the second batch.
2444 self.vapi.cli("clear ipsec sa")
# NOTE(review): this batch uses verify_tun_64 rather than verify_tun_44;
# the interface counters are cumulative (254 = 2 * 127).
2445 self.verify_tun_64(p, count=127)
2446 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2447 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2449 # rekey - create new SAs and update the tunnel protection
# `np` holds the new parameters: first key byte changed, SPIs moved by
# 100 and SA ids by 1 relative to `p`.
2451 np.crypt_key = b"X" + p.crypt_key[1:]
2452 np.scapy_tun_spi += 100
2453 np.scapy_tun_sa_id += 1
2454 np.vpp_tun_spi += 100
2455 np.vpp_tun_sa_id += 1
2456 np.tun_if.local_spi = p.vpp_tun_spi
2457 np.tun_if.remote_spi = p.scapy_tun_spi
2459 self.config_sa_tra(np)
2460 self.config_protect(np)
# Traffic with the new SAs; counters are cumulative (381 = 3 * 127).
2463 self.verify_tun_44(np, count=127)
2464 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2465 self.assertEqual(p.tun_if.get_tx_stats(), 381)
# Clean up the rekeyed protection/SAs and the network config.
2468 self.unconfig_protect(np)
2469 self.unconfig_sa(np)
2470 self.unconfig_network(p)
# IPv4 tunnel-protect test with UDP encapsulation on the default
# NAT-T port (4500 for both source and destination).
2473 @tag_fixme_vpp_workers
2474 class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2475 """IPsec IPv4 Tunnel protect - transport mode"""
# Fixture setup: enable UDP encap on the params, then configure network,
# SAs and protection once for all tests in this class.
2478 super(TestIpsec4TunProtectUdp, self).setUp()
2480 self.tun_if = self.pg0
2482 p = self.ipv4_params
2483 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
2484 p.nat_header = UDP(sport=4500, dport=4500)
2485 self.config_network(p)
2486 self.config_sa_tra(p)
2487 self.config_protect(p)
# Fixture teardown: undo protection and network config, then base teardown.
2490 p = self.ipv4_params
2491 self.unconfig_protect(p)
2493 self.unconfig_network(p)
2494 super(TestIpsec4TunProtectUdp, self).tearDown()
# Check the UDP ports on the received packets, then delegate the rest of
# the verification to the parent implementation.
2496 def verify_encrypted(self, p, sa, rxs):
2497 # ensure encrypted packets are received with the default UDP ports
2499 self.assertEqual(rx[UDP].sport, 4500)
2500 self.assertEqual(rx[UDP].dport, 4500)
2501 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
# Pass 127 packets through the tunnel and check the interface counters.
2503 def test_tun_44(self):
2504 """IPSEC UDP tunnel protect"""
2506 p = self.ipv4_params
2508 self.verify_tun_44(p, count=127)
2509 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2510 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# Run the shared keepalive verification against the IPv4 params.
2512 def test_keepalive(self):
2513 """IPSEC NAT Keepalive"""
2514 self.verify_keepalive(self.ipv4_params)
# IPv4 tunnel-protect test using SAs configured with config_sa_tun,
# with ACL output features applied and a rekey step.
2517 @tag_fixme_vpp_workers
2518 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2519 """IPsec IPv4 Tunnel protect - tunnel mode"""
# ESP protocol selection and graph node names consumed by the shared
# IpsecTun4 machinery (presumably for counter checks - confirm).
2521 encryption_type = ESP
2522 tun4_encrypt_node_name = "esp4-encrypt-tun"
2523 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
# Fixture setup: base setup, then use pg0 as the tunnel-facing interface.
2526 super(TestIpsec4TunProtectTun, self).setUp()
2528 self.tun_if = self.pg0
# Fixture teardown delegates to the base class.
2531 super(TestIpsec4TunProtectTun, self).tearDown()
# Build `count` frames for the tunnel-facing side: two stacked IP headers,
# the first between sw_intf's remote and local addresses, the second
# carrying the caller's `src`/`dst`.
2533 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2535 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2537 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2538 / IP(src=src, dst=dst)
2539 / UDP(sport=1144, dport=2233)
2540 / Raw(b"X" * payload_size)
2542 for i in range(count)
# Build `count` plain-text packets from `src` to `dst`.
2545 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2547 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2548 / IP(src=src, dst=dst)
2549 / UDP(sport=1144, dport=2233)
2550 / Raw(b"X" * payload_size)
2551 for i in range(count)
# Decrypted packets must go to pg1's remote address, come from the remote
# tunnel host, and have valid checksums.
2554 def verify_decrypted(self, p, rxs):
2556 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2557 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2558 self.assert_packet_checksums_valid(rx)
# Decrypt each received packet with `sa`, check the outer addresses, then
# check the destination of the second nested IP header inside the payload.
2560 def verify_encrypted(self, p, sa, rxs):
2563 pkt = sa.decrypt(rx[IP])
# If the decrypted result has no IP layer, re-parse its raw bytes as IP.
2564 if not pkt.haslayer(IP):
2565 pkt = IP(pkt[Raw].load)
2566 self.assert_packet_checksums_valid(pkt)
2567 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2568 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2569 inner = pkt[IP].payload
2570 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
# On a failed check, log the offending packets for debugging.
2572 except (IndexError, AssertionError):
2573 self.logger.debug(ppp("Unexpected packet:", rx))
2575 self.logger.debug(ppp("Decrypted packet:", pkt))
# Configure network, SAs and protection; add ACLs; verify traffic and
# interface counters; rekey; verify again; then unconfigure.
2580 def test_tun_44(self):
2581 """IPSEC tunnel protect"""
2583 p = self.ipv4_params
2585 self.config_network(p)
2586 self.config_sa_tun(p)
2587 self.config_protect(p)
2589 # also add output features on the tunnel and physical interface
2590 # so we test they still work
# A single permit-all rule (any source, any destination, proto 0).
2591 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2592 a = VppAcl(self, [r_all]).add_vpp_config()
2594 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2595 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
# First batch: 127 packets, interface counters must match.
2597 self.verify_tun_44(p, count=127)
2599 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2600 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2602 # rekey - create new SAs and update the tunnel protection
# `np` holds the new parameters: first key byte changed, SPIs moved by
# 100 and SA ids by 1 relative to `p`.
2604 np.crypt_key = b"X" + p.crypt_key[1:]
2605 np.scapy_tun_spi += 100
2606 np.scapy_tun_sa_id += 1
2607 np.vpp_tun_spi += 100
2608 np.vpp_tun_sa_id += 1
2609 np.tun_if.local_spi = p.vpp_tun_spi
2610 np.tun_if.remote_spi = p.scapy_tun_spi
2612 self.config_sa_tun(np)
2613 self.config_protect(np)
# Traffic with the new SAs; counters are cumulative (254 = 2 * 127).
2616 self.verify_tun_44(np, count=127)
2617 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2618 self.assertEqual(p.tun_if.get_tx_stats(), 254)
# Clean up the rekeyed protection/SAs and the network config.
2621 self.unconfig_protect(np)
2622 self.unconfig_sa(np)
2623 self.unconfig_network(p)
# Negative test: packets whose tunnel header carries a bogus destination
# (5.5.5.5) must produce no replies.
2626 class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2627 """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
# ESP protocol selection and graph node names consumed by the shared
# IpsecTun4 machinery (presumably for counter checks - confirm).
2629 encryption_type = ESP
2630 tun4_encrypt_node_name = "esp4-encrypt-tun"
2631 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
# Fixture setup: base setup, then use pg0 as the tunnel-facing interface.
2634 super(TestIpsec4TunProtectTunDrop, self).setUp()
2636 self.tun_if = self.pg0
# Fixture teardown delegates to the base class.
2639 super(TestIpsec4TunProtectTunDrop, self).tearDown()
# Build `count` frames whose first IP header is addressed to 5.5.5.5
# instead of the interface's local address.
2641 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2643 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2645 IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
2646 / IP(src=src, dst=dst)
2647 / UDP(sport=1144, dport=2233)
2648 / Raw(b"X" * payload_size)
2650 for i in range(count)
# Configure the tunnel, send the bogus packets and expect no replies.
2653 def test_tun_drop_44(self):
2654 """IPSEC tunnel protect bogus tunnel header"""
2656 p = self.ipv4_params
2658 self.config_network(p)
2659 self.config_sa_tun(p)
2660 self.config_protect(p)
2662 tx = self.gen_encrypt_pkts(
2666 src=p.remote_tun_if_host,
2667 dst=self.pg1.remote_ip4,
2670 self.send_and_assert_no_replies(self.tun_if, tx)
# Clean up protection and network config.
2673 self.unconfig_protect(p)
2675 self.unconfig_network(p)
# IPv6 tunnel-protect tests using SAs configured with config_sa_tra:
# 6o6 traffic with rekeying and staged SA replacement, plus a 4o6 check.
2678 @tag_fixme_vpp_workers
2679 class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2680 """IPsec IPv6 Tunnel protect - transport mode"""
# ESP protocol selection and graph node names consumed by the shared
# IpsecTun6 machinery (presumably for counter checks - confirm).
2682 encryption_type = ESP
2683 tun6_encrypt_node_name = "esp6-encrypt-tun"
2684 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
# Fixture setup: base setup, then use pg0 as the tunnel-facing interface.
2687 super(TestIpsec6TunProtect, self).setUp()
2689 self.tun_if = self.pg0
# Fixture teardown delegates to the base class.
2692 super(TestIpsec6TunProtect, self).tearDown()
# Configure, verify, rekey, bounce the interface, then replace the SAs in
# three stages while checking traffic at every stage.
2694 def test_tun_66(self):
2695 """IPSEC tunnel protect 6o6"""
2697 p = self.ipv6_params
2699 self.config_network(p)
2700 self.config_sa_tra(p)
2701 self.config_protect(p)
# First batch: 127 packets, interface counters must match.
2703 self.verify_tun_66(p, count=127)
2704 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2705 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2707 # rekey - create new SAs and update the tunnel protection
# `np` holds the new parameters: first key byte changed, SPIs moved by
# 100 and SA ids by 1 relative to `p`.
2709 np.crypt_key = b"X" + p.crypt_key[1:]
2710 np.scapy_tun_spi += 100
2711 np.scapy_tun_sa_id += 1
2712 np.vpp_tun_spi += 100
2713 np.vpp_tun_sa_id += 1
2714 np.tun_if.local_spi = p.vpp_tun_spi
2715 np.tun_if.remote_spi = p.scapy_tun_spi
2717 self.config_sa_tra(np)
2718 self.config_protect(np)
# Traffic with the new SAs; counters are cumulative (254 = 2 * 127).
2721 self.verify_tun_66(np, count=127)
2722 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2723 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2725 # bounce the interface state
# With the tunnel admin-down, packets must be dropped and counted under
# the ipsec6-tun-input "disabled" error.
2726 p.tun_if.admin_down()
2727 self.verify_drop_tun_66(np, count=127)
2728 node = "/err/ipsec6-tun-input/disabled"
2729 self.assertEqual(127, self.statistics.get_err_counter(node))
# NOTE(review): the interface is presumably brought back up before this
# check; that step is not visible here - confirm.
2731 self.verify_tun_66(np, count=127)
2734 # 1) add two input SAs [old, new]
2735 # 2) swap output SA to [new]
2736 # 3) use only [new] input SA
# `np3` holds a third parameter set with a different first key byte and
# the same SPI / SA-id offsets applied again.
2738 np3.crypt_key = b"Z" + p.crypt_key[1:]
2739 np3.scapy_tun_spi += 100
2740 np3.scapy_tun_sa_id += 1
2741 np3.vpp_tun_spi += 100
2742 np3.vpp_tun_sa_id += 1
2743 np3.tun_if.local_spi = p.vpp_tun_spi
2744 np3.tun_if.remote_spi = p.scapy_tun_spi
2746 self.config_sa_tra(np3)
# Stage 1: output SA stays `np`; both `np` and `np3` accepted inbound.
2749 p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2750 self.verify_tun_66(np, np, count=127)
2751 self.verify_tun_66(np3, np, count=127)
# Stage 2: output SA switches to `np3`; both inbound SAs still accepted.
2754 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2755 self.verify_tun_66(np, np3, count=127)
2756 self.verify_tun_66(np3, np3, count=127)
# Stage 3: only `np3` inbound; traffic using the old `np` SA must drop.
2759 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
2760 self.verify_tun_66(np3, np3, count=127)
2761 self.verify_drop_tun_rx_66(np, count=127)
# Cumulative interface counters after all batches above.
2763 self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
2764 self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
2765 self.unconfig_sa(np)
# Clean up the remaining protection/SAs and the network config.
2768 self.unconfig_protect(np3)
2769 self.unconfig_sa(np3)
2770 self.unconfig_network(p)
# IPv4 payload over the IPv6 tunnel: 127 packets, counters must match.
2772 def test_tun_46(self):
2773 """IPSEC tunnel protect 4o6"""
2775 p = self.ipv6_params
2777 self.config_network(p)
2778 self.config_sa_tra(p)
2779 self.config_protect(p)
2781 self.verify_tun_46(p, count=127)
2782 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2783 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# Clean up protection and network config.
2786 self.unconfig_protect(p)
2788 self.unconfig_network(p)
# Test case for IPv6 tunnel protection where the SAs are created with config_sa_tun
# (see test_tun_66 below).
# NOTE(review): the embedded line numbers in this excerpt skip values, so some
# statements (method headers, list brackets, loop/try lines) are not visible here.
2791 @tag_fixme_vpp_workers
2792 class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2793 """IPsec IPv6 Tunnel protect - tunnel mode"""
# ESP is the protocol under test; the strings name the encrypt/decrypt graph nodes
# (presumably consumed by the inherited IpsecTun6 checks - confirm).
2795 encryption_type = ESP
2796 tun6_encrypt_node_name = "esp6-encrypt-tun"
2797 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
# Fixture hooks chain to the parent classes; self.tun_if is pointed at pg0.
2800 super(TestIpsec6TunProtectTun, self).setUp()
2802 self.tun_if = self.pg0
2805 super(TestIpsec6TunProtectTun, self).tearDown()
# Builds `count` frames: Ether (interface remote MAC -> local MAC), an outer IPv6
# (remote_ip6 -> local_ip6) and an inner IPv6/UDP/Raw packet from src to dst.
# NOTE(review): the lines that would pass the packet through `sa` are not visible.
2807 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2809 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2811 IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
2812 / IPv6(src=src, dst=dst)
2813 / UDP(sport=1166, dport=2233)
2814 / Raw(b"X" * payload_size)
2816 for i in range(count)
# Builds `count` plain (unencrypted) Ether/IPv6/UDP/Raw frames from src to dst.
2819 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2821 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2822 / IPv6(src=src, dst=dst)
2823 / UDP(sport=1166, dport=2233)
2824 / Raw(b"X" * payload_size)
2825 for i in range(count)
# Checks a decrypted packet: destination is pg1's remote address, source is the
# host behind the tunnel, and checksums are valid.
2828 def verify_decrypted6(self, p, rxs):
2830 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2831 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2832 self.assert_packet_checksums_valid(rx)
# Decrypts rx[IPv6] with `sa`; if the result has no IPv6 layer the Raw payload is
# re-parsed as IPv6. It then checks the addresses against pg0 and the nested IPv6
# destination against p.remote_tun_if_host.
2834 def verify_encrypted6(self, p, sa, rxs):
2837 pkt = sa.decrypt(rx[IPv6])
2838 if not pkt.haslayer(IPv6):
2839 pkt = IPv6(pkt[Raw].load)
2840 self.assert_packet_checksums_valid(pkt)
2841 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2842 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2843 inner = pkt[IPv6].payload
2844 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
# On a failed lookup or assertion, log the received and the decrypted packet.
2846 except (IndexError, AssertionError):
2847 self.logger.debug(ppp("Unexpected packet:", rx))
2849 self.logger.debug(ppp("Decrypted packet:", pkt))
# Test: protect the tunnel, verify one batch, then re-key and verify another.
2854 def test_tun_66(self):
2855 """IPSEC tunnel protect"""
2857 p = self.ipv6_params
2859 self.config_network(p)
2860 self.config_sa_tun(p)
2861 self.config_protect(p)
2863 self.verify_tun_66(p, count=127)
2865 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2866 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2868 # rekey - create new SAs and update the tunnel protection
# NOTE(review): the definition of `np` (embedded line 2869) is not visible;
# presumably a copy of p - confirm. The new SAs get a different key, SPIs
# shifted by 100 and SA ids shifted by 1.
2870 np.crypt_key = b"X" + p.crypt_key[1:]
2871 np.scapy_tun_spi += 100
2872 np.scapy_tun_sa_id += 1
2873 np.vpp_tun_spi += 100
2874 np.vpp_tun_sa_id += 1
2875 np.tun_if.local_spi = p.vpp_tun_spi
2876 np.tun_if.remote_spi = p.scapy_tun_spi
2878 self.config_sa_tun(np)
2879 self.config_protect(np)
# Counters are read from p.tun_if and are cumulative: 127 + 127 = 254.
2882 self.verify_tun_66(np, count=127)
2883 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2884 self.assertEqual(p.tun_if.get_tx_stats(), 254)
# Teardown of the re-keyed protection/SAs and of the network set up with p.
2887 self.unconfig_protect(np)
2888 self.unconfig_sa(np)
2889 self.unconfig_network(p)
# Negative test case: packets whose outer IPv6 destination is not the tunnel's
# address must produce no replies (see test_tun_drop_66).
# NOTE(review): the embedded line numbers skip values, so some statements are
# not visible in this excerpt.
2892 class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2893 """IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2895 encryption_type = ESP
2896 tun6_encrypt_node_name = "esp6-encrypt-tun"
2897 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
# Fixture hooks chain to the parent classes; self.tun_if is pointed at pg0.
2900 super(TestIpsec6TunProtectTunDrop, self).setUp()
2902 self.tun_if = self.pg0
2905 super(TestIpsec6TunProtectTunDrop, self).tearDown()
# Builds `count` frames whose outer IPv6 destination is the fixed address "5::5"
# instead of the interface's local address.
2907 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2908 # the IP destination of the revealed packet does not match
2909 # that assigned to the tunnel
2911 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2913 IPv6(src=sw_intf.remote_ip6, dst="5::5")
2914 / IPv6(src=src, dst=dst)
2915 / UDP(sport=1144, dport=2233)
2916 / Raw(b"X" * payload_size)
2918 for i in range(count)
# Test: configure the protected tunnel, send the bogus-header packets on
# self.tun_if and assert that nothing comes back.
2921 def test_tun_drop_66(self):
2922 """IPSEC 6 tunnel protect bogus tunnel header"""
2924 p = self.ipv6_params
2926 self.config_network(p)
2927 self.config_sa_tun(p)
2928 self.config_protect(p)
2930 tx = self.gen_encrypt_pkts6(
2934 src=p.remote_tun_if_host,
2935 dst=self.pg1.remote_ip6,
2938 self.send_and_assert_no_replies(self.tun_if, tx)
# Teardown. NOTE(review): embedded line 2941 between these calls is not visible.
2940 self.unconfig_protect(p)
2942 self.unconfig_network(p)
# Mixin providing the configure/unconfigure helpers used by the IPv4 IPsec
# interface tests: SAs, tunnel protection, the ipsec interface and its routes.
# NOTE(review): the embedded line numbers skip values, so several constructor
# arguments and closing brackets are not visible in this excerpt.
2945 class TemplateIpsecItf4(object):
2946 """IPsec Interface IPv4"""
# Protocol under test and the names of the graph nodes involved
# (presumably consumed by the shared IpsecTun4 checks - confirm).
2948 encryption_type = ESP
2949 tun4_encrypt_node_name = "esp4-encrypt-tun"
2950 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2951 tun4_input_node = "ipsec4-tun-input"
# Prepares the scapy-side parameters for endpoints src/dst (no tunnel interface
# object is passed), then creates and installs the outbound and inbound SAs,
# storing them on p as tun_sa_out / tun_sa_in.
2953 def config_sa_tun(self, p, src, dst):
2954 config_tun_params(p, self.encryption_type, None, src, dst)
2956 p.tun_sa_out = VppIpsecSA(
2962 p.crypt_algo_vpp_id,
2964 self.vpp_esp_protocol,
2969 p.tun_sa_out.add_vpp_config()
2971 p.tun_sa_in = VppIpsecSA(
2977 p.crypt_algo_vpp_id,
2979 self.vpp_esp_protocol,
# The inbound SA additionally carries the IS_INBOUND flag.
2983 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
2985 p.tun_sa_in.add_vpp_config()
# Binds the outbound SA and the single inbound SA to p.tun_if.
2987 def config_protect(self, p):
2988 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2989 p.tun_protect.add_vpp_config()
# Creates the ipsec interface (optionally with an explicit instance number;
# the 0xFFFFFFFF default presumably means "no explicit instance" - confirm),
# gives it IPv4 and IPv6 addresses and adds routes to the remote hosts via it.
2991 def config_network(self, p, instance=0xFFFFFFFF):
2992 p.tun_if = VppIpsecInterface(self, instance=instance)
2994 p.tun_if.add_vpp_config()
2996 p.tun_if.config_ip4()
2997 p.tun_if.config_ip6()
2999 p.route = VppIpRoute(
3001 p.remote_tun_if_host,
3003 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3005 p.route.add_vpp_config()
# NOTE(review): the object receiving the IPv6 route below is not visible here.
3008 p.remote_tun_if_host6,
3012 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
# Removes p.route and the ipsec interface.
3018 def unconfig_network(self, p):
3019 p.route.remove_vpp_config()
3020 p.tun_if.remove_vpp_config()
# Removes the tunnel protection created by config_protect.
3022 def unconfig_protect(self, p):
3023 p.tun_protect.remove_vpp_config()
# Removes both SAs created by config_sa_tun.
3025 def unconfig_sa(self, p):
3026 p.tun_sa_out.remove_vpp_config()
3027 p.tun_sa_in.remove_vpp_config()
# IPv4 IPsec interface tests: instance naming, basic traffic and re-key,
# NULL algorithms, and an input policer on the tunnel.
# NOTE(review): the embedded line numbers skip values, so some statements
# (method headers, `n_pkts`/`np` definitions, closing brackets) are not visible.
3030 @tag_fixme_vpp_workers
3031 class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3032 """IPsec Interface IPv4"""
# Fixture hooks chain to the parent classes; self.tun_if is pointed at pg0.
3035 super(TestIpsecItf4, self).setUp()
3037 self.tun_if = self.pg0
3040 super(TestIpsecItf4, self).tearDown()
# Test: an interface created with instance=3 must show up as "ipsec3", while
# querying "ipsec0" over the CLI must fail.
3042 def test_tun_instance_44(self):
3043 p = self.ipv4_params
3044 self.config_network(p, instance=3)
3046 with self.assertRaises(CliFailedCommandError):
3047 self.vapi.cli("show interface ipsec0")
3049 output = self.vapi.cli("show interface ipsec3")
3050 self.assertTrue("unknown" not in output)
3052 self.unconfig_network(p)
# Test: end-to-end traffic over the IPv4 ipsec interface, including admin
# down/up, IPv6 payloads, moving the SA tunnel endpoints to pg2, and a re-key.
3054 def test_tun_44(self):
3055 """IPSEC interface IPv4"""
3058 p = self.ipv4_params
3060 self.config_network(p)
3062 p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
# No SA or protection is configured on VPP yet, so traffic is expected to drop.
3064 self.verify_tun_dropped_44(p, count=n_pkts)
3065 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3066 self.config_protect(p)
3068 self.verify_tun_44(p, count=n_pkts)
3069 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3070 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
# With the interface admin-down the batch must be dropped.
# NOTE(review): the call bringing the interface back up (embedded line 3074)
# is not visible here.
3072 p.tun_if.admin_down()
3073 self.verify_tun_dropped_44(p, count=n_pkts)
3075 self.verify_tun_44(p, count=n_pkts)
# The assertions expect rx to have advanced by one batch more than tx
# (presumably the dropped batch still counts as received - confirm).
3077 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3078 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3080 # it's a v6 packet when it's encrypted
3081 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
3083 self.verify_tun_64(p, count=n_pkts)
3084 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3085 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
# Restore the IPv4 encrypt node name for the remaining checks.
3087 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
3089 # update the SA tunnel
3091 p, self.encryption_type, None, self.pg2.local_ip4, self.pg2.remote_ip4
# Inbound and outbound SAs get mirrored src/dst addresses on pg2.
3093 p.tun_sa_in.update_vpp_config(
3094 is_tun=True, tun_src=self.pg2.remote_ip4, tun_dst=self.pg2.local_ip4
3096 p.tun_sa_out.update_vpp_config(
3097 is_tun=True, tun_src=self.pg2.local_ip4, tun_dst=self.pg2.remote_ip4
3099 self.verify_tun_44(p, count=n_pkts)
3100 self.assertEqual(p.tun_if.get_rx_stats(), 5 * n_pkts)
3101 self.assertEqual(p.tun_if.get_tx_stats(), 4 * n_pkts)
# Counters are cleared here; the assertions after the re-key expect a single
# batch again rather than a cumulative total.
3103 self.vapi.cli("clear interfaces")
3105 # rekey - create new SAs and update the tunnel protection
# NOTE(review): the definition of `np` (embedded line 3106) is not visible;
# presumably a copy of p - confirm.
3107 np.crypt_key = b"X" + p.crypt_key[1:]
3108 np.scapy_tun_spi += 100
3109 np.scapy_tun_sa_id += 1
3110 np.vpp_tun_spi += 100
3111 np.vpp_tun_sa_id += 1
3112 np.tun_if.local_spi = p.vpp_tun_spi
3113 np.tun_if.remote_spi = p.scapy_tun_spi
3115 self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
3116 self.config_protect(np)
3119 self.verify_tun_44(np, count=n_pkts)
3120 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3121 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
# Teardown of the re-keyed protection/SAs and of the network set up with p.
3124 self.unconfig_protect(np)
3125 self.unconfig_sa(np)
3126 self.unconfig_network(p)
# Test: same traffic check with integrity and crypto algorithms set to NONE/NULL.
3128 def test_tun_44_null(self):
3129 """IPSEC interface IPv4 NULL auth/crypto"""
# A copy is modified so the shared self.ipv4_params keeps its algorithms.
3132 p = copy.copy(self.ipv4_params)
3134 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
3135 p.crypt_algo_vpp_id = (
3136 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
# Matching scapy-side algorithm names.
3138 p.crypt_algo = "NULL"
3139 p.auth_algo = "NULL"
3141 self.config_network(p)
3142 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3143 self.config_protect(p)
3145 self.logger.info(self.vapi.cli("sh ipsec sa"))
3146 self.verify_tun_44(p, count=n_pkts)
# Teardown. NOTE(review): embedded line 3150 between these calls is not visible.
3149 self.unconfig_protect(p)
3151 self.unconfig_network(p)
# Test: attach an RX policer to the tunnel interface, check that its statistics
# move while attached and stay unchanged once detached.
3153 def test_tun_44_police(self):
3154 """IPSEC interface IPv4 with input policer"""
3156 p = self.ipv4_params
3158 self.config_network(p)
3159 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3160 self.config_protect(p)
# Every policer outcome uses the same transmit action, so the traffic
# verification below still expects all packets.
3162 action_tx = PolicerAction(
3163 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3165 policer = VppPolicer(
3172 conform_action=action_tx,
3173 exceed_action=action_tx,
3174 violate_action=action_tx,
3176 policer.add_vpp_config()
3178 # Start policing on tun
3179 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3181 self.verify_tun_44(p, count=n_pkts)
3182 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3183 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3185 stats = policer.get_stats()
3187 # Single rate, 2 colour policer - expect conform, violate but no exceed
3188 self.assertGreater(stats["conform_packets"], 0)
3189 self.assertEqual(stats["exceed_packets"], 0)
3190 self.assertGreater(stats["violate_packets"], 0)
3192 # Stop policing on tun
3193 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3194 self.verify_tun_44(p, count=n_pkts)
3196 # No new policer stats
3197 statsnew = policer.get_stats()
3198 self.assertEqual(stats, statsnew)
# Teardown. NOTE(review): embedded line 3203 between the last two calls is not visible.
3201 policer.remove_vpp_config()
3202 self.unconfig_protect(p)
3204 self.unconfig_network(p)
# MPLS-over-IPv4 variant of the IPsec interface tests: packets carry MPLS label 44.
# NOTE(review): the embedded line numbers skip values, so some statements
# (method headers, loop/try lines, route constructors) are not visible here.
3207 class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3208 """IPsec Interface MPLSoIPv4"""
# Overrides the encrypt node name inherited from TemplateIpsecItf4.
3210 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
# Fixture hooks chain to the parent classes; self.tun_if is pointed at pg0.
3213 super(TestIpsecItf4MPLS, self).setUp()
3215 self.tun_if = self.pg0
3218 super(TestIpsecItf4MPLS, self).tearDown()
# Builds `count` frames whose payload is MPLS(label 44, ttl 3)/IP/UDP/Raw.
# NOTE(review): the lines that would pass the packet through `sa` are not visible.
3220 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3222 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3224 MPLS(label=44, ttl=3)
3225 / IP(src=src, dst=dst)
3226 / UDP(sport=1166, dport=2233)
3227 / Raw(b"X" * payload_size)
3229 for i in range(count)
# Decrypts rx[IP] with `sa` (re-parsing the Raw payload as IP if no IP layer is
# present), then checks checksums, the MPLS label (44) and the IP destination.
3232 def verify_encrypted(self, p, sa, rxs):
3235 pkt = sa.decrypt(rx[IP])
3236 if not pkt.haslayer(IP):
3237 pkt = IP(pkt[Raw].load)
3238 self.assert_packet_checksums_valid(pkt)
3239 self.assert_equal(pkt[MPLS].label, 44)
3240 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
# On a failed lookup or assertion, log the received and the decrypted packet.
3241 except (IndexError, AssertionError):
3242 self.logger.debug(ppp("Unexpected packet:", rx))
3244 self.logger.debug(ppp("Decrypted packet:", pkt))
# Test: create MPLS table 0, set up routes for label 44, enable MPLS on the
# tunnel interface, protect it and verify traffic.
3249 def test_tun_mpls_o_ip4(self):
3250 """IPSEC interface MPLS over IPv4"""
3253 p = self.ipv4_params
3256 tbl = VppMplsTable(self, 0)
3257 tbl.add_vpp_config()
3259 self.config_network(p)
3260 # deag MPLS routes from the tunnel
3262 self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
# Path via the tunnel interface that pushes label 44.
3267 p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
3271 p.tun_if.enable_mpls()
3273 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3274 self.config_protect(p)
3276 self.verify_tun_44(p, count=n_pkts)
# Teardown. NOTE(review): embedded line 3281 between the last two calls is not visible.
3279 p.tun_if.disable_mpls()
3280 self.unconfig_protect(p)
3282 self.unconfig_network(p)
# Mixin providing the configure/unconfigure helpers used by the IPv6 IPsec
# interface tests: SAs, tunnel protection, the ipsec interface and its routes.
# NOTE(review): the embedded line numbers skip values, so several constructor
# arguments, `if` bodies and closing brackets are not visible in this excerpt.
3285 class TemplateIpsecItf6(object):
3286 """IPsec Interface IPv6"""
# Protocol under test and the names of the graph nodes involved
# (presumably consumed by the shared IpsecTun6 checks - confirm).
3288 encryption_type = ESP
3289 tun6_encrypt_node_name = "esp6-encrypt-tun"
3290 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
3291 tun6_input_node = "ipsec6-tun-input"
# Prepares the scapy-side parameters for endpoints src/dst, then creates and
# installs the outbound and inbound SAs, storing them on p.
3293 def config_sa_tun(self, p, src, dst):
3294 config_tun_params(p, self.encryption_type, None, src, dst)
# tun_flags / hop_limit are optional on p. NOTE(review): the bodies of these
# two checks are not visible; presumably they assign defaults - confirm.
3296 if not hasattr(p, "tun_flags"):
3298 if not hasattr(p, "hop_limit"):
3301 p.tun_sa_out = VppIpsecSA(
3307 p.crypt_algo_vpp_id,
3309 self.vpp_esp_protocol,
# The outbound SA is given the tunnel flags and hop limit taken from p.
3313 tun_flags=p.tun_flags,
3314 hop_limit=p.hop_limit,
3316 p.tun_sa_out.add_vpp_config()
3318 p.tun_sa_in = VppIpsecSA(
3324 p.crypt_algo_vpp_id,
3326 self.vpp_esp_protocol,
3331 p.tun_sa_in.add_vpp_config()
# Binds the outbound SA and the single inbound SA to p.tun_if.
3333 def config_protect(self, p):
3334 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
3335 p.tun_protect.add_vpp_config()
# Creates the ipsec interface, gives it IPv4 and IPv6 addresses and adds routes
# to the remote hosts via it; the IPv6 route is kept on p.route.
3337 def config_network(self, p):
3338 p.tun_if = VppIpsecInterface(self)
3340 p.tun_if.add_vpp_config()
3342 p.tun_if.config_ip4()
3343 p.tun_if.config_ip6()
# NOTE(review): the object receiving the IPv4 route below is not visible here.
3347 p.remote_tun_if_host4,
3349 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3353 p.route = VppIpRoute(
3355 p.remote_tun_if_host,
3359 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3363 p.route.add_vpp_config()
# Removes p.route and the ipsec interface.
3365 def unconfig_network(self, p):
3366 p.route.remove_vpp_config()
3367 p.tun_if.remove_vpp_config()
# Removes the tunnel protection created by config_protect.
3369 def unconfig_protect(self, p):
3370 p.tun_protect.remove_vpp_config()
# Removes both SAs created by config_sa_tun.
3372 def unconfig_sa(self, p):
3373 p.tun_sa_out.remove_vpp_config()
3374 p.tun_sa_in.remove_vpp_config()
# IPv6 IPsec interface tests: basic traffic with hop-limit / flow-label copy
# flags and a re-key, plus an input policer on the tunnel.
# NOTE(review): the embedded line numbers skip values, so some statements
# (method headers, `n_pkts`/`np` definitions, closing brackets) are not visible.
3377 @tag_fixme_vpp_workers
3378 class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3379 """IPsec Interface IPv6"""
# Fixture hooks chain to the parent classes; self.tun_if is pointed at pg0.
3382 super(TestIpsecItf6, self).setUp()
3384 self.tun_if = self.pg0
3387 super(TestIpsecItf6, self).tearDown()
# Test: end-to-end traffic over the IPv6 ipsec interface, including admin
# down/up, IPv4 payloads and a re-key with different tunnel flags.
3389 def test_tun_66(self):
3390 """IPSEC interface IPv6"""
3392 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
# First pass uses the ENCAP_COPY_HOP_LIMIT flag with the hop limits and flow
# label below (how the verify helpers use these values is not visible here).
3394 p = self.ipv6_params
3395 p.inner_hop_limit = 24
3396 p.outer_hop_limit = 23
3397 p.outer_flow_label = 243224
3398 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3400 self.config_network(p)
3402 p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
# No SA or protection is configured on VPP yet, so traffic is expected to drop.
3404 self.verify_drop_tun_66(p, count=n_pkts)
3405 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3406 self.config_protect(p)
3408 self.verify_tun_66(p, count=n_pkts)
3409 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3410 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
# With the interface admin-down the batch must be dropped.
# NOTE(review): the call bringing the interface back up (embedded line 3414)
# is not visible here.
3412 p.tun_if.admin_down()
3413 self.verify_drop_tun_66(p, count=n_pkts)
3415 self.verify_tun_66(p, count=n_pkts)
# The assertions expect rx to have advanced by one batch more than tx
# (presumably the dropped batch still counts as received - confirm).
3417 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3418 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3420 # it's a v4 packet when it's encrypted
3421 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
3423 self.verify_tun_46(p, count=n_pkts)
3424 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3425 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
# Restore the IPv6 encrypt node name for the remaining checks.
3427 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
# Counters are cleared here; the assertions after the re-key expect a single
# batch again rather than a cumulative total.
3429 self.vapi.cli("clear interfaces")
3431 # rekey - create new SAs and update the tunnel protection
# NOTE(review): the definition of `np` (embedded line 3432) is not visible;
# presumably a copy of p - confirm.
3433 np.crypt_key = b"X" + p.crypt_key[1:]
3434 np.scapy_tun_spi += 100
3435 np.scapy_tun_sa_id += 1
3436 np.vpp_tun_spi += 100
3437 np.vpp_tun_sa_id += 1
3438 np.tun_if.local_spi = p.vpp_tun_spi
3439 np.tun_if.remote_spi = p.scapy_tun_spi
# The re-keyed pass switches to the ENCAP_COPY_FLOW_LABEL flag and sets the
# inner and outer flow labels to the same value.
3440 np.inner_hop_limit = 24
3441 np.outer_hop_limit = 128
3442 np.inner_flow_label = 0xABCDE
3443 np.outer_flow_label = 0xABCDE
3445 np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
3447 self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
3448 self.config_protect(np)
3451 self.verify_tun_66(np, count=n_pkts)
3452 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3453 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
# Teardown of the re-keyed protection/SAs and of the network set up with p.
3456 self.unconfig_protect(np)
3457 self.unconfig_sa(np)
3458 self.unconfig_network(p)
# Test: attach an RX policer to the tunnel interface, check that its statistics
# move while attached and stay unchanged once detached.
3460 def test_tun_66_police(self):
3461 """IPSEC interface IPv6 with input policer"""
3462 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3464 p = self.ipv6_params
3465 p.inner_hop_limit = 24
3466 p.outer_hop_limit = 23
3467 p.outer_flow_label = 243224
3468 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3470 self.config_network(p)
3471 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3472 self.config_protect(p)
# Every policer outcome uses the same transmit action, so the traffic
# verification below still expects all packets.
3474 action_tx = PolicerAction(
3475 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3477 policer = VppPolicer(
3484 conform_action=action_tx,
3485 exceed_action=action_tx,
3486 violate_action=action_tx,
3488 policer.add_vpp_config()
3490 # Start policing on tun
3491 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3493 self.verify_tun_66(p, count=n_pkts)
3494 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3495 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3497 stats = policer.get_stats()
3499 # Single rate, 2 colour policer - expect conform, violate but no exceed
3500 self.assertGreater(stats["conform_packets"], 0)
3501 self.assertEqual(stats["exceed_packets"], 0)
3502 self.assertGreater(stats["violate_packets"], 0)
3504 # Stop policing on tun
3505 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3506 self.verify_tun_66(p, count=n_pkts)
3508 # No new policer stats
3509 statsnew = policer.get_stats()
3510 self.assertEqual(stats, statsnew)
# Teardown. NOTE(review): embedded line 3515 between the last two calls is not visible.
3513 policer.remove_vpp_config()
3514 self.unconfig_protect(p)
3516 self.unconfig_network(p)
# Point-to-multipoint ESP tests over IPv4: one multipoint ipsec interface with a
# separate SA pair, tunnel protection and route per next-hop (N_NHS of them).
# NOTE(review): the embedded line numbers skip values, so some statements
# (method headers, loop/try lines, constructor arguments) are not visible here.
3519 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3520 """Ipsec P2MP ESP v4 tests"""
3522 tun4_encrypt_node_name = "esp4-encrypt-tun"
3523 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3524 encryption_type = ESP
# Builds `count` frames carrying IP/UDP/Raw between pg1's local and remote
# addresses; in the visible lines the `src`/`dst` arguments are not used.
# NOTE(review): the lines that would pass the packet through `sa` are not visible.
3526 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3528 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3530 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
3531 / UDP(sport=1144, dport=2233)
3532 / Raw(b"X" * payload_size)
3534 for i in range(count)
# Builds `count` plain frames to `dst`; the source address is fixed to
# "1.1.1.1" and the `src` argument is not used in the visible lines.
3537 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
3539 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3540 / IP(src="1.1.1.1", dst=dst)
3541 / UDP(sport=1144, dport=2233)
3542 / Raw(b"X" * payload_size)
3543 for i in range(count)
# Checks that a decrypted packet is addressed to pg1's remote MAC and IP.
3546 def verify_decrypted(self, p, rxs):
3548 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3549 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Checks the outer header (TOS compared against DSCP EF shifted left by 2, TTL
# against p.hop_limit - the values the outbound SA is created with in setUp),
# then decrypts and checks the inner destination.
3551 def verify_encrypted(self, p, sa, rxs):
3555 rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
3557 self.assertEqual(rx[IP].ttl, p.hop_limit)
3558 pkt = sa.decrypt(rx[IP])
3559 if not pkt.haslayer(IP):
3560 pkt = IP(pkt[Raw].load)
3561 self.assert_packet_checksums_valid(pkt)
# NOTE(review): `e` is assigned on embedded line 3562, which is not visible.
3563 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
# On a failed lookup or assertion, log the received and the decrypted packet.
3564 except (IndexError, AssertionError):
3565 self.logger.debug(ppp("Unexpected packet:", rx))
3567 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture setup: chain to the parents, then create the multipoint interface.
3573 super(TestIpsecMIfEsp4, self).setUp()
3576 self.tun_if = self.pg0
3577 p = self.ipv4_params
3578 p.tun_if = VppIpsecInterface(
3579 self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
3581 p.tun_if.add_vpp_config()
# The IPv4 address is configured, removed and configured again
# (presumably to exercise re-configuration on purpose - confirm).
3583 p.tun_if.config_ip4()
3584 p.tun_if.unconfig_ip4()
3585 p.tun_if.config_ip4()
# N_NHS remote hosts are generated on both the tunnel interface and pg0.
3586 p.tun_if.generate_remote_hosts(N_NHS)
3587 self.pg0.generate_remote_hosts(N_NHS)
3588 self.pg0.configure_ipv4_neighbors()
# One ACL with a single rule matching all prefixes is applied to both pg0 and
# the tunnel interface (first positional argument presumably "permit" - confirm).
3590 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
3591 a = VppAcl(self, [r_all]).add_vpp_config()
3593 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3594 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3596 # setup some SAs for several next-hops on the interface
3597 self.multi_params = []
# Each next-hop gets its own copy of the parameters with SA ids and SPIs
# offset by the loop index, a distinct remote host and a distinct hop limit.
3599 for ii in range(N_NHS):
3600 p = copy.copy(self.ipv4_params)
3602 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3603 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3604 p.scapy_tun_spi = p.scapy_tun_spi + ii
3605 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3606 p.vpp_tun_spi = p.vpp_tun_spi + ii
3608 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3609 p.scapy_tra_spi = p.scapy_tra_spi + ii
3610 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3611 p.vpp_tra_spi = p.vpp_tra_spi + ii
3612 p.hop_limit = ii + 10
3613 p.tun_sa_out = VppIpsecSA(
3619 p.crypt_algo_vpp_id,
3621 self.vpp_esp_protocol,
3623 self.pg0.remote_hosts[ii].ip4,
3624 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3625 hop_limit=p.hop_limit,
3627 p.tun_sa_out.add_vpp_config()
3629 p.tun_sa_in = VppIpsecSA(
3635 p.crypt_algo_vpp_id,
3637 self.vpp_esp_protocol,
3638 self.pg0.remote_hosts[ii].ip4,
3640 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3641 hop_limit=p.hop_limit,
3643 p.tun_sa_in.add_vpp_config()
# The protection is keyed on the next-hop address on the tunnel interface.
3645 p.tun_protect = VppIpsecTunProtect(
3650 nh=p.tun_if.remote_hosts[ii].ip4,
3652 p.tun_protect.add_vpp_config()
3655 self.encryption_type,
3658 self.pg0.remote_hosts[ii].ip4,
3660 self.multi_params.append(p)
# Route to this next-hop's remote host through the tunnel interface.
3662 p.via_tun_route = VppIpRoute(
3664 p.remote_tun_if_host,
3666 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
3669 p.tun_dst = self.pg0.remote_hosts[ii].ip4
# Fixture teardown: remove the tunnel's IPv4 address, then chain to the parents.
3672 p = self.ipv4_params
3673 p.tun_if.unconfig_ip4()
3674 super(TestIpsecMIfEsp4, self).tearDown()
# Test: all next-hops pass traffic; removing one protection (and then its route)
# drops only that next-hop's traffic; re-adding both restores it.
3676 def test_tun_44(self):
3679 for p in self.multi_params:
3680 self.verify_tun_44(p, count=N_PKTS)
3682 # remove one tunnel protect, the rest should still work
3683 self.multi_params[0].tun_protect.remove_vpp_config()
3684 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3685 self.multi_params[0].via_tun_route.remove_vpp_config()
3686 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3688 for p in self.multi_params[1:]:
3689 self.verify_tun_44(p, count=N_PKTS)
3691 self.multi_params[0].tun_protect.add_vpp_config()
3692 self.multi_params[0].via_tun_route.add_vpp_config()
3694 for p in self.multi_params:
3695 self.verify_tun_44(p, count=N_PKTS)
# MPLS-over-IPv6 variant of the IPsec interface tests: packets carry MPLS label 66.
# NOTE(review): the embedded line numbers skip values, so some statements
# (method headers, loop/try lines, route constructors) are not visible here.
3698 class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3699 """IPsec Interface MPLSoIPv6"""
# Overrides the encrypt node name inherited from TemplateIpsecItf6.
3701 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
# Fixture hooks chain to the parent classes; self.tun_if is pointed at pg0.
3704 super(TestIpsecItf6MPLS, self).setUp()
3706 self.tun_if = self.pg0
3709 super(TestIpsecItf6MPLS, self).tearDown()
# Builds `count` frames whose payload is MPLS(label 66, ttl 3)/IPv6/UDP/Raw.
# NOTE(review): the lines that would pass the packet through `sa` are not visible.
3711 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3713 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3715 MPLS(label=66, ttl=3)
3716 / IPv6(src=src, dst=dst)
3717 / UDP(sport=1166, dport=2233)
3718 / Raw(b"X" * payload_size)
3720 for i in range(count)
# Decrypts rx[IPv6] with `sa`, then checks checksums, the MPLS label (66) and
# the IPv6 destination.
3723 def verify_encrypted6(self, p, sa, rxs):
3726 pkt = sa.decrypt(rx[IPv6])
3727 if not pkt.haslayer(IPv6):
# NOTE(review): this fallback re-parses the payload as IP (v4) although the
# guard and the checks below use IPv6 - looks like it should be IPv6(...); confirm.
3728 pkt = IP(pkt[Raw].load)
3729 self.assert_packet_checksums_valid(pkt)
3730 self.assert_equal(pkt[MPLS].label, 66)
3731 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
# On a failed lookup or assertion, log the received and the decrypted packet.
3732 except (IndexError, AssertionError):
3733 self.logger.debug(ppp("Unexpected packet:", rx))
3735 self.logger.debug(ppp("Decrypted packet:", pkt))
# Test: create MPLS table 0, set up routes for label 66, enable MPLS on the
# tunnel interface, protect it and verify traffic.
3740 def test_tun_mpls_o_ip6(self):
3741 """IPSEC interface MPLS over IPv6"""
3744 p = self.ipv6_params
3747 tbl = VppMplsTable(self, 0)
3748 tbl.add_vpp_config()
3750 self.config_network(p)
3751 # deag MPLS routes from the tunnel
# NOTE(review): `f` is defined on a line that is not visible in this excerpt.
3756 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3757 eos_proto=f.FIB_PATH_NH_PROTO_IP6,
# Path via the tunnel interface that pushes label 66.
3762 p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
3766 p.tun_if.enable_mpls()
3768 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3769 self.config_protect(p)
3771 self.verify_tun_66(p, count=n_pkts)
# Teardown. NOTE(review): embedded line 3776 between the last two calls is not visible.
3774 p.tun_if.disable_mpls()
3775 self.unconfig_protect(p)
3777 self.unconfig_network(p)
# When executed as a script, run this module's tests with the VPP test runner.
3780 if __name__ == "__main__":
3781 unittest.main(testRunner=VppTestRunner)