5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP, ICMP
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
10 from scapy.contrib.mpls import MPLS
11 from asfframework import VppTestRunner, tag_fixme_vpp_workers
12 from template_ipsec import (
20 IpsecTun6HandoffTests,
21 IpsecTun4HandoffTests,
24 from vpp_gre_interface import VppGreInterface
25 from vpp_ipip_tun_interface import VppIpIpTunInterface
26 from vpp_ip_route import (
35 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
36 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
37 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
38 from vpp_teib import VppTeib
40 from vpp_papi import VppEnum
41 from vpp_papi_provider import CliFailedCommandError
42 from vpp_acl import AclRule, VppAcl, VppAclInterface
43 from vpp_policer import PolicerAction, VppPolicer, Dir
46 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
47 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
49 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
51 crypt_key = mk_scapy_crypt_key(p)
53 p.tun_dst = tun_if.remote_ip
54 p.tun_src = tun_if.local_ip
60 is_default_port = p.nat_header.dport == 4500
62 is_default_port = True
65 outbound_nat_header = p.nat_header
67 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
68 bind_layers(UDP, ESP, dport=p.nat_header.dport)
70 p.scapy_tun_sa = SecurityAssociation(
73 crypt_algo=p.crypt_algo,
75 auth_algo=p.auth_algo,
77 tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
78 nat_t_header=outbound_nat_header,
81 p.vpp_tun_sa = SecurityAssociation(
84 crypt_algo=p.crypt_algo,
86 auth_algo=p.auth_algo,
88 tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
89 nat_t_header=p.nat_header,
94 def config_tra_params(p, encryption_type, tun_if):
95 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
97 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
99 crypt_key = mk_scapy_crypt_key(p)
100 p.tun_dst = tun_if.remote_ip
101 p.tun_src = tun_if.local_ip
104 is_default_port = p.nat_header.dport == 4500
106 is_default_port = True
109 outbound_nat_header = p.nat_header
111 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
112 bind_layers(UDP, ESP, dport=p.nat_header.dport)
114 p.scapy_tun_sa = SecurityAssociation(
117 crypt_algo=p.crypt_algo,
119 auth_algo=p.auth_algo,
122 nat_t_header=outbound_nat_header,
124 p.vpp_tun_sa = SecurityAssociation(
127 crypt_algo=p.crypt_algo,
129 auth_algo=p.auth_algo,
132 nat_t_header=p.nat_header,
136 class TemplateIpsec4TunProtect(object):
137 """IPsec IPv4 Tunnel protect"""
139 encryption_type = ESP
140 tun4_encrypt_node_name = "esp4-encrypt-tun"
141 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
142 tun4_input_node = "ipsec4-tun-input"
144 def config_sa_tra(self, p):
145 config_tun_params(p, self.encryption_type, p.tun_if)
147 p.tun_sa_out = VppIpsecSA(
155 self.vpp_esp_protocol,
158 p.tun_sa_out.add_vpp_config()
160 p.tun_sa_in = VppIpsecSA(
168 self.vpp_esp_protocol,
171 p.tun_sa_in.add_vpp_config()
173 def config_sa_tun(self, p):
174 config_tun_params(p, self.encryption_type, p.tun_if)
176 p.tun_sa_out = VppIpsecSA(
184 self.vpp_esp_protocol,
185 self.tun_if.local_addr[p.addr_type],
186 self.tun_if.remote_addr[p.addr_type],
189 p.tun_sa_out.add_vpp_config()
191 p.tun_sa_in = VppIpsecSA(
199 self.vpp_esp_protocol,
200 self.tun_if.remote_addr[p.addr_type],
201 self.tun_if.local_addr[p.addr_type],
204 p.tun_sa_in.add_vpp_config()
206 def config_protect(self, p):
207 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
208 p.tun_protect.add_vpp_config()
210 def config_network(self, p):
211 if hasattr(p, "tun_dst"):
214 tun_dst = self.pg0.remote_ip4
215 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
216 p.tun_if.add_vpp_config()
218 p.tun_if.config_ip4()
219 p.tun_if.config_ip6()
221 p.route = VppIpRoute(
223 p.remote_tun_if_host,
225 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
227 p.route.add_vpp_config()
230 p.remote_tun_if_host6,
234 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
240 def unconfig_network(self, p):
241 p.route.remove_vpp_config()
242 p.tun_if.remove_vpp_config()
244 def unconfig_protect(self, p):
245 p.tun_protect.remove_vpp_config()
247 def unconfig_sa(self, p):
248 p.tun_sa_out.remove_vpp_config()
249 p.tun_sa_in.remove_vpp_config()
252 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
253 """IPsec tunnel interface tests"""
255 encryption_type = ESP
259 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
262 def tearDownClass(cls):
263 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
266 super(TemplateIpsec4TunIfEsp, self).setUp()
268 self.tun_if = self.pg0
272 self.config_network(p)
273 self.config_sa_tra(p)
274 self.config_protect(p)
277 super(TemplateIpsec4TunIfEsp, self).tearDown()
280 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
281 """IPsec UDP tunnel interface tests"""
283 tun4_encrypt_node_name = "esp4-encrypt-tun"
284 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
285 encryption_type = ESP
289 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
292 def tearDownClass(cls):
293 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
295 def verify_encrypted(self, p, sa, rxs):
298 # ensure the UDP ports are correct before we decrypt
300 self.assertTrue(rx.haslayer(UDP))
301 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
302 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
304 pkt = sa.decrypt(rx[IP])
305 if not pkt.haslayer(IP):
306 pkt = IP(pkt[Raw].load)
308 self.assert_packet_checksums_valid(pkt)
309 self.assert_equal(pkt[IP].dst, "1.1.1.1")
310 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
311 except (IndexError, AssertionError):
312 self.logger.debug(ppp("Unexpected packet:", rx))
314 self.logger.debug(ppp("Decrypted packet:", pkt))
319 def config_sa_tra(self, p):
320 config_tun_params(p, self.encryption_type, p.tun_if)
322 p.tun_sa_out = VppIpsecSA(
330 self.vpp_esp_protocol,
332 udp_src=p.nat_header.sport,
333 udp_dst=p.nat_header.dport,
335 p.tun_sa_out.add_vpp_config()
337 p.tun_sa_in = VppIpsecSA(
345 self.vpp_esp_protocol,
347 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
348 udp_src=p.nat_header.sport,
349 udp_dst=p.nat_header.dport,
351 p.tun_sa_in.add_vpp_config()
354 super(TemplateIpsec4TunIfEspUdp, self).setUp()
357 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
358 p.nat_header = UDP(sport=5454, dport=4500)
360 self.tun_if = self.pg0
362 self.config_network(p)
363 self.config_sa_tra(p)
364 self.config_protect(p)
367 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
370 class TemplateIpsec4TunTfc:
371 """IPsec IPv4 tunnel with TFC"""
373 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
375 IP(src=src, dst=dst, len=28 + payload_size)
377 / Raw(b"X" * payload_size)
378 / Padding(b"Y" * 100)
381 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / sa.encrypt(pkt)
382 for i in range(count)
385 def verify_decrypted(self, p, rxs):
387 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
388 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
389 self.assert_equal(rx[IP].len, len(rx[IP]))
390 self.assert_packet_checksums_valid(rx)
393 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
394 """Ipsec ESP - TUN tests"""
396 tun4_encrypt_node_name = "esp4-encrypt-tun"
397 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
399 def test_tun_basic64(self):
400 """ipsec 6o4 tunnel basic test"""
401 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
403 self.verify_tun_64(self.params[socket.AF_INET], count=1)
405 def test_tun_burst64(self):
406 """ipsec 6o4 tunnel basic test"""
407 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
409 self.verify_tun_64(self.params[socket.AF_INET], count=257)
411 def test_tun_basic_frag44(self):
412 """ipsec 4o4 tunnel frag basic test"""
413 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
417 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
419 self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
421 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
424 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
425 """Ipsec ESP UDP tests"""
427 tun4_input_node = "ipsec4-tun-input"
430 super(TestIpsec4TunIfEspUdp, self).setUp()
432 def test_keepalive(self):
433 """IPSEC NAT Keepalive"""
434 self.verify_keepalive(self.ipv4_params)
437 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
438 """Ipsec ESP UDP GCM tests"""
440 tun4_input_node = "ipsec4-tun-input"
443 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
445 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
446 p.crypt_algo_vpp_id = (
447 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
449 p.crypt_algo = "AES-GCM"
451 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
455 class TestIpsec4TunIfEspUdpUpdate(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
456 """Ipsec ESP UDP update tests"""
458 tun4_input_node = "ipsec4-tun-input"
461 super(TestIpsec4TunIfEspUdpUpdate, self).setUp()
463 p.nat_header = UDP(sport=6565, dport=7676)
464 config_tun_params(p, self.encryption_type, p.tun_if)
465 p.tun_sa_in.update_vpp_config(
466 udp_src=p.nat_header.dport, udp_dst=p.nat_header.sport
468 p.tun_sa_out.update_vpp_config(
469 udp_src=p.nat_header.sport, udp_dst=p.nat_header.dport
473 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
474 """Ipsec ESP - TCP tests"""
479 class TemplateIpsec6TunProtect(object):
480 """IPsec IPv6 Tunnel protect"""
482 def config_sa_tra(self, p):
483 config_tun_params(p, self.encryption_type, p.tun_if)
485 p.tun_sa_out = VppIpsecSA(
493 self.vpp_esp_protocol,
495 p.tun_sa_out.add_vpp_config()
497 p.tun_sa_in = VppIpsecSA(
505 self.vpp_esp_protocol,
507 p.tun_sa_in.add_vpp_config()
509 def config_sa_tun(self, p):
510 config_tun_params(p, self.encryption_type, p.tun_if)
512 p.tun_sa_out = VppIpsecSA(
520 self.vpp_esp_protocol,
521 self.tun_if.local_addr[p.addr_type],
522 self.tun_if.remote_addr[p.addr_type],
524 p.tun_sa_out.add_vpp_config()
526 p.tun_sa_in = VppIpsecSA(
534 self.vpp_esp_protocol,
535 self.tun_if.remote_addr[p.addr_type],
536 self.tun_if.local_addr[p.addr_type],
538 p.tun_sa_in.add_vpp_config()
540 def config_protect(self, p):
541 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
542 p.tun_protect.add_vpp_config()
544 def config_network(self, p):
545 if hasattr(p, "tun_dst"):
548 tun_dst = self.pg0.remote_ip6
549 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
550 p.tun_if.add_vpp_config()
552 p.tun_if.config_ip6()
553 p.tun_if.config_ip4()
555 p.route = VppIpRoute(
557 p.remote_tun_if_host,
561 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
565 p.route.add_vpp_config()
568 p.remote_tun_if_host4,
570 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
574 def unconfig_network(self, p):
575 p.route.remove_vpp_config()
576 p.tun_if.remove_vpp_config()
578 def unconfig_protect(self, p):
579 p.tun_protect.remove_vpp_config()
581 def unconfig_sa(self, p):
582 p.tun_sa_out.remove_vpp_config()
583 p.tun_sa_in.remove_vpp_config()
586 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
587 """IPsec tunnel interface tests"""
589 encryption_type = ESP
592 super(TemplateIpsec6TunIfEsp, self).setUp()
594 self.tun_if = self.pg0
597 self.config_network(p)
598 self.config_sa_tra(p)
599 self.config_protect(p)
602 super(TemplateIpsec6TunIfEsp, self).tearDown()
605 class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
606 """IPsec6 UDP tunnel interface tests"""
608 tun4_encrypt_node_name = "esp6-encrypt-tun"
609 tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
610 encryption_type = ESP
614 super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()
617 def tearDownClass(cls):
618 super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()
620 def verify_encrypted(self, p, sa, rxs):
623 # ensure the UDP ports are correct before we decrypt
625 self.assertTrue(rx.haslayer(UDP))
626 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
627 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
629 pkt = sa.decrypt(rx[IP])
630 if not pkt.haslayer(IP):
631 pkt = IP(pkt[Raw].load)
633 self.assert_packet_checksums_valid(pkt)
635 pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
637 self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
638 except (IndexError, AssertionError):
639 self.logger.debug(ppp("Unexpected packet:", rx))
641 self.logger.debug(ppp("Decrypted packet:", pkt))
646 def config_sa_tra(self, p):
647 config_tun_params(p, self.encryption_type, p.tun_if)
649 p.tun_sa_out = VppIpsecSA(
657 self.vpp_esp_protocol,
659 udp_src=p.nat_header.sport,
660 udp_dst=p.nat_header.dport,
662 p.tun_sa_out.add_vpp_config()
664 p.tun_sa_in = VppIpsecSA(
672 self.vpp_esp_protocol,
674 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
675 udp_src=p.nat_header.sport,
676 udp_dst=p.nat_header.dport,
678 p.tun_sa_in.add_vpp_config()
681 super(TemplateIpsec6TunIfEspUdp, self).setUp()
684 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
685 p.nat_header = UDP(sport=5454, dport=4500)
687 self.tun_if = self.pg0
689 self.config_network(p)
690 self.config_sa_tra(p)
691 self.config_protect(p)
694 super(TemplateIpsec6TunIfEspUdp, self).tearDown()
697 class TemplateIpsec6TunTfc:
698 """IPsec IPv6 tunnel with TFC"""
700 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
702 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
704 IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
705 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
706 / Padding(b"Y" * 100)
708 for i in range(count)
711 def verify_decrypted6(self, p, rxs):
713 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
714 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
715 self.assert_equal(rx[IPv6].plen, len(rx[IPv6].payload))
716 self.assert_packet_checksums_valid(rx)
719 class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
720 """Ipsec ESP 6 UDP tests"""
722 tun6_input_node = "ipsec6-tun-input"
723 tun6_encrypt_node_name = "esp6-encrypt-tun"
724 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
727 super(TestIpsec6TunIfEspUdp, self).setUp()
729 def test_keepalive(self):
730 """IPSEC6 NAT Keepalive"""
731 self.verify_keepalive(self.ipv6_params)
734 class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
735 """Ipsec ESP 6 UDP GCM tests"""
737 tun6_input_node = "ipsec6-tun-input"
738 tun6_encrypt_node_name = "esp6-encrypt-tun"
739 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
742 super(TestIpsec6TunIfEspUdpGCM, self).setUp()
744 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
745 p.crypt_algo_vpp_id = (
746 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
748 p.crypt_algo = "AES-GCM"
750 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
754 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
755 """Ipsec ESP - TUN tests"""
757 tun6_encrypt_node_name = "esp6-encrypt-tun"
758 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
760 def test_tun_basic46(self):
761 """ipsec 4o6 tunnel basic test"""
762 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
763 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
765 def test_tun_burst46(self):
766 """ipsec 4o6 tunnel burst test"""
767 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
768 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
771 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
772 """Ipsec ESP 6 Handoff tests"""
774 tun6_encrypt_node_name = "esp6-encrypt-tun"
775 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
777 def test_tun_handoff_66_police(self):
778 """ESP 6o6 tunnel with policer worker hand-off test"""
779 self.vapi.cli("clear errors")
780 self.vapi.cli("clear ipsec sa")
783 p = self.params[socket.AF_INET6]
785 action_tx = PolicerAction(
786 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
788 policer = VppPolicer(
795 conform_action=action_tx,
796 exceed_action=action_tx,
797 violate_action=action_tx,
799 policer.add_vpp_config()
801 # Start policing on tun
802 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
804 for pol_bind in [1, 0]:
805 policer.bind_vpp_config(pol_bind, True)
807 # inject alternately on worker 0 and 1.
808 for worker in [0, 1, 0, 1]:
809 send_pkts = self.gen_encrypt_pkts6(
813 src=p.remote_tun_if_host,
814 dst=self.pg1.remote_ip6,
817 recv_pkts = self.send_and_expect(
818 self.tun_if, send_pkts, self.pg1, worker=worker
820 self.verify_decrypted6(p, recv_pkts)
821 self.logger.debug(self.vapi.cli("show trace max 100"))
823 stats = policer.get_stats()
824 stats0 = policer.get_stats(worker=0)
825 stats1 = policer.get_stats(worker=1)
828 # First pass: Worker 1, should have done all the policing
829 self.assertEqual(stats, stats1)
831 # Worker 0, should have handed everything off
832 self.assertEqual(stats0["conform_packets"], 0)
833 self.assertEqual(stats0["exceed_packets"], 0)
834 self.assertEqual(stats0["violate_packets"], 0)
836 # Second pass: both workers should have policed equal amounts
837 self.assertGreater(stats1["conform_packets"], 0)
838 self.assertEqual(stats1["exceed_packets"], 0)
839 self.assertGreater(stats1["violate_packets"], 0)
841 self.assertGreater(stats0["conform_packets"], 0)
842 self.assertEqual(stats0["exceed_packets"], 0)
843 self.assertGreater(stats0["violate_packets"], 0)
846 stats0["conform_packets"] + stats0["violate_packets"],
847 stats1["conform_packets"] + stats1["violate_packets"],
850 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
851 policer.remove_vpp_config()
854 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
855 """Ipsec ESP 4 Handoff tests"""
857 tun4_encrypt_node_name = "esp4-encrypt-tun"
858 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
860 def test_tun_handoff_44_police(self):
861 """ESP 4o4 tunnel with policer worker hand-off test"""
862 self.vapi.cli("clear errors")
863 self.vapi.cli("clear ipsec sa")
866 p = self.params[socket.AF_INET]
868 action_tx = PolicerAction(
869 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
871 policer = VppPolicer(
878 conform_action=action_tx,
879 exceed_action=action_tx,
880 violate_action=action_tx,
882 policer.add_vpp_config()
884 # Start policing on tun
885 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
887 for pol_bind in [1, 0]:
888 policer.bind_vpp_config(pol_bind, True)
890 # inject alternately on worker 0 and 1.
891 for worker in [0, 1, 0, 1]:
892 send_pkts = self.gen_encrypt_pkts(
896 src=p.remote_tun_if_host,
897 dst=self.pg1.remote_ip4,
900 recv_pkts = self.send_and_expect(
901 self.tun_if, send_pkts, self.pg1, worker=worker
903 self.verify_decrypted(p, recv_pkts)
904 self.logger.debug(self.vapi.cli("show trace max 100"))
906 stats = policer.get_stats()
907 stats0 = policer.get_stats(worker=0)
908 stats1 = policer.get_stats(worker=1)
911 # First pass: Worker 1, should have done all the policing
912 self.assertEqual(stats, stats1)
914 # Worker 0, should have handed everything off
915 self.assertEqual(stats0["conform_packets"], 0)
916 self.assertEqual(stats0["exceed_packets"], 0)
917 self.assertEqual(stats0["violate_packets"], 0)
919 # Second pass: both workers should have policed equal amounts
920 self.assertGreater(stats1["conform_packets"], 0)
921 self.assertEqual(stats1["exceed_packets"], 0)
922 self.assertGreater(stats1["violate_packets"], 0)
924 self.assertGreater(stats0["conform_packets"], 0)
925 self.assertEqual(stats0["exceed_packets"], 0)
926 self.assertGreater(stats0["violate_packets"], 0)
929 stats0["conform_packets"] + stats0["violate_packets"],
930 stats1["conform_packets"] + stats1["violate_packets"],
933 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
934 policer.remove_vpp_config()
937 @tag_fixme_vpp_workers
938 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
939 """IPsec IPv4 Multi Tunnel interface"""
941 encryption_type = ESP
942 tun4_encrypt_node_name = "esp4-encrypt-tun"
943 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
946 super(TestIpsec4MultiTunIfEsp, self).setUp()
948 self.tun_if = self.pg0
950 self.multi_params = []
951 self.pg0.generate_remote_hosts(10)
952 self.pg0.configure_ipv4_neighbors()
955 p = copy.copy(self.ipv4_params)
957 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
958 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
959 p.scapy_tun_spi = p.scapy_tun_spi + ii
960 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
961 p.vpp_tun_spi = p.vpp_tun_spi + ii
963 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
964 p.scapy_tra_spi = p.scapy_tra_spi + ii
965 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
966 p.vpp_tra_spi = p.vpp_tra_spi + ii
967 p.tun_dst = self.pg0.remote_hosts[ii].ip4
969 self.multi_params.append(p)
970 self.config_network(p)
971 self.config_sa_tra(p)
972 self.config_protect(p)
975 super(TestIpsec4MultiTunIfEsp, self).tearDown()
977 def test_tun_44(self):
978 """Multiple IPSEC tunnel interfaces"""
979 for p in self.multi_params:
980 self.verify_tun_44(p, count=127)
981 self.assertEqual(p.tun_if.get_rx_stats(), 127)
982 self.assertEqual(p.tun_if.get_tx_stats(), 127)
984 def test_tun_rr_44(self):
985 """Round-robin packets acrros multiple interface"""
987 for p in self.multi_params:
988 tx = tx + self.gen_encrypt_pkts(
992 src=p.remote_tun_if_host,
993 dst=self.pg1.remote_ip4,
995 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
997 for rx, p in zip(rxs, self.multi_params):
998 self.verify_decrypted(p, [rx])
1001 for p in self.multi_params:
1002 tx = tx + self.gen_pkts(
1003 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
1005 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
1007 for rx, p in zip(rxs, self.multi_params):
1008 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
1011 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1012 """IPsec IPv4 Tunnel interface all Algos"""
1014 encryption_type = ESP
1015 tun4_encrypt_node_name = "esp4-encrypt-tun"
1016 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1019 super(TestIpsec4TunIfEspAll, self).setUp()
1021 self.tun_if = self.pg0
1022 p = self.ipv4_params
1024 self.config_network(p)
1025 self.config_sa_tra(p)
1026 self.config_protect(p)
1029 p = self.ipv4_params
1030 self.unconfig_protect(p)
1031 self.unconfig_network(p)
1034 super(TestIpsec4TunIfEspAll, self).tearDown()
1038 # change the key and the SPI
1041 p.crypt_key = b"X" + p.crypt_key[1:]
1042 p.scapy_tun_spi += 1
1043 p.scapy_tun_sa_id += 1
1045 p.vpp_tun_sa_id += 1
1046 p.tun_if.local_spi = p.vpp_tun_spi
1047 p.tun_if.remote_spi = p.scapy_tun_spi
1049 config_tun_params(p, self.encryption_type, p.tun_if)
1051 p.tun_sa_out = VppIpsecSA(
1057 p.crypt_algo_vpp_id,
1059 self.vpp_esp_protocol,
1063 p.tun_sa_in = VppIpsecSA(
1069 p.crypt_algo_vpp_id,
1071 self.vpp_esp_protocol,
1075 p.tun_sa_in.add_vpp_config()
1076 p.tun_sa_out.add_vpp_config()
1078 self.config_protect(p)
1079 np.tun_sa_out.remove_vpp_config()
1080 np.tun_sa_in.remove_vpp_config()
1081 self.logger.info(self.vapi.cli("sh ipsec sa"))
1083 def test_tun_44(self):
1084 """IPSEC tunnel all algos"""
1086 # foreach VPP crypto engine
1087 engines = ["ia32", "ipsecmb", "openssl"]
1089 # foreach crypto algorithm
1093 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
1096 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1098 "scapy-crypto": "AES-GCM",
1099 "scapy-integ": "NULL",
1100 "key": b"JPjyOWBeVEQiMe7h",
1105 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
1108 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1110 "scapy-crypto": "AES-GCM",
1111 "scapy-integ": "NULL",
1112 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1117 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
1120 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1122 "scapy-crypto": "AES-GCM",
1123 "scapy-integ": "NULL",
1124 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1129 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
1132 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1134 "scapy-crypto": "AES-CBC",
1135 "scapy-integ": "HMAC-SHA1-96",
1137 "key": b"JPjyOWBeVEQiMe7h",
1141 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
1144 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
1146 "scapy-crypto": "AES-CBC",
1147 "scapy-integ": "SHA2-512-256",
1149 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1153 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
1156 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
1158 "scapy-crypto": "AES-CBC",
1159 "scapy-integ": "SHA2-256-128",
1161 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1165 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1168 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1170 "scapy-crypto": "NULL",
1171 "scapy-integ": "HMAC-SHA1-96",
1173 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1177 for engine in engines:
1178 self.vapi.cli("set crypto handler all %s" % engine)
1181 # loop through each of the algorithms
1184 # with self.subTest(algo=algo['scapy']):
1186 p = self.ipv4_params
1187 p.auth_algo_vpp_id = algo["vpp-integ"]
1188 p.crypt_algo_vpp_id = algo["vpp-crypto"]
1189 p.crypt_algo = algo["scapy-crypto"]
1190 p.auth_algo = algo["scapy-integ"]
1191 p.crypt_key = algo["key"]
1192 p.salt = algo["salt"]
1198 self.verify_tun_44(p, count=127)
1201 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1202 """IPsec IPv4 Tunnel interface no Algos"""
1204 encryption_type = ESP
1205 tun4_encrypt_node_name = "esp4-encrypt-tun"
1206 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1209 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
1211 self.tun_if = self.pg0
1212 p = self.ipv4_params
1213 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1214 p.auth_algo = "NULL"
1217 p.crypt_algo_vpp_id = (
1218 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1220 p.crypt_algo = "NULL"
1224 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
1226 def test_tun_44(self):
1227 """IPSec SA with NULL algos"""
1228 p = self.ipv4_params
1230 self.config_network(p)
1231 self.config_sa_tra(p)
1232 self.config_protect(p)
1234 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host)
1235 self.send_and_assert_no_replies(self.pg1, tx)
1237 self.unconfig_protect(p)
1239 self.unconfig_network(p)
1242 @tag_fixme_vpp_workers
1243 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
1244 """IPsec IPv6 Multi Tunnel interface"""
1246 encryption_type = ESP
1247 tun6_encrypt_node_name = "esp6-encrypt-tun"
1248 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1251 super(TestIpsec6MultiTunIfEsp, self).setUp()
1253 self.tun_if = self.pg0
1255 self.multi_params = []
1256 self.pg0.generate_remote_hosts(10)
1257 self.pg0.configure_ipv6_neighbors()
1259 for ii in range(10):
1260 p = copy.copy(self.ipv6_params)
1262 p.remote_tun_if_host = "1111::%d" % (ii + 1)
1263 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1264 p.scapy_tun_spi = p.scapy_tun_spi + ii
1265 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1266 p.vpp_tun_spi = p.vpp_tun_spi + ii
1268 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1269 p.scapy_tra_spi = p.scapy_tra_spi + ii
1270 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1271 p.vpp_tra_spi = p.vpp_tra_spi + ii
1272 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1274 self.multi_params.append(p)
1275 self.config_network(p)
1276 self.config_sa_tra(p)
1277 self.config_protect(p)
1280 super(TestIpsec6MultiTunIfEsp, self).tearDown()
1282 def test_tun_66(self):
1283 """Multiple IPSEC tunnel interfaces"""
1284 for p in self.multi_params:
1285 self.verify_tun_66(p, count=127)
1286 self.assertEqual(p.tun_if.get_rx_stats(), 127)
1287 self.assertEqual(p.tun_if.get_tx_stats(), 127)
1290 class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
1291 """Ipsec GRE TEB ESP - TUN tests"""
1293 tun4_encrypt_node_name = "esp4-encrypt-tun"
1294 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1295 encryption_type = ESP
1296 omac = "00:11:22:33:44:55"
1298 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1300 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1302 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1304 / Ether(dst=self.omac)
1305 / IP(src="1.1.1.1", dst="1.1.1.2")
1306 / UDP(sport=1144, dport=2233)
1307 / Raw(b"X" * payload_size)
1309 for i in range(count)
1312 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1314 Ether(dst=self.omac)
1315 / IP(src="1.1.1.1", dst="1.1.1.2")
1316 / UDP(sport=1144, dport=2233)
1317 / Raw(b"X" * payload_size)
1318 for i in range(count)
1321 def verify_decrypted(self, p, rxs):
1323 self.assert_equal(rx[Ether].dst, self.omac)
1324 self.assert_equal(rx[IP].dst, "1.1.1.2")
1326 def verify_encrypted(self, p, sa, rxs):
1329 pkt = sa.decrypt(rx[IP])
1330 if not pkt.haslayer(IP):
1331 pkt = IP(pkt[Raw].load)
1332 self.assert_packet_checksums_valid(pkt)
1333 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1334 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1335 self.assertTrue(pkt.haslayer(GRE))
1337 self.assertEqual(e[Ether].dst, self.omac)
1338 self.assertEqual(e[IP].dst, "1.1.1.2")
1339 except (IndexError, AssertionError):
1340 self.logger.debug(ppp("Unexpected packet:", rx))
1342 self.logger.debug(ppp("Decrypted packet:", pkt))
1348 super(TestIpsecGreTebIfEsp, self).setUp()
1350 self.tun_if = self.pg0
1352 p = self.ipv4_params
1354 bd1 = VppBridgeDomain(self, 1)
1355 bd1.add_vpp_config()
1357 p.tun_sa_out = VppIpsecSA(
1363 p.crypt_algo_vpp_id,
1365 self.vpp_esp_protocol,
1367 self.pg0.remote_ip4,
1369 p.tun_sa_out.add_vpp_config()
1371 p.tun_sa_in = VppIpsecSA(
1377 p.crypt_algo_vpp_id,
1379 self.vpp_esp_protocol,
1380 self.pg0.remote_ip4,
1383 p.tun_sa_in.add_vpp_config()
1385 p.tun_if = VppGreInterface(
1388 self.pg0.remote_ip4,
1389 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1391 p.tun_if.add_vpp_config()
1393 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1395 p.tun_protect.add_vpp_config()
1398 p.tun_if.config_ip4()
1399 config_tun_params(p, self.encryption_type, p.tun_if)
1401 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1402 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1404 self.vapi.cli("clear ipsec sa")
1405 self.vapi.cli("sh adj")
1406 self.vapi.cli("sh ipsec tun")
1409 p = self.ipv4_params
1410 p.tun_if.unconfig_ip4()
1411 super(TestIpsecGreTebIfEsp, self).tearDown()
1414 class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
1415 """Ipsec GRE TEB ESP - TUN tests"""
1417 tun4_encrypt_node_name = "esp4-encrypt-tun"
1418 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1419 encryption_type = ESP
1420 omac = "00:11:22:33:44:55"
1422 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1424 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1426 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1428 / Ether(dst=self.omac)
1429 / IP(src="1.1.1.1", dst="1.1.1.2")
1430 / UDP(sport=1144, dport=2233)
1431 / Raw(b"X" * payload_size)
1433 for i in range(count)
1436 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1438 Ether(dst=self.omac)
1440 / IP(src="1.1.1.1", dst="1.1.1.2")
1441 / UDP(sport=1144, dport=2233)
1442 / Raw(b"X" * payload_size)
1443 for i in range(count)
1446 def verify_decrypted(self, p, rxs):
1448 self.assert_equal(rx[Ether].dst, self.omac)
1449 self.assert_equal(rx[Dot1Q].vlan, 11)
1450 self.assert_equal(rx[IP].dst, "1.1.1.2")
1452 def verify_encrypted(self, p, sa, rxs):
1455 pkt = sa.decrypt(rx[IP])
1456 if not pkt.haslayer(IP):
1457 pkt = IP(pkt[Raw].load)
1458 self.assert_packet_checksums_valid(pkt)
1459 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1460 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1461 self.assertTrue(pkt.haslayer(GRE))
1463 self.assertEqual(e[Ether].dst, self.omac)
1464 self.assertFalse(e.haslayer(Dot1Q))
1465 self.assertEqual(e[IP].dst, "1.1.1.2")
1466 except (IndexError, AssertionError):
1467 self.logger.debug(ppp("Unexpected packet:", rx))
1469 self.logger.debug(ppp("Decrypted packet:", pkt))
1475 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1477 self.tun_if = self.pg0
1479 p = self.ipv4_params
1481 bd1 = VppBridgeDomain(self, 1)
1482 bd1.add_vpp_config()
1484 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1485 self.vapi.l2_interface_vlan_tag_rewrite(
1486 sw_if_index=self.pg1_11.sw_if_index,
1487 vtr_op=L2_VTR_OP.L2_POP_1,
1490 self.pg1_11.admin_up()
1492 p.tun_sa_out = VppIpsecSA(
1498 p.crypt_algo_vpp_id,
1500 self.vpp_esp_protocol,
1502 self.pg0.remote_ip4,
1504 p.tun_sa_out.add_vpp_config()
1506 p.tun_sa_in = VppIpsecSA(
1512 p.crypt_algo_vpp_id,
1514 self.vpp_esp_protocol,
1515 self.pg0.remote_ip4,
1518 p.tun_sa_in.add_vpp_config()
1520 p.tun_if = VppGreInterface(
1523 self.pg0.remote_ip4,
1524 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1526 p.tun_if.add_vpp_config()
1528 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1530 p.tun_protect.add_vpp_config()
1533 p.tun_if.config_ip4()
1534 config_tun_params(p, self.encryption_type, p.tun_if)
1536 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1537 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1539 self.vapi.cli("clear ipsec sa")
1542 p = self.ipv4_params
1543 p.tun_if.unconfig_ip4()
1544 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1545 self.pg1_11.admin_down()
1546 self.pg1_11.remove_vpp_config()
1549 class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
1550 """Ipsec GRE TEB ESP - Tra tests"""
1552 tun4_encrypt_node_name = "esp4-encrypt-tun"
1553 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1554 encryption_type = ESP
1555 omac = "00:11:22:33:44:55"
1557 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1559 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1561 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1563 / Ether(dst=self.omac)
1564 / IP(src="1.1.1.1", dst="1.1.1.2")
1565 / UDP(sport=1144, dport=2233)
1566 / Raw(b"X" * payload_size)
1568 for i in range(count)
1571 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1573 Ether(dst=self.omac)
1574 / IP(src="1.1.1.1", dst="1.1.1.2")
1575 / UDP(sport=1144, dport=2233)
1576 / Raw(b"X" * payload_size)
1577 for i in range(count)
1580 def verify_decrypted(self, p, rxs):
1582 self.assert_equal(rx[Ether].dst, self.omac)
1583 self.assert_equal(rx[IP].dst, "1.1.1.2")
1585 def verify_encrypted(self, p, sa, rxs):
1588 pkt = sa.decrypt(rx[IP])
1589 if not pkt.haslayer(IP):
1590 pkt = IP(pkt[Raw].load)
1591 self.assert_packet_checksums_valid(pkt)
1592 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1593 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1594 self.assertTrue(pkt.haslayer(GRE))
1596 self.assertEqual(e[Ether].dst, self.omac)
1597 self.assertEqual(e[IP].dst, "1.1.1.2")
1598 except (IndexError, AssertionError):
1599 self.logger.debug(ppp("Unexpected packet:", rx))
1601 self.logger.debug(ppp("Decrypted packet:", pkt))
1607 super(TestIpsecGreTebIfEspTra, self).setUp()
1609 self.tun_if = self.pg0
1611 p = self.ipv4_params
1613 bd1 = VppBridgeDomain(self, 1)
1614 bd1.add_vpp_config()
1616 p.tun_sa_out = VppIpsecSA(
1622 p.crypt_algo_vpp_id,
1624 self.vpp_esp_protocol,
1626 p.tun_sa_out.add_vpp_config()
1628 p.tun_sa_in = VppIpsecSA(
1634 p.crypt_algo_vpp_id,
1636 self.vpp_esp_protocol,
1638 p.tun_sa_in.add_vpp_config()
1640 p.tun_if = VppGreInterface(
1643 self.pg0.remote_ip4,
1644 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1646 p.tun_if.add_vpp_config()
1648 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1650 p.tun_protect.add_vpp_config()
1653 p.tun_if.config_ip4()
1654 config_tra_params(p, self.encryption_type, p.tun_if)
1656 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1657 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1659 self.vapi.cli("clear ipsec sa")
1662 p = self.ipv4_params
1663 p.tun_if.unconfig_ip4()
1664 super(TestIpsecGreTebIfEspTra, self).tearDown()
1667 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
1668 """Ipsec GRE TEB UDP ESP - Tra tests"""
1670 tun4_encrypt_node_name = "esp4-encrypt-tun"
1671 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1672 encryption_type = ESP
1673 omac = "00:11:22:33:44:55"
1675 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1677 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1679 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1681 / Ether(dst=self.omac)
1682 / IP(src="1.1.1.1", dst="1.1.1.2")
1683 / UDP(sport=1144, dport=2233)
1684 / Raw(b"X" * payload_size)
1686 for i in range(count)
1689 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1691 Ether(dst=self.omac)
1692 / IP(src="1.1.1.1", dst="1.1.1.2")
1693 / UDP(sport=1144, dport=2233)
1694 / Raw(b"X" * payload_size)
1695 for i in range(count)
1698 def verify_decrypted(self, p, rxs):
1700 self.assert_equal(rx[Ether].dst, self.omac)
1701 self.assert_equal(rx[IP].dst, "1.1.1.2")
1703 def verify_encrypted(self, p, sa, rxs):
1705 self.assertTrue(rx.haslayer(UDP))
1706 self.assertEqual(rx[UDP].dport, 4545)
1707 self.assertEqual(rx[UDP].sport, 5454)
1709 pkt = sa.decrypt(rx[IP])
1710 if not pkt.haslayer(IP):
1711 pkt = IP(pkt[Raw].load)
1712 self.assert_packet_checksums_valid(pkt)
1713 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1714 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1715 self.assertTrue(pkt.haslayer(GRE))
1717 self.assertEqual(e[Ether].dst, self.omac)
1718 self.assertEqual(e[IP].dst, "1.1.1.2")
1719 except (IndexError, AssertionError):
1720 self.logger.debug(ppp("Unexpected packet:", rx))
1722 self.logger.debug(ppp("Decrypted packet:", pkt))
1728 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1730 self.tun_if = self.pg0
1732 p = self.ipv4_params
1733 p = self.ipv4_params
1734 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
1735 p.nat_header = UDP(sport=5454, dport=4545)
1737 bd1 = VppBridgeDomain(self, 1)
1738 bd1.add_vpp_config()
1740 p.tun_sa_out = VppIpsecSA(
1746 p.crypt_algo_vpp_id,
1748 self.vpp_esp_protocol,
1753 p.tun_sa_out.add_vpp_config()
1755 p.tun_sa_in = VppIpsecSA(
1761 p.crypt_algo_vpp_id,
1763 self.vpp_esp_protocol,
1765 p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
1770 p.tun_sa_in.add_vpp_config()
1772 p.tun_if = VppGreInterface(
1775 self.pg0.remote_ip4,
1776 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1778 p.tun_if.add_vpp_config()
1780 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1782 p.tun_protect.add_vpp_config()
1785 p.tun_if.config_ip4()
1786 config_tra_params(p, self.encryption_type, p.tun_if)
1788 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1789 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1791 self.vapi.cli("clear ipsec sa")
1792 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1795 p = self.ipv4_params
1796 p.tun_if.unconfig_ip4()
1797 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1800 class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
1801 """Ipsec GRE ESP - TUN tests"""
1803 tun4_encrypt_node_name = "esp4-encrypt-tun"
1804 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1805 encryption_type = ESP
1807 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1809 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1811 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1813 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1814 / UDP(sport=1144, dport=2233)
1815 / Raw(b"X" * payload_size)
1817 for i in range(count)
1820 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1822 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1823 / IP(src="1.1.1.1", dst="1.1.1.2")
1824 / UDP(sport=1144, dport=2233)
1825 / Raw(b"X" * payload_size)
1826 for i in range(count)
1829 def verify_decrypted(self, p, rxs):
1831 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1832 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1834 def verify_encrypted(self, p, sa, rxs):
1837 pkt = sa.decrypt(rx[IP])
1838 if not pkt.haslayer(IP):
1839 pkt = IP(pkt[Raw].load)
1840 self.assert_packet_checksums_valid(pkt)
1841 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1842 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1843 self.assertTrue(pkt.haslayer(GRE))
1845 self.assertEqual(e[IP].dst, "1.1.1.2")
1846 except (IndexError, AssertionError):
1847 self.logger.debug(ppp("Unexpected packet:", rx))
1849 self.logger.debug(ppp("Decrypted packet:", pkt))
1855 super(TestIpsecGreIfEsp, self).setUp()
1857 self.tun_if = self.pg0
1859 p = self.ipv4_params
1861 bd1 = VppBridgeDomain(self, 1)
1862 bd1.add_vpp_config()
1864 p.tun_sa_out = VppIpsecSA(
1870 p.crypt_algo_vpp_id,
1872 self.vpp_esp_protocol,
1874 self.pg0.remote_ip4,
1876 p.tun_sa_out.add_vpp_config()
1878 p.tun_sa_in = VppIpsecSA(
1884 p.crypt_algo_vpp_id,
1886 self.vpp_esp_protocol,
1887 self.pg0.remote_ip4,
1890 p.tun_sa_in.add_vpp_config()
1892 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1893 p.tun_if.add_vpp_config()
1895 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1896 p.tun_protect.add_vpp_config()
1899 p.tun_if.config_ip4()
1900 config_tun_params(p, self.encryption_type, p.tun_if)
1903 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
1907 p = self.ipv4_params
1908 p.tun_if.unconfig_ip4()
1909 super(TestIpsecGreIfEsp, self).tearDown()
1912 class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
1913 """Ipsec GRE ESP - TRA tests"""
1915 tun4_encrypt_node_name = "esp4-encrypt-tun"
1916 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1917 encryption_type = ESP
1919 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1921 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1923 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1925 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1926 / UDP(sport=1144, dport=2233)
1927 / Raw(b"X" * payload_size)
1929 for i in range(count)
1932 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
1934 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1936 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1938 / UDP(sport=1144, dport=2233)
1939 / Raw(b"X" * payload_size)
1941 for i in range(count)
1944 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1946 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1947 / IP(src="1.1.1.1", dst="1.1.1.2")
1948 / UDP(sport=1144, dport=2233)
1949 / Raw(b"X" * payload_size)
1950 for i in range(count)
1953 def verify_decrypted(self, p, rxs):
1955 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1956 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1958 def verify_encrypted(self, p, sa, rxs):
1961 pkt = sa.decrypt(rx[IP])
1962 if not pkt.haslayer(IP):
1963 pkt = IP(pkt[Raw].load)
1964 self.assert_packet_checksums_valid(pkt)
1965 self.assertTrue(pkt.haslayer(GRE))
1967 self.assertEqual(e[IP].dst, "1.1.1.2")
1968 except (IndexError, AssertionError):
1969 self.logger.debug(ppp("Unexpected packet:", rx))
1971 self.logger.debug(ppp("Decrypted packet:", pkt))
1977 super(TestIpsecGreIfEspTra, self).setUp()
1979 self.tun_if = self.pg0
1981 p = self.ipv4_params
1983 p.tun_sa_out = VppIpsecSA(
1989 p.crypt_algo_vpp_id,
1991 self.vpp_esp_protocol,
1993 p.tun_sa_out.add_vpp_config()
1995 p.tun_sa_in = VppIpsecSA(
2001 p.crypt_algo_vpp_id,
2003 self.vpp_esp_protocol,
2005 p.tun_sa_in.add_vpp_config()
2007 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
2008 p.tun_if.add_vpp_config()
2010 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2011 p.tun_protect.add_vpp_config()
2014 p.tun_if.config_ip4()
2015 config_tra_params(p, self.encryption_type, p.tun_if)
2018 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
2022 p = self.ipv4_params
2023 p.tun_if.unconfig_ip4()
2024 super(TestIpsecGreIfEspTra, self).tearDown()
2026 def test_gre_non_ip(self):
2027 p = self.ipv4_params
2028 tx = self.gen_encrypt_non_ip_pkts(
2031 src=p.remote_tun_if_host,
2032 dst=self.pg1.remote_ip6,
2034 self.send_and_assert_no_replies(self.tun_if, tx)
2035 node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
2036 self.assertEqual(1, self.statistics.get_err_counter(node_name))
2037 err = p.tun_sa_in.get_err("unsup_payload")
2038 self.assertEqual(err, 1)
2041 class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
2042 """Ipsec GRE ESP - TRA tests"""
2044 tun6_encrypt_node_name = "esp6-encrypt-tun"
2045 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2046 encryption_type = ESP
2048 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2050 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2052 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2054 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2055 / UDP(sport=1144, dport=2233)
2056 / Raw(b"X" * payload_size)
2058 for i in range(count)
2061 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2063 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2064 / IPv6(src="1::1", dst="1::2")
2065 / UDP(sport=1144, dport=2233)
2066 / Raw(b"X" * payload_size)
2067 for i in range(count)
2070 def verify_decrypted6(self, p, rxs):
2072 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2073 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2075 def verify_encrypted6(self, p, sa, rxs):
2078 pkt = sa.decrypt(rx[IPv6])
2079 if not pkt.haslayer(IPv6):
2080 pkt = IPv6(pkt[Raw].load)
2081 self.assert_packet_checksums_valid(pkt)
2082 self.assertTrue(pkt.haslayer(GRE))
2084 self.assertEqual(e[IPv6].dst, "1::2")
2085 except (IndexError, AssertionError):
2086 self.logger.debug(ppp("Unexpected packet:", rx))
2088 self.logger.debug(ppp("Decrypted packet:", pkt))
2094 super(TestIpsecGre6IfEspTra, self).setUp()
2096 self.tun_if = self.pg0
2098 p = self.ipv6_params
2100 bd1 = VppBridgeDomain(self, 1)
2101 bd1.add_vpp_config()
2103 p.tun_sa_out = VppIpsecSA(
2109 p.crypt_algo_vpp_id,
2111 self.vpp_esp_protocol,
2113 p.tun_sa_out.add_vpp_config()
2115 p.tun_sa_in = VppIpsecSA(
2121 p.crypt_algo_vpp_id,
2123 self.vpp_esp_protocol,
2125 p.tun_sa_in.add_vpp_config()
2127 p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
2128 p.tun_if.add_vpp_config()
2130 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2131 p.tun_protect.add_vpp_config()
2134 p.tun_if.config_ip6()
2135 config_tra_params(p, self.encryption_type, p.tun_if)
2143 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
2150 p = self.ipv6_params
2151 p.tun_if.unconfig_ip6()
2152 super(TestIpsecGre6IfEspTra, self).tearDown()
2155 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
2156 """Ipsec mGRE ESP v4 TRA tests"""
2158 tun4_encrypt_node_name = "esp4-encrypt-tun"
2159 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2160 encryption_type = ESP
2162 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2164 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2166 IP(src=p.tun_dst, dst=self.pg0.local_ip4)
2168 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
2169 / UDP(sport=1144, dport=2233)
2170 / Raw(b"X" * payload_size)
2172 for i in range(count)
2175 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2177 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2178 / IP(src="1.1.1.1", dst=dst)
2179 / UDP(sport=1144, dport=2233)
2180 / Raw(b"X" * payload_size)
2181 for i in range(count)
2184 def verify_decrypted(self, p, rxs):
2186 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2187 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2189 def verify_encrypted(self, p, sa, rxs):
2192 pkt = sa.decrypt(rx[IP])
2193 if not pkt.haslayer(IP):
2194 pkt = IP(pkt[Raw].load)
2195 self.assert_packet_checksums_valid(pkt)
2196 self.assertTrue(pkt.haslayer(GRE))
2198 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2199 except (IndexError, AssertionError):
2200 self.logger.debug(ppp("Unexpected packet:", rx))
2202 self.logger.debug(ppp("Decrypted packet:", pkt))
2208 super(TestIpsecMGreIfEspTra4, self).setUp()
2211 self.tun_if = self.pg0
2212 p = self.ipv4_params
2213 p.tun_if = VppGreInterface(
2217 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2219 p.tun_if.add_vpp_config()
2221 p.tun_if.config_ip4()
2222 p.tun_if.generate_remote_hosts(N_NHS)
2223 self.pg0.generate_remote_hosts(N_NHS)
2224 self.pg0.configure_ipv4_neighbors()
2226 # setup some SAs for several next-hops on the interface
2227 self.multi_params = []
2229 for ii in range(N_NHS):
2230 p = copy.copy(self.ipv4_params)
2232 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2233 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2234 p.scapy_tun_spi = p.scapy_tun_spi + ii
2235 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2236 p.vpp_tun_spi = p.vpp_tun_spi + ii
2238 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2239 p.scapy_tra_spi = p.scapy_tra_spi + ii
2240 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2241 p.vpp_tra_spi = p.vpp_tra_spi + ii
2242 p.tun_sa_out = VppIpsecSA(
2248 p.crypt_algo_vpp_id,
2250 self.vpp_esp_protocol,
2252 p.tun_sa_out.add_vpp_config()
2254 p.tun_sa_in = VppIpsecSA(
2260 p.crypt_algo_vpp_id,
2262 self.vpp_esp_protocol,
2264 p.tun_sa_in.add_vpp_config()
2266 p.tun_protect = VppIpsecTunProtect(
2271 nh=p.tun_if.remote_hosts[ii].ip4,
2273 p.tun_protect.add_vpp_config()
2274 config_tra_params(p, self.encryption_type, p.tun_if)
2275 self.multi_params.append(p)
2279 p.remote_tun_if_host,
2281 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
2284 # in this v4 variant add the teibs after the protect
2288 p.tun_if.remote_hosts[ii].ip4,
2289 self.pg0.remote_hosts[ii].ip4,
2291 p.tun_dst = self.pg0.remote_hosts[ii].ip4
2292 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2295 p = self.ipv4_params
2296 p.tun_if.unconfig_ip4()
2297 super(TestIpsecMGreIfEspTra4, self).tearDown()
2299 def test_tun_44(self):
2302 for p in self.multi_params:
2303 self.verify_tun_44(p, count=N_PKTS)
2304 p.teib.remove_vpp_config()
2305 self.verify_tun_dropped_44(p, count=N_PKTS)
2306 p.teib.add_vpp_config()
2307 self.verify_tun_44(p, count=N_PKTS)
2310 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
2311 """Ipsec mGRE ESP v6 TRA tests"""
2313 tun6_encrypt_node_name = "esp6-encrypt-tun"
2314 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2315 encryption_type = ESP
2317 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2319 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2321 IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
2323 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2324 / UDP(sport=1144, dport=2233)
2325 / Raw(b"X" * payload_size)
2327 for i in range(count)
2330 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2332 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2333 / IPv6(src="1::1", dst=dst)
2334 / UDP(sport=1144, dport=2233)
2335 / Raw(b"X" * payload_size)
2336 for i in range(count)
2339 def verify_decrypted6(self, p, rxs):
2341 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2342 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2344 def verify_encrypted6(self, p, sa, rxs):
2347 pkt = sa.decrypt(rx[IPv6])
2348 if not pkt.haslayer(IPv6):
2349 pkt = IPv6(pkt[Raw].load)
2350 self.assert_packet_checksums_valid(pkt)
2351 self.assertTrue(pkt.haslayer(GRE))
2353 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
2354 except (IndexError, AssertionError):
2355 self.logger.debug(ppp("Unexpected packet:", rx))
2357 self.logger.debug(ppp("Decrypted packet:", pkt))
2363 super(TestIpsecMGreIfEspTra6, self).setUp()
2365 self.vapi.cli("set logging class ipsec level debug")
2368 self.tun_if = self.pg0
2369 p = self.ipv6_params
2370 p.tun_if = VppGreInterface(
2374 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2376 p.tun_if.add_vpp_config()
2378 p.tun_if.config_ip6()
2379 p.tun_if.generate_remote_hosts(N_NHS)
2380 self.pg0.generate_remote_hosts(N_NHS)
2381 self.pg0.configure_ipv6_neighbors()
2383 # setup some SAs for several next-hops on the interface
2384 self.multi_params = []
2386 for ii in range(N_NHS):
2387 p = copy.copy(self.ipv6_params)
2389 p.remote_tun_if_host = "1::%d" % (ii + 1)
2390 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2391 p.scapy_tun_spi = p.scapy_tun_spi + ii
2392 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2393 p.vpp_tun_spi = p.vpp_tun_spi + ii
2395 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2396 p.scapy_tra_spi = p.scapy_tra_spi + ii
2397 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2398 p.vpp_tra_spi = p.vpp_tra_spi + ii
2399 p.tun_sa_out = VppIpsecSA(
2405 p.crypt_algo_vpp_id,
2407 self.vpp_esp_protocol,
2409 p.tun_sa_out.add_vpp_config()
2411 p.tun_sa_in = VppIpsecSA(
2417 p.crypt_algo_vpp_id,
2419 self.vpp_esp_protocol,
2421 p.tun_sa_in.add_vpp_config()
2423 # in this v6 variant add the teibs first then the protection
2424 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2426 self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
2429 p.tun_protect = VppIpsecTunProtect(
2434 nh=p.tun_if.remote_hosts[ii].ip6,
2436 p.tun_protect.add_vpp_config()
2437 config_tra_params(p, self.encryption_type, p.tun_if)
2438 self.multi_params.append(p)
2442 p.remote_tun_if_host,
2444 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
2446 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2448 self.logger.info(self.vapi.cli("sh log"))
2449 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
2450 self.logger.info(self.vapi.cli("sh adj 41"))
2453 p = self.ipv6_params
2454 p.tun_if.unconfig_ip6()
2455 super(TestIpsecMGreIfEspTra6, self).tearDown()
2457 def test_tun_66(self):
2459 for p in self.multi_params:
2460 self.verify_tun_66(p, count=63)
2463 @tag_fixme_vpp_workers
2464 class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2465 """IPsec IPv4 Tunnel protect - transport mode"""
2468 super(TestIpsec4TunProtect, self).setUp()
2470 self.tun_if = self.pg0
2473 super(TestIpsec4TunProtect, self).tearDown()
2475 def test_tun_44(self):
2476 """IPSEC tunnel protect"""
2478 p = self.ipv4_params
2480 self.config_network(p)
2481 self.config_sa_tra(p)
2482 self.config_protect(p)
2484 self.verify_tun_44(p, count=127)
2485 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2486 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2488 self.vapi.cli("clear ipsec sa")
2489 self.verify_tun_64(p, count=127)
2490 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2491 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2493 # rekey - create new SAs and update the tunnel protection
2495 np.crypt_key = b"X" + p.crypt_key[1:]
2496 np.scapy_tun_spi += 100
2497 np.scapy_tun_sa_id += 1
2498 np.vpp_tun_spi += 100
2499 np.vpp_tun_sa_id += 1
2500 np.tun_if.local_spi = p.vpp_tun_spi
2501 np.tun_if.remote_spi = p.scapy_tun_spi
2503 self.config_sa_tra(np)
2504 self.config_protect(np)
2507 self.verify_tun_44(np, count=127)
2508 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2509 self.assertEqual(p.tun_if.get_tx_stats(), 381)
2512 self.unconfig_protect(np)
2513 self.unconfig_sa(np)
2514 self.unconfig_network(p)
2517 @tag_fixme_vpp_workers
2518 class TestIpsec4TunProtectTfc(TemplateIpsec4TunTfc, TestIpsec4TunProtect):
2519 """IPsec IPv4 Tunnel protect with TFC - transport mode"""
2522 @tag_fixme_vpp_workers
2523 class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2524 """IPsec IPv4 UDP Tunnel protect - transport mode"""
2527 super(TestIpsec4TunProtectUdp, self).setUp()
2529 self.tun_if = self.pg0
2531 p = self.ipv4_params
2532 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
2533 p.nat_header = UDP(sport=4500, dport=4500)
2534 self.config_network(p)
2535 self.config_sa_tra(p)
2536 self.config_protect(p)
2539 p = self.ipv4_params
2540 self.unconfig_protect(p)
2542 self.unconfig_network(p)
2543 super(TestIpsec4TunProtectUdp, self).tearDown()
2545 def verify_encrypted(self, p, sa, rxs):
2546 # ensure encrypted packets are recieved with the default UDP ports
2548 self.assertEqual(rx[UDP].sport, 4500)
2549 self.assertEqual(rx[UDP].dport, 4500)
2550 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2552 def test_tun_44(self):
2553 """IPSEC UDP tunnel protect"""
2555 p = self.ipv4_params
2557 self.verify_tun_44(p, count=127)
2558 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2559 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2561 def test_keepalive(self):
2562 """IPSEC NAT Keepalive"""
2563 self.verify_keepalive(self.ipv4_params)
2566 @tag_fixme_vpp_workers
2567 class TestIpsec4TunProtectUdpTfc(TemplateIpsec4TunTfc, TestIpsec4TunProtectUdp):
2568 """IPsec IPv4 UDP Tunnel protect with TFC - transport mode"""
2571 @tag_fixme_vpp_workers
2572 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2573 """IPsec IPv4 Tunnel protect - tunnel mode"""
2575 encryption_type = ESP
2576 tun4_encrypt_node_name = "esp4-encrypt-tun"
2577 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2580 super(TestIpsec4TunProtectTun, self).setUp()
2582 self.tun_if = self.pg0
2585 super(TestIpsec4TunProtectTun, self).tearDown()
2587 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2589 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2591 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2592 / IP(src=src, dst=dst)
2593 / UDP(sport=1144, dport=2233)
2594 / Raw(b"X" * payload_size)
2596 for i in range(count)
2599 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2601 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2602 / IP(src=src, dst=dst)
2603 / UDP(sport=1144, dport=2233)
2604 / Raw(b"X" * payload_size)
2605 for i in range(count)
2608 def verify_decrypted(self, p, rxs):
2610 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2611 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2612 self.assert_packet_checksums_valid(rx)
2614 def verify_encrypted(self, p, sa, rxs):
2617 pkt = sa.decrypt(rx[IP])
2618 if not pkt.haslayer(IP):
2619 pkt = IP(pkt[Raw].load)
2620 self.assert_packet_checksums_valid(pkt)
2621 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2622 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2623 inner = pkt[IP].payload
2624 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2626 except (IndexError, AssertionError):
2627 self.logger.debug(ppp("Unexpected packet:", rx))
2629 self.logger.debug(ppp("Decrypted packet:", pkt))
2634 def test_tun_44(self):
2635 """IPSEC tunnel protect"""
2637 p = self.ipv4_params
2639 self.config_network(p)
2640 self.config_sa_tun(p)
2641 self.config_protect(p)
2643 # also add an output features on the tunnel and physical interface
2644 # so we test they still work
2645 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2646 a = VppAcl(self, [r_all]).add_vpp_config()
2648 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2649 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2651 self.verify_tun_44(p, count=127)
2653 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2654 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2656 # rekey - create new SAs and update the tunnel protection
2658 np.crypt_key = b"X" + p.crypt_key[1:]
2659 np.scapy_tun_spi += 100
2660 np.scapy_tun_sa_id += 1
2661 np.vpp_tun_spi += 100
2662 np.vpp_tun_sa_id += 1
2663 np.tun_if.local_spi = p.vpp_tun_spi
2664 np.tun_if.remote_spi = p.scapy_tun_spi
2666 self.config_sa_tun(np)
2667 self.config_protect(np)
2670 self.verify_tun_44(np, count=127)
2671 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2672 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2675 self.unconfig_protect(np)
2676 self.unconfig_sa(np)
2677 self.unconfig_network(p)
2680 class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2681 """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2683 encryption_type = ESP
2684 tun4_encrypt_node_name = "esp4-encrypt-tun"
2685 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2688 super(TestIpsec4TunProtectTunDrop, self).setUp()
2690 self.tun_if = self.pg0
2693 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2695 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2697 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2699 IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
2700 / IP(src=src, dst=dst)
2701 / UDP(sport=1144, dport=2233)
2702 / Raw(b"X" * payload_size)
2704 for i in range(count)
2707 def test_tun_drop_44(self):
2708 """IPSEC tunnel protect bogus tunnel header"""
2710 p = self.ipv4_params
2712 self.config_network(p)
2713 self.config_sa_tun(p)
2714 self.config_protect(p)
2716 tx = self.gen_encrypt_pkts(
2720 src=p.remote_tun_if_host,
2721 dst=self.pg1.remote_ip4,
2724 self.send_and_assert_no_replies(self.tun_if, tx)
2727 self.unconfig_protect(p)
2729 self.unconfig_network(p)
2732 @tag_fixme_vpp_workers
2733 class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2734 """IPsec IPv6 Tunnel protect - transport mode"""
2736 encryption_type = ESP
2737 tun6_encrypt_node_name = "esp6-encrypt-tun"
2738 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2741 super(TestIpsec6TunProtect, self).setUp()
2743 self.tun_if = self.pg0
2746 super(TestIpsec6TunProtect, self).tearDown()
2748 def test_tun_66(self):
2749 """IPSEC tunnel protect 6o6"""
2751 p = self.ipv6_params
2753 self.config_network(p)
2754 self.config_sa_tra(p)
2755 self.config_protect(p)
2757 self.verify_tun_66(p, count=127)
2758 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2759 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2761 # rekey - create new SAs and update the tunnel protection
2763 np.crypt_key = b"X" + p.crypt_key[1:]
2764 np.scapy_tun_spi += 100
2765 np.scapy_tun_sa_id += 1
2766 np.vpp_tun_spi += 100
2767 np.vpp_tun_sa_id += 1
2768 np.tun_if.local_spi = p.vpp_tun_spi
2769 np.tun_if.remote_spi = p.scapy_tun_spi
2771 self.config_sa_tra(np)
2772 self.config_protect(np)
2775 self.verify_tun_66(np, count=127)
2776 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2777 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2779 # bounce the interface state
2780 p.tun_if.admin_down()
2781 self.verify_drop_tun_66(np, count=127)
2782 node = "/err/ipsec6-tun-input/disabled"
2783 self.assertEqual(127, self.statistics.get_err_counter(node))
2785 self.verify_tun_66(np, count=127)
2788 # 1) add two input SAs [old, new]
2789 # 2) swap output SA to [new]
2790 # 3) use only [new] input SA
2792 np3.crypt_key = b"Z" + p.crypt_key[1:]
2793 np3.scapy_tun_spi += 100
2794 np3.scapy_tun_sa_id += 1
2795 np3.vpp_tun_spi += 100
2796 np3.vpp_tun_sa_id += 1
2797 np3.tun_if.local_spi = p.vpp_tun_spi
2798 np3.tun_if.remote_spi = p.scapy_tun_spi
2800 self.config_sa_tra(np3)
2803 p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2804 self.verify_tun_66(np, np, count=127)
2805 self.verify_tun_66(np3, np, count=127)
2808 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2809 self.verify_tun_66(np, np3, count=127)
2810 self.verify_tun_66(np3, np3, count=127)
2813 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
2814 self.verify_tun_66(np3, np3, count=127)
2815 self.verify_drop_tun_rx_66(np, count=127)
2817 self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
2818 self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
2819 self.unconfig_sa(np)
2822 self.unconfig_protect(np3)
2823 self.unconfig_sa(np3)
2824 self.unconfig_network(p)
2826 def test_tun_46(self):
2827 """IPSEC tunnel protect 4o6"""
2829 p = self.ipv6_params
2831 self.config_network(p)
2832 self.config_sa_tra(p)
2833 self.config_protect(p)
2835 self.verify_tun_46(p, count=127)
2836 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2837 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2840 self.unconfig_protect(p)
2842 self.unconfig_network(p)
2845 @tag_fixme_vpp_workers
2846 class TestIpsec6TunProtectTfc(TemplateIpsec6TunTfc, TestIpsec6TunProtect):
2847 """IPsec IPv6 Tunnel protect with TFC - transport mode"""
2850 @tag_fixme_vpp_workers
2851 class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2852 """IPsec IPv6 Tunnel protect - tunnel mode"""
2854 encryption_type = ESP
2855 tun6_encrypt_node_name = "esp6-encrypt-tun"
2856 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2859 super(TestIpsec6TunProtectTun, self).setUp()
2861 self.tun_if = self.pg0
2864 super(TestIpsec6TunProtectTun, self).tearDown()
2866 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2868 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2870 IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
2871 / IPv6(src=src, dst=dst)
2872 / UDP(sport=1166, dport=2233)
2873 / Raw(b"X" * payload_size)
2875 for i in range(count)
2878 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2880 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2881 / IPv6(src=src, dst=dst)
2882 / UDP(sport=1166, dport=2233)
2883 / Raw(b"X" * payload_size)
2884 for i in range(count)
2887 def verify_decrypted6(self, p, rxs):
2889 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2890 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2891 self.assert_packet_checksums_valid(rx)
2893 def verify_encrypted6(self, p, sa, rxs):
2896 pkt = sa.decrypt(rx[IPv6])
2897 if not pkt.haslayer(IPv6):
2898 pkt = IPv6(pkt[Raw].load)
2899 self.assert_packet_checksums_valid(pkt)
2900 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2901 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2902 inner = pkt[IPv6].payload
2903 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2905 except (IndexError, AssertionError):
2906 self.logger.debug(ppp("Unexpected packet:", rx))
2908 self.logger.debug(ppp("Decrypted packet:", pkt))
2913 def test_tun_66(self):
2914 """IPSEC tunnel protect"""
2916 p = self.ipv6_params
2918 self.config_network(p)
2919 self.config_sa_tun(p)
2920 self.config_protect(p)
2922 self.verify_tun_66(p, count=127)
2924 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2925 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2927 # rekey - create new SAs and update the tunnel protection
2929 np.crypt_key = b"X" + p.crypt_key[1:]
2930 np.scapy_tun_spi += 100
2931 np.scapy_tun_sa_id += 1
2932 np.vpp_tun_spi += 100
2933 np.vpp_tun_sa_id += 1
2934 np.tun_if.local_spi = p.vpp_tun_spi
2935 np.tun_if.remote_spi = p.scapy_tun_spi
2937 self.config_sa_tun(np)
2938 self.config_protect(np)
2941 self.verify_tun_66(np, count=127)
2942 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2943 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2946 self.unconfig_protect(np)
2947 self.unconfig_sa(np)
2948 self.unconfig_network(p)
2951 class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2952 """IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2954 encryption_type = ESP
2955 tun6_encrypt_node_name = "esp6-encrypt-tun"
2956 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2959 super(TestIpsec6TunProtectTunDrop, self).setUp()
2961 self.tun_if = self.pg0
2964 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2966 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2967 # the IP destination of the revelaed packet does not match
2968 # that assigned to the tunnel
2970 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2972 IPv6(src=sw_intf.remote_ip6, dst="5::5")
2973 / IPv6(src=src, dst=dst)
2974 / UDP(sport=1144, dport=2233)
2975 / Raw(b"X" * payload_size)
2977 for i in range(count)
2980 def test_tun_drop_66(self):
2981 """IPSEC 6 tunnel protect bogus tunnel header"""
2983 p = self.ipv6_params
2985 self.config_network(p)
2986 self.config_sa_tun(p)
2987 self.config_protect(p)
2989 tx = self.gen_encrypt_pkts6(
2993 src=p.remote_tun_if_host,
2994 dst=self.pg1.remote_ip6,
2997 self.send_and_assert_no_replies(self.tun_if, tx)
2999 self.unconfig_protect(p)
3001 self.unconfig_network(p)
3004 class TemplateIpsecItf4(object):
3005 """IPsec Interface IPv4"""
3007 encryption_type = ESP
3008 tun4_encrypt_node_name = "esp4-encrypt-tun"
3009 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3010 tun4_input_node = "ipsec4-tun-input"
3012 def config_sa_tun(self, p, src, dst):
3013 config_tun_params(p, self.encryption_type, None, src, dst)
3015 p.tun_sa_out = VppIpsecSA(
3021 p.crypt_algo_vpp_id,
3023 self.vpp_esp_protocol,
3028 p.tun_sa_out.add_vpp_config()
3030 p.tun_sa_in = VppIpsecSA(
3036 p.crypt_algo_vpp_id,
3038 self.vpp_esp_protocol,
3042 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
3044 p.tun_sa_in.add_vpp_config()
3046 def config_protect(self, p):
3047 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
3048 p.tun_protect.add_vpp_config()
3050 def config_network(self, p, instance=0xFFFFFFFF):
3051 p.tun_if = VppIpsecInterface(self, instance=instance)
3053 p.tun_if.add_vpp_config()
3055 p.tun_if.config_ip4()
3056 p.tun_if.config_ip6()
3058 p.route = VppIpRoute(
3060 p.remote_tun_if_host,
3062 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3064 p.route.add_vpp_config()
3067 p.remote_tun_if_host6,
3071 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3077 def unconfig_network(self, p):
3078 p.route.remove_vpp_config()
3079 p.tun_if.remove_vpp_config()
3081 def unconfig_protect(self, p):
3082 p.tun_protect.remove_vpp_config()
3084 def unconfig_sa(self, p):
3085 p.tun_sa_out.remove_vpp_config()
3086 p.tun_sa_in.remove_vpp_config()
3089 @tag_fixme_vpp_workers
3090 class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3091 """IPsec Interface IPv4"""
3094 super(TestIpsecItf4, self).setUp()
3096 self.tun_if = self.pg0
3099 super(TestIpsecItf4, self).tearDown()
3101 def test_tun_instance_44(self):
3102 p = self.ipv4_params
3103 self.config_network(p, instance=3)
3105 with self.assertRaises(CliFailedCommandError):
3106 self.vapi.cli("show interface ipsec0")
3108 output = self.vapi.cli("show interface ipsec3")
3109 self.assertTrue("unknown" not in output)
3111 self.unconfig_network(p)
3113 def test_tun_44(self):
3114 """IPSEC interface IPv4"""
3117 p = self.ipv4_params
3119 self.config_network(p)
3121 p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
3123 self.verify_tun_dropped_44(p, count=n_pkts)
3124 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3125 self.config_protect(p)
3127 self.verify_tun_44(p, count=n_pkts)
3128 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3129 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3131 p.tun_if.admin_down()
3132 self.verify_tun_dropped_44(p, count=n_pkts)
3134 self.verify_tun_44(p, count=n_pkts)
3136 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3137 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3139 # it's a v6 packet when its encrypted
3140 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
3142 self.verify_tun_64(p, count=n_pkts)
3143 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3144 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3146 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
3148 # update the SA tunnel
3150 p, self.encryption_type, None, self.pg2.local_ip4, self.pg2.remote_ip4
3152 p.tun_sa_in.update_vpp_config(
3153 is_tun=True, tun_src=self.pg2.remote_ip4, tun_dst=self.pg2.local_ip4
3155 p.tun_sa_out.update_vpp_config(
3156 is_tun=True, tun_src=self.pg2.local_ip4, tun_dst=self.pg2.remote_ip4
3158 self.verify_tun_44(p, count=n_pkts)
3159 self.assertEqual(p.tun_if.get_rx_stats(), 5 * n_pkts)
3160 self.assertEqual(p.tun_if.get_tx_stats(), 4 * n_pkts)
3162 self.vapi.cli("clear interfaces")
3164 # rekey - create new SAs and update the tunnel protection
3166 np.crypt_key = b"X" + p.crypt_key[1:]
3167 np.scapy_tun_spi += 100
3168 np.scapy_tun_sa_id += 1
3169 np.vpp_tun_spi += 100
3170 np.vpp_tun_sa_id += 1
3171 np.tun_if.local_spi = p.vpp_tun_spi
3172 np.tun_if.remote_spi = p.scapy_tun_spi
3174 self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
3175 self.config_protect(np)
3178 self.verify_tun_44(np, count=n_pkts)
3179 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3180 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3183 self.unconfig_protect(np)
3184 self.unconfig_sa(np)
3185 self.unconfig_network(p)
3187 def test_tun_44_null(self):
3188 """IPSEC interface IPv4 NULL auth/crypto"""
3191 p = copy.copy(self.ipv4_params)
3193 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
3194 p.crypt_algo_vpp_id = (
3195 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
3197 p.crypt_algo = "NULL"
3198 p.auth_algo = "NULL"
3200 self.config_network(p)
3201 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3202 self.config_protect(p)
3204 self.logger.info(self.vapi.cli("sh ipsec sa"))
3205 self.verify_tun_44(p, count=n_pkts)
3208 self.unconfig_protect(p)
3210 self.unconfig_network(p)
3212 def test_tun_44_police(self):
3213 """IPSEC interface IPv4 with input policer"""
3215 p = self.ipv4_params
3217 self.config_network(p)
3218 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3219 self.config_protect(p)
3221 action_tx = PolicerAction(
3222 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3224 policer = VppPolicer(
3231 conform_action=action_tx,
3232 exceed_action=action_tx,
3233 violate_action=action_tx,
3235 policer.add_vpp_config()
3237 # Start policing on tun
3238 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3240 self.verify_tun_44(p, count=n_pkts)
3241 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3242 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3244 stats = policer.get_stats()
3246 # Single rate, 2 colour policer - expect conform, violate but no exceed
3247 self.assertGreater(stats["conform_packets"], 0)
3248 self.assertEqual(stats["exceed_packets"], 0)
3249 self.assertGreater(stats["violate_packets"], 0)
3251 # Stop policing on tun
3252 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3253 self.verify_tun_44(p, count=n_pkts)
3255 # No new policer stats
3256 statsnew = policer.get_stats()
3257 self.assertEqual(stats, statsnew)
3260 policer.remove_vpp_config()
3261 self.unconfig_protect(p)
3263 self.unconfig_network(p)
3266 @tag_fixme_vpp_workers
3267 class TestIpsecItf4Tfc(TemplateIpsec4TunTfc, TestIpsecItf4):
3268 """IPsec Interface IPv4 with TFC"""
3271 class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3272 """IPsec Interface MPLSoIPv4"""
3274 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
3277 super(TestIpsecItf4MPLS, self).setUp()
3279 self.tun_if = self.pg0
3282 super(TestIpsecItf4MPLS, self).tearDown()
3284 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3286 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3288 MPLS(label=44, ttl=3)
3289 / IP(src=src, dst=dst)
3290 / UDP(sport=1166, dport=2233)
3291 / Raw(b"X" * payload_size)
3293 for i in range(count)
3296 def verify_encrypted(self, p, sa, rxs):
3299 pkt = sa.decrypt(rx[IP])
3300 if not pkt.haslayer(IP):
3301 pkt = IP(pkt[Raw].load)
3302 self.assert_packet_checksums_valid(pkt)
3303 self.assert_equal(pkt[MPLS].label, 44)
3304 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
3305 except (IndexError, AssertionError):
3306 self.logger.debug(ppp("Unexpected packet:", rx))
3308 self.logger.debug(ppp("Decrypted packet:", pkt))
3313 def test_tun_mpls_o_ip4(self):
3314 """IPSEC interface MPLS over IPv4"""
3317 p = self.ipv4_params
3320 tbl = VppMplsTable(self, 0)
3321 tbl.add_vpp_config()
3323 self.config_network(p)
3324 # deag MPLS routes from the tunnel
3326 self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
3331 p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
3335 p.tun_if.enable_mpls()
3337 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3338 self.config_protect(p)
3340 self.verify_tun_44(p, count=n_pkts)
3343 p.tun_if.disable_mpls()
3344 self.unconfig_protect(p)
3346 self.unconfig_network(p)
3349 class TemplateIpsecItf6(object):
3350 """IPsec Interface IPv6"""
3352 encryption_type = ESP
3353 tun6_encrypt_node_name = "esp6-encrypt-tun"
3354 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
3355 tun6_input_node = "ipsec6-tun-input"
3357 def config_sa_tun(self, p, src, dst):
3358 config_tun_params(p, self.encryption_type, None, src, dst)
3360 if not hasattr(p, "tun_flags"):
3362 if not hasattr(p, "hop_limit"):
3365 p.tun_sa_out = VppIpsecSA(
3371 p.crypt_algo_vpp_id,
3373 self.vpp_esp_protocol,
3377 tun_flags=p.tun_flags,
3378 hop_limit=p.hop_limit,
3380 p.tun_sa_out.add_vpp_config()
3382 p.tun_sa_in = VppIpsecSA(
3388 p.crypt_algo_vpp_id,
3390 self.vpp_esp_protocol,
3395 p.tun_sa_in.add_vpp_config()
3397 def config_protect(self, p):
3398 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
3399 p.tun_protect.add_vpp_config()
3401 def config_network(self, p):
3402 p.tun_if = VppIpsecInterface(self)
3404 p.tun_if.add_vpp_config()
3406 p.tun_if.config_ip4()
3407 p.tun_if.config_ip6()
3411 p.remote_tun_if_host4,
3413 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3417 p.route = VppIpRoute(
3419 p.remote_tun_if_host,
3423 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3427 p.route.add_vpp_config()
3429 def unconfig_network(self, p):
3430 p.route.remove_vpp_config()
3431 p.tun_if.remove_vpp_config()
3433 def unconfig_protect(self, p):
3434 p.tun_protect.remove_vpp_config()
3436 def unconfig_sa(self, p):
3437 p.tun_sa_out.remove_vpp_config()
3438 p.tun_sa_in.remove_vpp_config()
3441 @tag_fixme_vpp_workers
3442 class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3443 """IPsec Interface IPv6"""
3446 super(TestIpsecItf6, self).setUp()
3448 self.tun_if = self.pg0
3451 super(TestIpsecItf6, self).tearDown()
3453 def test_tun_66(self):
3454 """IPSEC interface IPv6"""
3456 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3458 p = self.ipv6_params
3459 p.inner_hop_limit = 24
3460 p.outer_hop_limit = 23
3461 p.outer_flow_label = 243224
3462 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3464 self.config_network(p)
3466 p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
3468 self.verify_drop_tun_66(p, count=n_pkts)
3469 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3470 self.config_protect(p)
3472 self.verify_tun_66(p, count=n_pkts)
3473 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3474 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3476 p.tun_if.admin_down()
3477 self.verify_drop_tun_66(p, count=n_pkts)
3479 self.verify_tun_66(p, count=n_pkts)
3481 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3482 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
3484 # it's a v4 packet when its encrypted
3485 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
3487 self.verify_tun_46(p, count=n_pkts)
3488 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3489 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3491 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
3493 self.vapi.cli("clear interfaces")
3495 # rekey - create new SAs and update the tunnel protection
3497 np.crypt_key = b"X" + p.crypt_key[1:]
3498 np.scapy_tun_spi += 100
3499 np.scapy_tun_sa_id += 1
3500 np.vpp_tun_spi += 100
3501 np.vpp_tun_sa_id += 1
3502 np.tun_if.local_spi = p.vpp_tun_spi
3503 np.tun_if.remote_spi = p.scapy_tun_spi
3504 np.inner_hop_limit = 24
3505 np.outer_hop_limit = 128
3506 np.inner_flow_label = 0xABCDE
3507 np.outer_flow_label = 0xABCDE
3509 np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
3511 self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
3512 self.config_protect(np)
3515 self.verify_tun_66(np, count=n_pkts)
3516 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3517 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3520 self.unconfig_protect(np)
3521 self.unconfig_sa(np)
3522 self.unconfig_network(p)
3524 def test_tun_66_police(self):
3525 """IPSEC interface IPv6 with input policer"""
3526 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3528 p = self.ipv6_params
3529 p.inner_hop_limit = 24
3530 p.outer_hop_limit = 23
3531 p.outer_flow_label = 243224
3532 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3534 self.config_network(p)
3535 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3536 self.config_protect(p)
3538 action_tx = PolicerAction(
3539 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3541 policer = VppPolicer(
3548 conform_action=action_tx,
3549 exceed_action=action_tx,
3550 violate_action=action_tx,
3552 policer.add_vpp_config()
3554 # Start policing on tun
3555 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3557 self.verify_tun_66(p, count=n_pkts)
3558 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3559 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3561 stats = policer.get_stats()
3563 # Single rate, 2 colour policer - expect conform, violate but no exceed
3564 self.assertGreater(stats["conform_packets"], 0)
3565 self.assertEqual(stats["exceed_packets"], 0)
3566 self.assertGreater(stats["violate_packets"], 0)
3568 # Stop policing on tun
3569 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3570 self.verify_tun_66(p, count=n_pkts)
3572 # No new policer stats
3573 statsnew = policer.get_stats()
3574 self.assertEqual(stats, statsnew)
3577 policer.remove_vpp_config()
3578 self.unconfig_protect(p)
3580 self.unconfig_network(p)
3583 @tag_fixme_vpp_workers
3584 class TestIpsecItf6Tfc(TemplateIpsec6TunTfc, TestIpsecItf6):
3585 """IPsec Interface IPv6 with TFC"""
3588 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3589 """Ipsec P2MP ESP v4 tests"""
3591 tun4_encrypt_node_name = "esp4-encrypt-tun"
3592 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3593 encryption_type = ESP
3595 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3597 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3599 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
3600 / UDP(sport=1144, dport=2233)
3601 / Raw(b"X" * payload_size)
3603 for i in range(count)
3606 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
3608 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3609 / IP(src="1.1.1.1", dst=dst)
3610 / UDP(sport=1144, dport=2233)
3611 / Raw(b"X" * payload_size)
3612 for i in range(count)
3615 def verify_decrypted(self, p, rxs):
3617 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3618 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3620 def verify_encrypted(self, p, sa, rxs):
3624 rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
3626 self.assertEqual(rx[IP].ttl, p.hop_limit)
3627 pkt = sa.decrypt(rx[IP])
3628 if not pkt.haslayer(IP):
3629 pkt = IP(pkt[Raw].load)
3630 self.assert_packet_checksums_valid(pkt)
3632 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3633 except (IndexError, AssertionError):
3634 self.logger.debug(ppp("Unexpected packet:", rx))
3636 self.logger.debug(ppp("Decrypted packet:", pkt))
3642 super(TestIpsecMIfEsp4, self).setUp()
3645 self.tun_if = self.pg0
3646 p = self.ipv4_params
3647 p.tun_if = VppIpsecInterface(
3648 self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
3650 p.tun_if.add_vpp_config()
3652 p.tun_if.config_ip4()
3653 p.tun_if.unconfig_ip4()
3654 p.tun_if.config_ip4()
3655 p.tun_if.generate_remote_hosts(N_NHS)
3656 self.pg0.generate_remote_hosts(N_NHS)
3657 self.pg0.configure_ipv4_neighbors()
3659 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
3660 a = VppAcl(self, [r_all]).add_vpp_config()
3662 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3663 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3665 # setup some SAs for several next-hops on the interface
3666 self.multi_params = []
3668 for ii in range(N_NHS):
3669 p = copy.copy(self.ipv4_params)
3671 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3672 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3673 p.scapy_tun_spi = p.scapy_tun_spi + ii
3674 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3675 p.vpp_tun_spi = p.vpp_tun_spi + ii
3677 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3678 p.scapy_tra_spi = p.scapy_tra_spi + ii
3679 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3680 p.vpp_tra_spi = p.vpp_tra_spi + ii
3681 p.hop_limit = ii + 10
3682 p.tun_sa_out = VppIpsecSA(
3688 p.crypt_algo_vpp_id,
3690 self.vpp_esp_protocol,
3692 self.pg0.remote_hosts[ii].ip4,
3693 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3694 hop_limit=p.hop_limit,
3696 p.tun_sa_out.add_vpp_config()
3698 p.tun_sa_in = VppIpsecSA(
3704 p.crypt_algo_vpp_id,
3706 self.vpp_esp_protocol,
3707 self.pg0.remote_hosts[ii].ip4,
3709 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3710 hop_limit=p.hop_limit,
3712 p.tun_sa_in.add_vpp_config()
3714 p.tun_protect = VppIpsecTunProtect(
3719 nh=p.tun_if.remote_hosts[ii].ip4,
3721 p.tun_protect.add_vpp_config()
3724 self.encryption_type,
3727 self.pg0.remote_hosts[ii].ip4,
3729 self.multi_params.append(p)
3731 p.via_tun_route = VppIpRoute(
3733 p.remote_tun_if_host,
3735 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
3738 p.tun_dst = self.pg0.remote_hosts[ii].ip4
3741 p = self.ipv4_params
3742 p.tun_if.unconfig_ip4()
3743 super(TestIpsecMIfEsp4, self).tearDown()
3745 def test_tun_44(self):
3748 for p in self.multi_params:
3749 self.verify_tun_44(p, count=N_PKTS)
3751 # remove one tunnel protect, the rest should still work
3752 self.multi_params[0].tun_protect.remove_vpp_config()
3753 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3754 self.multi_params[0].via_tun_route.remove_vpp_config()
3755 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3757 for p in self.multi_params[1:]:
3758 self.verify_tun_44(p, count=N_PKTS)
3760 self.multi_params[0].tun_protect.add_vpp_config()
3761 self.multi_params[0].via_tun_route.add_vpp_config()
3763 for p in self.multi_params:
3764 self.verify_tun_44(p, count=N_PKTS)
3767 class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3768 """IPsec Interface MPLSoIPv6"""
3770 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3773 super(TestIpsecItf6MPLS, self).setUp()
3775 self.tun_if = self.pg0
3778 super(TestIpsecItf6MPLS, self).tearDown()
3780 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3782 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3784 MPLS(label=66, ttl=3)
3785 / IPv6(src=src, dst=dst)
3786 / UDP(sport=1166, dport=2233)
3787 / Raw(b"X" * payload_size)
3789 for i in range(count)
3792 def verify_encrypted6(self, p, sa, rxs):
3795 pkt = sa.decrypt(rx[IPv6])
3796 if not pkt.haslayer(IPv6):
3797 pkt = IP(pkt[Raw].load)
3798 self.assert_packet_checksums_valid(pkt)
3799 self.assert_equal(pkt[MPLS].label, 66)
3800 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3801 except (IndexError, AssertionError):
3802 self.logger.debug(ppp("Unexpected packet:", rx))
3804 self.logger.debug(ppp("Decrypted packet:", pkt))
3809 def test_tun_mpls_o_ip6(self):
3810 """IPSEC interface MPLS over IPv6"""
3813 p = self.ipv6_params
3816 tbl = VppMplsTable(self, 0)
3817 tbl.add_vpp_config()
3819 self.config_network(p)
3820 # deag MPLS routes from the tunnel
3825 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3826 eos_proto=f.FIB_PATH_NH_PROTO_IP6,
3831 p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
3835 p.tun_if.enable_mpls()
3837 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3838 self.config_protect(p)
3840 self.verify_tun_66(p, count=n_pkts)
3843 p.tun_if.disable_mpls()
3844 self.unconfig_protect(p)
3846 self.unconfig_network(p)
3849 if __name__ == "__main__":
3850 unittest.main(testRunner=VppTestRunner)