5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP, ICMP
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
10 from scapy.contrib.mpls import MPLS
11 from asfframework import VppTestRunner, tag_fixme_vpp_workers
12 from template_ipsec import (
20 IpsecTun6HandoffTests,
21 IpsecTun4HandoffTests,
24 from vpp_gre_interface import VppGreInterface
25 from vpp_ipip_tun_interface import VppIpIpTunInterface
26 from vpp_ip_route import (
35 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
36 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
37 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
38 from vpp_teib import VppTeib
40 from vpp_papi import VppEnum
41 from vpp_papi_provider import CliFailedCommandError
42 from vpp_acl import AclRule, VppAcl, VppAclInterface
43 from vpp_policer import PolicerAction, VppPolicer, Dir
46 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
47 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
49 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
51 crypt_key = mk_scapy_crypt_key(p)
53 p.tun_dst = tun_if.remote_ip
54 p.tun_src = tun_if.local_ip
60 is_default_port = p.nat_header.dport == 4500
62 is_default_port = True
65 outbound_nat_header = p.nat_header
67 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
68 bind_layers(UDP, ESP, dport=p.nat_header.dport)
70 p.scapy_tun_sa = SecurityAssociation(
73 crypt_algo=p.crypt_algo,
75 auth_algo=p.auth_algo,
77 tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
78 nat_t_header=outbound_nat_header,
81 p.vpp_tun_sa = SecurityAssociation(
84 crypt_algo=p.crypt_algo,
86 auth_algo=p.auth_algo,
88 tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
89 nat_t_header=p.nat_header,
94 def config_tra_params(p, encryption_type, tun_if):
95 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
97 p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
99 crypt_key = mk_scapy_crypt_key(p)
100 p.tun_dst = tun_if.remote_ip
101 p.tun_src = tun_if.local_ip
104 is_default_port = p.nat_header.dport == 4500
106 is_default_port = True
109 outbound_nat_header = p.nat_header
111 outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
112 bind_layers(UDP, ESP, dport=p.nat_header.dport)
114 p.scapy_tun_sa = SecurityAssociation(
117 crypt_algo=p.crypt_algo,
119 auth_algo=p.auth_algo,
122 nat_t_header=outbound_nat_header,
124 p.vpp_tun_sa = SecurityAssociation(
127 crypt_algo=p.crypt_algo,
129 auth_algo=p.auth_algo,
132 nat_t_header=p.nat_header,
136 class TemplateIpsec4TunProtect(object):
137 """IPsec IPv4 Tunnel protect"""
139 encryption_type = ESP
140 tun4_encrypt_node_name = "esp4-encrypt-tun"
141 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
142 tun4_input_node = "ipsec4-tun-input"
144 def config_sa_tra(self, p):
145 config_tun_params(p, self.encryption_type, p.tun_if)
147 p.tun_sa_out = VppIpsecSA(
155 self.vpp_esp_protocol,
158 p.tun_sa_out.add_vpp_config()
160 p.tun_sa_in = VppIpsecSA(
168 self.vpp_esp_protocol,
171 p.tun_sa_in.add_vpp_config()
173 def config_sa_tun(self, p):
174 config_tun_params(p, self.encryption_type, p.tun_if)
176 p.tun_sa_out = VppIpsecSA(
184 self.vpp_esp_protocol,
185 self.tun_if.local_addr[p.addr_type],
186 self.tun_if.remote_addr[p.addr_type],
189 p.tun_sa_out.add_vpp_config()
191 p.tun_sa_in = VppIpsecSA(
199 self.vpp_esp_protocol,
200 self.tun_if.remote_addr[p.addr_type],
201 self.tun_if.local_addr[p.addr_type],
204 p.tun_sa_in.add_vpp_config()
206 def config_protect(self, p):
207 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
208 p.tun_protect.add_vpp_config()
210 def config_network(self, p):
211 if hasattr(p, "tun_dst"):
214 tun_dst = self.pg0.remote_ip4
215 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
216 p.tun_if.add_vpp_config()
218 p.tun_if.config_ip4()
219 p.tun_if.config_ip6()
221 p.route = VppIpRoute(
223 p.remote_tun_if_host,
225 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
227 p.route.add_vpp_config()
230 p.remote_tun_if_host6,
234 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
240 def unconfig_network(self, p):
241 p.route.remove_vpp_config()
242 p.tun_if.remove_vpp_config()
244 def unconfig_protect(self, p):
245 p.tun_protect.remove_vpp_config()
247 def unconfig_sa(self, p):
248 p.tun_sa_out.remove_vpp_config()
249 p.tun_sa_in.remove_vpp_config()
252 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
253 """IPsec tunnel interface tests"""
255 encryption_type = ESP
259 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
262 def tearDownClass(cls):
263 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
266 super(TemplateIpsec4TunIfEsp, self).setUp()
268 self.tun_if = self.pg0
272 self.config_network(p)
273 self.config_sa_tra(p)
274 self.config_protect(p)
277 super(TemplateIpsec4TunIfEsp, self).tearDown()
280 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
281 """IPsec UDP tunnel interface tests"""
283 tun4_encrypt_node_name = "esp4-encrypt-tun"
284 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
285 encryption_type = ESP
289 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
292 def tearDownClass(cls):
293 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
295 def verify_encrypted(self, p, sa, rxs):
298 # ensure the UDP ports are correct before we decrypt
300 self.assertTrue(rx.haslayer(UDP))
301 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
302 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
304 pkt = sa.decrypt(rx[IP])
305 if not pkt.haslayer(IP):
306 pkt = IP(pkt[Raw].load)
308 self.assert_packet_checksums_valid(pkt)
309 self.assert_equal(pkt[IP].dst, "1.1.1.1")
310 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
311 except (IndexError, AssertionError):
312 self.logger.debug(ppp("Unexpected packet:", rx))
314 self.logger.debug(ppp("Decrypted packet:", pkt))
319 def config_sa_tra(self, p):
320 config_tun_params(p, self.encryption_type, p.tun_if)
322 p.tun_sa_out = VppIpsecSA(
330 self.vpp_esp_protocol,
332 udp_src=p.nat_header.sport,
333 udp_dst=p.nat_header.dport,
335 p.tun_sa_out.add_vpp_config()
337 p.tun_sa_in = VppIpsecSA(
345 self.vpp_esp_protocol,
347 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
348 udp_src=p.nat_header.sport,
349 udp_dst=p.nat_header.dport,
351 p.tun_sa_in.add_vpp_config()
354 super(TemplateIpsec4TunIfEspUdp, self).setUp()
357 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
358 p.nat_header = UDP(sport=5454, dport=4500)
360 self.tun_if = self.pg0
362 self.config_network(p)
363 self.config_sa_tra(p)
364 self.config_protect(p)
367 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
370 class TemplateIpsec4TunTfc:
371 """IPsec IPv4 tunnel with TFC"""
373 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
375 IP(src=src, dst=dst, len=28 + payload_size)
377 / Raw(b"X" * payload_size)
378 / Padding(b"Y" * 100)
381 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / sa.encrypt(pkt)
382 for i in range(count)
385 def verify_decrypted(self, p, rxs):
387 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
388 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
389 self.assert_equal(rx[IP].len, len(rx[IP]))
390 self.assert_packet_checksums_valid(rx)
393 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
394 """Ipsec ESP - TUN tests"""
396 tun4_encrypt_node_name = "esp4-encrypt-tun"
397 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
399 def test_tun_basic64(self):
400 """ipsec 6o4 tunnel basic test"""
401 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
403 self.verify_tun_64(self.params[socket.AF_INET], count=1)
405 def test_tun_burst64(self):
406 """ipsec 6o4 tunnel burst test"""
407 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
409 self.verify_tun_64(self.params[socket.AF_INET], count=257)
411 def test_tun_basic_frag44(self):
412 """ipsec 4o4 tunnel frag basic test"""
413 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
417 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
419 self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
421 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
424 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
425 """Ipsec ESP UDP tests"""
427 tun4_input_node = "ipsec4-tun-input"
430 super(TestIpsec4TunIfEspUdp, self).setUp()
432 def test_keepalive(self):
433 """IPSEC NAT Keepalive"""
434 self.verify_keepalive(self.ipv4_params)
437 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
438 """Ipsec ESP UDP GCM tests"""
440 tun4_input_node = "ipsec4-tun-input"
443 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
445 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
446 p.crypt_algo_vpp_id = (
447 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
449 p.crypt_algo = "AES-GCM"
451 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
455 class TestIpsec4TunIfEspUdpUpdate(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
456 """Ipsec ESP UDP update tests"""
458 tun4_input_node = "ipsec4-tun-input"
461 super(TestIpsec4TunIfEspUdpUpdate, self).setUp()
463 p.nat_header = UDP(sport=6565, dport=7676)
464 config_tun_params(p, self.encryption_type, p.tun_if)
465 p.tun_sa_in.update_vpp_config(
466 udp_src=p.nat_header.dport, udp_dst=p.nat_header.sport
468 p.tun_sa_out.update_vpp_config(
469 udp_src=p.nat_header.sport, udp_dst=p.nat_header.dport
473 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
474 """Ipsec ESP - TCP tests"""
479 class TemplateIpsec6TunProtect(object):
480 """IPsec IPv6 Tunnel protect"""
482 def config_sa_tra(self, p):
483 config_tun_params(p, self.encryption_type, p.tun_if)
485 p.tun_sa_out = VppIpsecSA(
493 self.vpp_esp_protocol,
495 p.tun_sa_out.add_vpp_config()
497 p.tun_sa_in = VppIpsecSA(
505 self.vpp_esp_protocol,
507 p.tun_sa_in.add_vpp_config()
509 def config_sa_tun(self, p):
510 config_tun_params(p, self.encryption_type, p.tun_if)
512 p.tun_sa_out = VppIpsecSA(
520 self.vpp_esp_protocol,
521 self.tun_if.local_addr[p.addr_type],
522 self.tun_if.remote_addr[p.addr_type],
524 p.tun_sa_out.add_vpp_config()
526 p.tun_sa_in = VppIpsecSA(
534 self.vpp_esp_protocol,
535 self.tun_if.remote_addr[p.addr_type],
536 self.tun_if.local_addr[p.addr_type],
538 p.tun_sa_in.add_vpp_config()
540 def config_protect(self, p):
541 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
542 p.tun_protect.add_vpp_config()
544 def config_network(self, p):
545 if hasattr(p, "tun_dst"):
548 tun_dst = self.pg0.remote_ip6
549 p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
550 p.tun_if.add_vpp_config()
552 p.tun_if.config_ip6()
553 p.tun_if.config_ip4()
555 p.route = VppIpRoute(
557 p.remote_tun_if_host,
561 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
565 p.route.add_vpp_config()
568 p.remote_tun_if_host4,
570 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
574 def unconfig_network(self, p):
575 p.route.remove_vpp_config()
576 p.tun_if.remove_vpp_config()
578 def unconfig_protect(self, p):
579 p.tun_protect.remove_vpp_config()
581 def unconfig_sa(self, p):
582 p.tun_sa_out.remove_vpp_config()
583 p.tun_sa_in.remove_vpp_config()
586 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
587 """IPsec tunnel interface tests"""
589 encryption_type = ESP
592 super(TemplateIpsec6TunIfEsp, self).setUp()
594 self.tun_if = self.pg0
597 self.config_network(p)
598 self.config_sa_tra(p)
599 self.config_protect(p)
602 super(TemplateIpsec6TunIfEsp, self).tearDown()
605 class TemplateIpsec6TunIfEspUdp(TemplateIpsec6TunProtect, TemplateIpsec):
606 """IPsec6 UDP tunnel interface tests"""
608 tun4_encrypt_node_name = "esp6-encrypt-tun"
609 tun4_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
610 encryption_type = ESP
614 super(TemplateIpsec6TunIfEspUdp, cls).setUpClass()
617 def tearDownClass(cls):
618 super(TemplateIpsec6TunIfEspUdp, cls).tearDownClass()
620 def verify_encrypted(self, p, sa, rxs):
623 # ensure the UDP ports are correct before we decrypt
625 self.assertTrue(rx.haslayer(UDP))
626 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
627 self.assert_equal(rx[UDP].dport, p.nat_header.dport)
629 pkt = sa.decrypt(rx[IP])
630 if not pkt.haslayer(IP):
631 pkt = IP(pkt[Raw].load)
633 self.assert_packet_checksums_valid(pkt)
635 pkt[IP].dst, "1111:1111:1111:1111:1111:1111:1111:1111"
637 self.assert_equal(pkt[IP].src, self.pg1.remote_ip6)
638 except (IndexError, AssertionError):
639 self.logger.debug(ppp("Unexpected packet:", rx))
641 self.logger.debug(ppp("Decrypted packet:", pkt))
646 def config_sa_tra(self, p):
647 config_tun_params(p, self.encryption_type, p.tun_if)
649 p.tun_sa_out = VppIpsecSA(
657 self.vpp_esp_protocol,
659 udp_src=p.nat_header.sport,
660 udp_dst=p.nat_header.dport,
662 p.tun_sa_out.add_vpp_config()
664 p.tun_sa_in = VppIpsecSA(
672 self.vpp_esp_protocol,
674 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
675 udp_src=p.nat_header.sport,
676 udp_dst=p.nat_header.dport,
678 p.tun_sa_in.add_vpp_config()
681 super(TemplateIpsec6TunIfEspUdp, self).setUp()
684 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
685 p.nat_header = UDP(sport=5454, dport=4500)
687 self.tun_if = self.pg0
689 self.config_network(p)
690 self.config_sa_tra(p)
691 self.config_protect(p)
694 super(TemplateIpsec6TunIfEspUdp, self).tearDown()
697 class TemplateIpsec6TunTfc:
698 """IPsec IPv6 tunnel with TFC"""
700 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
702 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
704 IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
705 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
706 / Padding(b"Y" * 100)
708 for i in range(count)
711 def verify_decrypted6(self, p, rxs):
713 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
714 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
715 self.assert_equal(rx[IPv6].plen, len(rx[IPv6].payload))
716 self.assert_packet_checksums_valid(rx)
719 class TestIpsec6TunIfEspUdp(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
720 """Ipsec ESP 6 UDP tests"""
722 tun6_input_node = "ipsec6-tun-input"
723 tun6_encrypt_node_name = "esp6-encrypt-tun"
724 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
727 super(TestIpsec6TunIfEspUdp, self).setUp()
729 def test_keepalive(self):
730 """IPSEC6 NAT Keepalive"""
731 self.verify_keepalive(self.ipv6_params)
734 class TestIpsec6TunIfEspUdpGCM(TemplateIpsec6TunIfEspUdp, IpsecTun6Tests):
735 """Ipsec ESP 6 UDP GCM tests"""
737 tun6_input_node = "ipsec6-tun-input"
738 tun6_encrypt_node_name = "esp6-encrypt-tun"
739 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
742 super(TestIpsec6TunIfEspUdpGCM, self).setUp()
744 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
745 p.crypt_algo_vpp_id = (
746 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
748 p.crypt_algo = "AES-GCM"
750 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
754 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
755 """Ipsec ESP - TUN tests"""
757 tun6_encrypt_node_name = "esp6-encrypt-tun"
758 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
760 def test_tun_basic46(self):
761 """ipsec 4o6 tunnel basic test"""
762 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
763 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
765 def test_tun_burst46(self):
766 """ipsec 4o6 tunnel burst test"""
767 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
768 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
771 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
772 """Ipsec ESP 6 Handoff tests"""
774 tun6_encrypt_node_name = "esp6-encrypt-tun"
775 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
777 def test_tun_handoff_66_police(self):
778 """ESP 6o6 tunnel with policer worker hand-off test"""
779 self.vapi.cli("clear errors")
780 self.vapi.cli("clear ipsec sa")
783 p = self.params[socket.AF_INET6]
785 action_tx = PolicerAction(
786 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
788 policer = VppPolicer(
795 conform_action=action_tx,
796 exceed_action=action_tx,
797 violate_action=action_tx,
799 policer.add_vpp_config()
801 # Start policing on tun
802 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
804 for pol_bind in [1, 0]:
805 policer.bind_vpp_config(pol_bind, True)
807 # inject alternately on worker 0 and 1.
808 for worker in [0, 1, 0, 1]:
809 send_pkts = self.gen_encrypt_pkts6(
813 src=p.remote_tun_if_host,
814 dst=self.pg1.remote_ip6,
817 recv_pkts = self.send_and_expect(
818 self.tun_if, send_pkts, self.pg1, worker=worker
820 self.verify_decrypted6(p, recv_pkts)
821 self.logger.debug(self.vapi.cli("show trace max 100"))
823 stats = policer.get_stats()
824 stats0 = policer.get_stats(worker=0)
825 stats1 = policer.get_stats(worker=1)
828 # First pass: Worker 1, should have done all the policing
829 self.assertEqual(stats, stats1)
831 # Worker 0, should have handed everything off
832 self.assertEqual(stats0["conform_packets"], 0)
833 self.assertEqual(stats0["exceed_packets"], 0)
834 self.assertEqual(stats0["violate_packets"], 0)
836 # Second pass: both workers should have policed equal amounts
837 self.assertGreater(stats1["conform_packets"], 0)
838 self.assertEqual(stats1["exceed_packets"], 0)
839 self.assertGreater(stats1["violate_packets"], 0)
841 self.assertGreater(stats0["conform_packets"], 0)
842 self.assertEqual(stats0["exceed_packets"], 0)
843 self.assertGreater(stats0["violate_packets"], 0)
846 stats0["conform_packets"] + stats0["violate_packets"],
847 stats1["conform_packets"] + stats1["violate_packets"],
850 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
851 policer.remove_vpp_config()
854 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
855 """Ipsec ESP 4 Handoff tests"""
857 tun4_encrypt_node_name = "esp4-encrypt-tun"
858 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
860 def test_tun_handoff_44_police(self):
861 """ESP 4o4 tunnel with policer worker hand-off test"""
862 self.vapi.cli("clear errors")
863 self.vapi.cli("clear ipsec sa")
866 p = self.params[socket.AF_INET]
868 action_tx = PolicerAction(
869 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
871 policer = VppPolicer(
878 conform_action=action_tx,
879 exceed_action=action_tx,
880 violate_action=action_tx,
882 policer.add_vpp_config()
884 # Start policing on tun
885 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
887 for pol_bind in [1, 0]:
888 policer.bind_vpp_config(pol_bind, True)
890 # inject alternately on worker 0 and 1.
891 for worker in [0, 1, 0, 1]:
892 send_pkts = self.gen_encrypt_pkts(
896 src=p.remote_tun_if_host,
897 dst=self.pg1.remote_ip4,
900 recv_pkts = self.send_and_expect(
901 self.tun_if, send_pkts, self.pg1, worker=worker
903 self.verify_decrypted(p, recv_pkts)
904 self.logger.debug(self.vapi.cli("show trace max 100"))
906 stats = policer.get_stats()
907 stats0 = policer.get_stats(worker=0)
908 stats1 = policer.get_stats(worker=1)
911 # First pass: Worker 1, should have done all the policing
912 self.assertEqual(stats, stats1)
914 # Worker 0, should have handed everything off
915 self.assertEqual(stats0["conform_packets"], 0)
916 self.assertEqual(stats0["exceed_packets"], 0)
917 self.assertEqual(stats0["violate_packets"], 0)
919 # Second pass: both workers should have policed equal amounts
920 self.assertGreater(stats1["conform_packets"], 0)
921 self.assertEqual(stats1["exceed_packets"], 0)
922 self.assertGreater(stats1["violate_packets"], 0)
924 self.assertGreater(stats0["conform_packets"], 0)
925 self.assertEqual(stats0["exceed_packets"], 0)
926 self.assertGreater(stats0["violate_packets"], 0)
929 stats0["conform_packets"] + stats0["violate_packets"],
930 stats1["conform_packets"] + stats1["violate_packets"],
933 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
934 policer.remove_vpp_config()
937 @tag_fixme_vpp_workers
938 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
939 """IPsec IPv4 Multi Tunnel interface"""
941 encryption_type = ESP
942 tun4_encrypt_node_name = "esp4-encrypt-tun"
943 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
946 super(TestIpsec4MultiTunIfEsp, self).setUp()
948 self.tun_if = self.pg0
950 self.multi_params = []
951 self.pg0.generate_remote_hosts(10)
952 self.pg0.configure_ipv4_neighbors()
955 p = copy.copy(self.ipv4_params)
957 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
958 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
959 p.scapy_tun_spi = p.scapy_tun_spi + ii
960 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
961 p.vpp_tun_spi = p.vpp_tun_spi + ii
963 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
964 p.scapy_tra_spi = p.scapy_tra_spi + ii
965 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
966 p.vpp_tra_spi = p.vpp_tra_spi + ii
967 p.tun_dst = self.pg0.remote_hosts[ii].ip4
969 self.multi_params.append(p)
970 self.config_network(p)
971 self.config_sa_tra(p)
972 self.config_protect(p)
975 super(TestIpsec4MultiTunIfEsp, self).tearDown()
977 def test_tun_44(self):
978 """Multiple IPSEC tunnel interfaces"""
979 for p in self.multi_params:
980 self.verify_tun_44(p, count=127)
981 self.assertEqual(p.tun_if.get_rx_stats(), 127)
982 self.assertEqual(p.tun_if.get_tx_stats(), 127)
984 def test_tun_rr_44(self):
985 """Round-robin packets across multiple interfaces"""
987 for p in self.multi_params:
988 tx = tx + self.gen_encrypt_pkts(
992 src=p.remote_tun_if_host,
993 dst=self.pg1.remote_ip4,
995 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
997 for rx, p in zip(rxs, self.multi_params):
998 self.verify_decrypted(p, [rx])
1001 for p in self.multi_params:
1002 tx = tx + self.gen_pkts(
1003 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
1005 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
1007 for rx, p in zip(rxs, self.multi_params):
1008 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
1011 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1012 """IPsec IPv4 Tunnel interface all Algos"""
1014 encryption_type = ESP
1015 tun4_encrypt_node_name = "esp4-encrypt-tun"
1016 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1019 super(TestIpsec4TunIfEspAll, self).setUp()
1021 self.tun_if = self.pg0
1022 p = self.ipv4_params
1024 self.config_network(p)
1025 self.config_sa_tra(p)
1026 self.config_protect(p)
1029 p = self.ipv4_params
1030 self.unconfig_protect(p)
1031 self.unconfig_network(p)
1034 super(TestIpsec4TunIfEspAll, self).tearDown()
1038 # change the key and the SPI
1041 p.crypt_key = b"X" + p.crypt_key[1:]
1042 p.scapy_tun_spi += 1
1043 p.scapy_tun_sa_id += 1
1045 p.vpp_tun_sa_id += 1
1046 p.tun_if.local_spi = p.vpp_tun_spi
1047 p.tun_if.remote_spi = p.scapy_tun_spi
1049 config_tun_params(p, self.encryption_type, p.tun_if)
1051 p.tun_sa_out = VppIpsecSA(
1057 p.crypt_algo_vpp_id,
1059 self.vpp_esp_protocol,
1063 p.tun_sa_in = VppIpsecSA(
1069 p.crypt_algo_vpp_id,
1071 self.vpp_esp_protocol,
1075 p.tun_sa_in.add_vpp_config()
1076 p.tun_sa_out.add_vpp_config()
1078 self.config_protect(p)
1079 np.tun_sa_out.remove_vpp_config()
1080 np.tun_sa_in.remove_vpp_config()
1081 self.logger.info(self.vapi.cli("sh ipsec sa"))
1083 def test_tun_44(self):
1084 """IPSEC tunnel all algos"""
1086 # foreach VPP crypto engine
1087 engines = ["ia32", "ipsecmb", "openssl"]
1089 # foreach crypto algorithm
1093 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
1096 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1098 "scapy-crypto": "AES-GCM",
1099 "scapy-integ": "NULL",
1100 "key": b"JPjyOWBeVEQiMe7h",
1105 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
1108 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1110 "scapy-crypto": "AES-GCM",
1111 "scapy-integ": "NULL",
1112 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1117 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
1120 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1122 "scapy-crypto": "AES-GCM",
1123 "scapy-integ": "NULL",
1124 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1129 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
1132 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1134 "scapy-crypto": "AES-CBC",
1135 "scapy-integ": "HMAC-SHA1-96",
1137 "key": b"JPjyOWBeVEQiMe7h",
1141 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
1144 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
1146 "scapy-crypto": "AES-CBC",
1147 "scapy-integ": "SHA2-512-256",
1149 "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
1153 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
1156 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
1158 "scapy-crypto": "AES-CBC",
1159 "scapy-integ": "SHA2-256-128",
1161 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1165 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1168 VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
1170 "scapy-crypto": "NULL",
1171 "scapy-integ": "HMAC-SHA1-96",
1173 "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
1177 for engine in engines:
1178 self.vapi.cli("set crypto handler all %s" % engine)
1181 # loop through each of the algorithms
1184 # with self.subTest(algo=algo['scapy']):
1186 p = self.ipv4_params
1187 p.auth_algo_vpp_id = algo["vpp-integ"]
1188 p.crypt_algo_vpp_id = algo["vpp-crypto"]
1189 p.crypt_algo = algo["scapy-crypto"]
1190 p.auth_algo = algo["scapy-integ"]
1191 p.crypt_key = algo["key"]
1192 p.salt = algo["salt"]
1198 self.verify_tun_44(p, count=127)
1201 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
1202 """IPsec IPv4 Tunnel interface no Algos"""
1204 encryption_type = ESP
1205 tun4_encrypt_node_name = "esp4-encrypt-tun"
1206 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1209 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
1211 self.tun_if = self.pg0
1212 p = self.ipv4_params
1213 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
1214 p.auth_algo = "NULL"
1217 p.crypt_algo_vpp_id = (
1218 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
1220 p.crypt_algo = "NULL"
1224 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
1226 def test_tun_44(self):
1227 """IPSec SA with NULL algos"""
1228 p = self.ipv4_params
1230 self.config_network(p)
1231 self.config_sa_tra(p)
1232 self.config_protect(p)
1235 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=127
1237 self.send_and_assert_no_replies(self.pg1, tx)
1239 self.unconfig_protect(p)
1241 self.unconfig_network(p)
1243 def test_tun_44_async(self):
1244 """IPSec SA with NULL algos using async crypto"""
1245 p = self.ipv4_params
1247 self.vapi.ipsec_set_async_mode(async_enable=True)
1248 self.config_network(p)
1249 self.config_sa_tra(p)
1250 self.config_protect(p)
1253 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=127
1255 self.send_and_assert_no_replies(self.pg1, tx)
1257 self.unconfig_protect(p)
1259 self.unconfig_network(p)
1261 self.vapi.ipsec_set_async_mode(async_enable=False)
1264 @tag_fixme_vpp_workers
1265 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
1266 """IPsec IPv6 Multi Tunnel interface"""
1268 encryption_type = ESP
1269 tun6_encrypt_node_name = "esp6-encrypt-tun"
1270 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1273 super(TestIpsec6MultiTunIfEsp, self).setUp()
1275 self.tun_if = self.pg0
1277 self.multi_params = []
1278 self.pg0.generate_remote_hosts(10)
1279 self.pg0.configure_ipv6_neighbors()
1281 for ii in range(10):
1282 p = copy.copy(self.ipv6_params)
1284 p.remote_tun_if_host = "1111::%d" % (ii + 1)
1285 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1286 p.scapy_tun_spi = p.scapy_tun_spi + ii
1287 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1288 p.vpp_tun_spi = p.vpp_tun_spi + ii
1290 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1291 p.scapy_tra_spi = p.scapy_tra_spi + ii
1292 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1293 p.vpp_tra_spi = p.vpp_tra_spi + ii
1294 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1296 self.multi_params.append(p)
1297 self.config_network(p)
1298 self.config_sa_tra(p)
1299 self.config_protect(p)
1302 super(TestIpsec6MultiTunIfEsp, self).tearDown()
1304 def test_tun_66(self):
1305 """Multiple IPSEC tunnel interfaces"""
1306 for p in self.multi_params:
1307 self.verify_tun_66(p, count=127)
1308 self.assertEqual(p.tun_if.get_rx_stats(), 127)
1309 self.assertEqual(p.tun_if.get_tx_stats(), 127)
1312 class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
1313 """Ipsec GRE TEB ESP - TUN tests"""
1315 tun4_encrypt_node_name = "esp4-encrypt-tun"
1316 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1317 encryption_type = ESP
1318 omac = "00:11:22:33:44:55"
1320 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1322 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1324 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1326 / Ether(dst=self.omac)
1327 / IP(src="1.1.1.1", dst="1.1.1.2")
1328 / UDP(sport=1144, dport=2233)
1329 / Raw(b"X" * payload_size)
1331 for i in range(count)
1334 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1336 Ether(dst=self.omac)
1337 / IP(src="1.1.1.1", dst="1.1.1.2")
1338 / UDP(sport=1144, dport=2233)
1339 / Raw(b"X" * payload_size)
1340 for i in range(count)
1343 def verify_decrypted(self, p, rxs):
1345 self.assert_equal(rx[Ether].dst, self.omac)
1346 self.assert_equal(rx[IP].dst, "1.1.1.2")
1348 def verify_encrypted(self, p, sa, rxs):
1351 pkt = sa.decrypt(rx[IP])
1352 if not pkt.haslayer(IP):
1353 pkt = IP(pkt[Raw].load)
1354 self.assert_packet_checksums_valid(pkt)
1355 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1356 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1357 self.assertTrue(pkt.haslayer(GRE))
1359 self.assertEqual(e[Ether].dst, self.omac)
1360 self.assertEqual(e[IP].dst, "1.1.1.2")
1361 except (IndexError, AssertionError):
1362 self.logger.debug(ppp("Unexpected packet:", rx))
1364 self.logger.debug(ppp("Decrypted packet:", pkt))
1370 super(TestIpsecGreTebIfEsp, self).setUp()
1372 self.tun_if = self.pg0
1374 p = self.ipv4_params
1376 bd1 = VppBridgeDomain(self, 1)
1377 bd1.add_vpp_config()
1379 p.tun_sa_out = VppIpsecSA(
1385 p.crypt_algo_vpp_id,
1387 self.vpp_esp_protocol,
1389 self.pg0.remote_ip4,
1391 p.tun_sa_out.add_vpp_config()
1393 p.tun_sa_in = VppIpsecSA(
1399 p.crypt_algo_vpp_id,
1401 self.vpp_esp_protocol,
1402 self.pg0.remote_ip4,
1405 p.tun_sa_in.add_vpp_config()
1407 p.tun_if = VppGreInterface(
1410 self.pg0.remote_ip4,
1411 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1413 p.tun_if.add_vpp_config()
1415 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1417 p.tun_protect.add_vpp_config()
1420 p.tun_if.config_ip4()
1421 config_tun_params(p, self.encryption_type, p.tun_if)
1423 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1424 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1426 self.vapi.cli("clear ipsec sa")
1427 self.vapi.cli("sh adj")
1428 self.vapi.cli("sh ipsec tun")
1431 p = self.ipv4_params
1432 p.tun_if.unconfig_ip4()
1433 super(TestIpsecGreTebIfEsp, self).tearDown()
# NOTE(review): short lines of this class (method headers, loop headers,
# closing brackets) are not visible in this listing; the comments below
# describe only what the visible lines establish.
1436 class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
1437 """Ipsec GRE TEB ESP - TUN tests"""
# ESP over a GRE TEB tunnel whose bridge domain peer is a dot1q
# sub-interface of pg1 (VLAN 11, see the set-up lines further down).
1439 tun4_encrypt_node_name = "esp4-encrypt-tun"
1440 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1441 encryption_type = ESP
# Destination MAC placed on the inner (bridged) Ethernet frames.
1442 omac = "00:11:22:33:44:55"
# Builds `count` packets: an outer Ether/IP header between pg0's remote and
# local addresses, carrying an inner Ether(dst=omac)/IP/UDP/Raw frame.
# The lines that wrap the inner frame (encryption/GRE) are not shown here.
1444 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1446 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1448 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1450 / Ether(dst=self.omac)
1451 / IP(src="1.1.1.1", dst="1.1.1.2")
1452 / UDP(sport=1144, dport=2233)
1453 / Raw(b"X" * payload_size)
1455 for i in range(count)
# Builds `count` plain L2 frames addressed to omac; `sw_intf`, `src` and
# `dst` are not used by the visible lines (fixed addresses are used).
1458 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1460 Ether(dst=self.omac)
1462 / IP(src="1.1.1.1", dst="1.1.1.2")
1463 / UDP(sport=1144, dport=2233)
1464 / Raw(b"X" * payload_size)
1465 for i in range(count)
# Decrypted frames must keep the inner destination MAC/IP and carry
# VLAN id 11.
1468 def verify_decrypted(self, p, rxs):
1470 self.assert_equal(rx[Ether].dst, self.omac)
1471 self.assert_equal(rx[Dot1Q].vlan, 11)
1472 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Decrypts each received packet with `sa`, then checks the tunnel endpoints,
# the presence of GRE, and that the inner frame carries no Dot1Q tag.
1474 def verify_encrypted(self, p, sa, rxs):
1477 pkt = sa.decrypt(rx[IP])
# If scapy did not dissect an IP layer from the decrypted data, re-parse the
# raw bytes as IP.
1478 if not pkt.haslayer(IP):
1479 pkt = IP(pkt[Raw].load)
1480 self.assert_packet_checksums_valid(pkt)
1481 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1482 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1483 self.assertTrue(pkt.haslayer(GRE))
# `e` is assigned on a line not shown here (presumably the GRE payload --
# TODO confirm).
1485 self.assertEqual(e[Ether].dst, self.omac)
1486 self.assertFalse(e.haslayer(Dot1Q))
1487 self.assertEqual(e[IP].dst, "1.1.1.2")
# On failure both the received and the decrypted packet are logged at debug
# level; what happens after logging is on lines not shown here.
1488 except (IndexError, AssertionError):
1489 self.logger.debug(ppp("Unexpected packet:", rx))
1491 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- set-up: bridge domain 1, VLAN sub-interface, SAs, GRE TEB tunnel ---
1497 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1499 self.tun_if = self.pg0
1501 p = self.ipv4_params
1503 bd1 = VppBridgeDomain(self, 1)
1504 bd1.add_vpp_config()
# dot1q sub-interface 11 on pg1 with a pop-1 VLAN tag rewrite.
1506 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1507 self.vapi.l2_interface_vlan_tag_rewrite(
1508 sw_if_index=self.pg1_11.sw_if_index,
1509 vtr_op=L2_VTR_OP.L2_POP_1,
1512 self.pg1_11.admin_up()
# Outbound and inbound SAs (most constructor arguments are not visible).
1514 p.tun_sa_out = VppIpsecSA(
1520 p.crypt_algo_vpp_id,
1522 self.vpp_esp_protocol,
1524 self.pg0.remote_ip4,
1526 p.tun_sa_out.add_vpp_config()
1528 p.tun_sa_in = VppIpsecSA(
1534 p.crypt_algo_vpp_id,
1536 self.vpp_esp_protocol,
1537 self.pg0.remote_ip4,
1540 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB towards pg0's remote address.
1542 p.tun_if = VppGreInterface(
1545 self.pg0.remote_ip4,
1546 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1548 p.tun_if.add_vpp_config()
# Protect the GRE tunnel with the SA pair created above.
1550 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1552 p.tun_protect.add_vpp_config()
1555 p.tun_if.config_ip4()
# Creates the scapy-side SAs (p.scapy_tun_sa / p.vpp_tun_sa) for this tunnel.
1556 config_tun_params(p, self.encryption_type, p.tun_if)
# Both the tunnel and the VLAN sub-interface are ports of bridge domain 1.
1558 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1559 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1561 self.vapi.cli("clear ipsec sa")
# --- tear-down ---
1564 p = self.ipv4_params
1565 p.tun_if.unconfig_ip4()
1566 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
# NOTE(review): the sub-interface is brought down and removed only after the
# parent tearDown() has run -- confirm this ordering is intended.
1567 self.pg1_11.admin_down()
1568 self.pg1_11.remove_vpp_config()
# NOTE(review): short lines of this class (method headers, loop headers,
# closing brackets) are not visible in this listing; the comments below
# describe only what the visible lines establish.
1571 class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
1572 """Ipsec GRE TEB ESP - Tra tests"""
# GRE TEB tunnel bridged with pg1 in bridge domain 1; the scapy-side SAs are
# set up with config_tra_params().
1574 tun4_encrypt_node_name = "esp4-encrypt-tun"
1575 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1576 encryption_type = ESP
# Destination MAC placed on the inner (bridged) Ethernet frames.
1577 omac = "00:11:22:33:44:55"
# Builds `count` packets: outer Ether/IP between pg0's remote and local
# addresses carrying an inner Ether(dst=omac)/IP/UDP/Raw frame. The lines
# that wrap the inner frame are not shown here.
1579 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1581 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1583 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1585 / Ether(dst=self.omac)
1586 / IP(src="1.1.1.1", dst="1.1.1.2")
1587 / UDP(sport=1144, dport=2233)
1588 / Raw(b"X" * payload_size)
1590 for i in range(count)
# Builds `count` plain L2 frames addressed to omac; `sw_intf`, `src` and
# `dst` are not used by the visible lines.
1593 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1595 Ether(dst=self.omac)
1596 / IP(src="1.1.1.1", dst="1.1.1.2")
1597 / UDP(sport=1144, dport=2233)
1598 / Raw(b"X" * payload_size)
1599 for i in range(count)
# Decrypted frames must keep the inner destination MAC and IP.
1602 def verify_decrypted(self, p, rxs):
1604 self.assert_equal(rx[Ether].dst, self.omac)
1605 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Decrypts each received packet with `sa` and checks tunnel endpoints, GRE
# presence and the inner frame's addresses.
1607 def verify_encrypted(self, p, sa, rxs):
1610 pkt = sa.decrypt(rx[IP])
# Re-parse the raw decrypted bytes as IP when no IP layer was dissected.
1611 if not pkt.haslayer(IP):
1612 pkt = IP(pkt[Raw].load)
1613 self.assert_packet_checksums_valid(pkt)
1614 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1615 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1616 self.assertTrue(pkt.haslayer(GRE))
# `e` is assigned on a line not shown here (presumably the GRE payload --
# TODO confirm).
1618 self.assertEqual(e[Ether].dst, self.omac)
1619 self.assertEqual(e[IP].dst, "1.1.1.2")
# On failure both packets are logged at debug level.
1620 except (IndexError, AssertionError):
1621 self.logger.debug(ppp("Unexpected packet:", rx))
1623 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- set-up: bridge domain 1, SAs, GRE TEB tunnel, protection ---
1629 super(TestIpsecGreTebIfEspTra, self).setUp()
1631 self.tun_if = self.pg0
1633 p = self.ipv4_params
1635 bd1 = VppBridgeDomain(self, 1)
1636 bd1.add_vpp_config()
# Outbound and inbound SAs (most constructor arguments are not visible).
1638 p.tun_sa_out = VppIpsecSA(
1644 p.crypt_algo_vpp_id,
1646 self.vpp_esp_protocol,
1648 p.tun_sa_out.add_vpp_config()
1650 p.tun_sa_in = VppIpsecSA(
1656 p.crypt_algo_vpp_id,
1658 self.vpp_esp_protocol,
1660 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB towards pg0's remote address.
1662 p.tun_if = VppGreInterface(
1665 self.pg0.remote_ip4,
1666 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1668 p.tun_if.add_vpp_config()
1670 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1672 p.tun_protect.add_vpp_config()
1675 p.tun_if.config_ip4()
1676 config_tra_params(p, self.encryption_type, p.tun_if)
# Both the tunnel and pg1 are ports of bridge domain 1.
1678 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1679 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1681 self.vapi.cli("clear ipsec sa")
# --- tear-down ---
1684 p = self.ipv4_params
1685 p.tun_if.unconfig_ip4()
1686 super(TestIpsecGreTebIfEspTra, self).tearDown()
# NOTE(review): short lines of this class (method headers, loop headers,
# closing brackets) are not visible in this listing; the comments below
# describe only what the visible lines establish.
1689 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
1690 """Ipsec GRE TEB UDP ESP - Tra tests"""
# Same topology as the GRE TEB tests, with the UDP_ENCAP flag set and a
# non-default NAT header (sport 5454 / dport 4545, see the set-up lines).
1692 tun4_encrypt_node_name = "esp4-encrypt-tun"
1693 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1694 encryption_type = ESP
# Destination MAC placed on the inner (bridged) Ethernet frames.
1695 omac = "00:11:22:33:44:55"
# Builds `count` packets: outer Ether/IP between pg0's remote and local
# addresses carrying an inner Ether(dst=omac)/IP/UDP/Raw frame. The lines
# that wrap the inner frame are not shown here.
1697 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1699 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1701 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1703 / Ether(dst=self.omac)
1704 / IP(src="1.1.1.1", dst="1.1.1.2")
1705 / UDP(sport=1144, dport=2233)
1706 / Raw(b"X" * payload_size)
1708 for i in range(count)
# Builds `count` plain L2 frames addressed to omac; `sw_intf`, `src` and
# `dst` are not used by the visible lines.
1711 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1713 Ether(dst=self.omac)
1714 / IP(src="1.1.1.1", dst="1.1.1.2")
1715 / UDP(sport=1144, dport=2233)
1716 / Raw(b"X" * payload_size)
1717 for i in range(count)
# Decrypted frames must keep the inner destination MAC and IP.
1720 def verify_decrypted(self, p, rxs):
1722 self.assert_equal(rx[Ether].dst, self.omac)
1723 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Encrypted packets must be UDP-encapsulated with the ports given in
# p.nat_header (dport 4545, sport 5454) before the usual decrypt checks.
1725 def verify_encrypted(self, p, sa, rxs):
1727 self.assertTrue(rx.haslayer(UDP))
1728 self.assertEqual(rx[UDP].dport, 4545)
1729 self.assertEqual(rx[UDP].sport, 5454)
1731 pkt = sa.decrypt(rx[IP])
# Re-parse the raw decrypted bytes as IP when no IP layer was dissected.
1732 if not pkt.haslayer(IP):
1733 pkt = IP(pkt[Raw].load)
1734 self.assert_packet_checksums_valid(pkt)
1735 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1736 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1737 self.assertTrue(pkt.haslayer(GRE))
# `e` is assigned on a line not shown here (presumably the GRE payload --
# TODO confirm).
1739 self.assertEqual(e[Ether].dst, self.omac)
1740 self.assertEqual(e[IP].dst, "1.1.1.2")
# On failure both packets are logged at debug level.
1741 except (IndexError, AssertionError):
1742 self.logger.debug(ppp("Unexpected packet:", rx))
1744 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- set-up: UDP encap flags, bridge domain 1, SAs, GRE TEB tunnel ---
1750 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1752 self.tun_if = self.pg0
1754 p = self.ipv4_params
# NOTE(review): this repeats the assignment on the previous visible line and
# has no effect; it can be removed.
1755 p = self.ipv4_params
1756 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
1757 p.nat_header = UDP(sport=5454, dport=4545)
1759 bd1 = VppBridgeDomain(self, 1)
1760 bd1.add_vpp_config()
# Outbound and inbound SAs (most constructor arguments are not visible).
1762 p.tun_sa_out = VppIpsecSA(
1768 p.crypt_algo_vpp_id,
1770 self.vpp_esp_protocol,
1775 p.tun_sa_out.add_vpp_config()
1777 p.tun_sa_in = VppIpsecSA(
1783 p.crypt_algo_vpp_id,
1785 self.vpp_esp_protocol,
# The inbound SA additionally carries the IS_INBOUND flag.
1787 p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
1792 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB towards pg0's remote address.
1794 p.tun_if = VppGreInterface(
1797 self.pg0.remote_ip4,
1798 type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
1800 p.tun_if.add_vpp_config()
1802 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1804 p.tun_protect.add_vpp_config()
1807 p.tun_if.config_ip4()
1808 config_tra_params(p, self.encryption_type, p.tun_if)
# Both the tunnel and pg1 are ports of bridge domain 1.
1810 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1811 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1813 self.vapi.cli("clear ipsec sa")
1814 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
# --- tear-down ---
1817 p = self.ipv4_params
1818 p.tun_if.unconfig_ip4()
1819 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
# NOTE(review): short lines of this class (method headers, loop headers,
# closing brackets) are not visible in this listing; the comments below
# describe only what the visible lines establish.
1822 class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
1823 """Ipsec GRE ESP - TUN tests"""
# ESP protecting an L3 GRE tunnel between pg0's local and remote addresses;
# scapy-side SAs are set up with config_tun_params().
1825 tun4_encrypt_node_name = "esp4-encrypt-tun"
1826 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1827 encryption_type = ESP
# Builds `count` packets: outer Ether/IP between pg0's remote and local
# addresses carrying an inner IP (pg1 local -> pg1 remote)/UDP/Raw packet.
# The lines that wrap the inner packet are not shown here.
1829 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1831 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1833 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1835 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1836 / UDP(sport=1144, dport=2233)
1837 / Raw(b"X" * payload_size)
1839 for i in range(count)
# Builds `count` plain packets to 1.1.1.2; `src` and `dst` are not used by
# the visible lines.
1842 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1844 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1845 / IP(src="1.1.1.1", dst="1.1.1.2")
1846 / UDP(sport=1144, dport=2233)
1847 / Raw(b"X" * payload_size)
1848 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IP.
1851 def verify_decrypted(self, p, rxs):
1853 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1854 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypts each received packet with `sa` and checks tunnel endpoints, GRE
# presence and the inner destination address.
1856 def verify_encrypted(self, p, sa, rxs):
1859 pkt = sa.decrypt(rx[IP])
# Re-parse the raw decrypted bytes as IP when no IP layer was dissected.
1860 if not pkt.haslayer(IP):
1861 pkt = IP(pkt[Raw].load)
1862 self.assert_packet_checksums_valid(pkt)
1863 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1864 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1865 self.assertTrue(pkt.haslayer(GRE))
# `e` is assigned on a line not shown here (presumably the GRE payload --
# TODO confirm).
1867 self.assertEqual(e[IP].dst, "1.1.1.2")
# On failure both packets are logged at debug level.
1868 except (IndexError, AssertionError):
1869 self.logger.debug(ppp("Unexpected packet:", rx))
1871 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- set-up: SAs, GRE tunnel, protection, host route ---
1877 super(TestIpsecGreIfEsp, self).setUp()
1879 self.tun_if = self.pg0
1881 p = self.ipv4_params
# NOTE(review): bridge domain 1 is created here but is not referenced again
# in the visible lines of this class -- confirm whether it is needed.
1883 bd1 = VppBridgeDomain(self, 1)
1884 bd1.add_vpp_config()
# Outbound and inbound SAs (most constructor arguments are not visible).
1886 p.tun_sa_out = VppIpsecSA(
1892 p.crypt_algo_vpp_id,
1894 self.vpp_esp_protocol,
1896 self.pg0.remote_ip4,
1898 p.tun_sa_out.add_vpp_config()
1900 p.tun_sa_in = VppIpsecSA(
1906 p.crypt_algo_vpp_id,
1908 self.vpp_esp_protocol,
1909 self.pg0.remote_ip4,
1912 p.tun_sa_in.add_vpp_config()
1914 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
1915 p.tun_if.add_vpp_config()
1917 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
1918 p.tun_protect.add_vpp_config()
1921 p.tun_if.config_ip4()
# Creates the scapy-side SAs (p.scapy_tun_sa / p.vpp_tun_sa) for this tunnel.
1922 config_tun_params(p, self.encryption_type, p.tun_if)
# Arguments for 1.1.1.2/32 with a path via the tunnel's remote address; the
# call they belong to is on lines not shown (presumably a route -- TODO
# confirm).
1925 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
# --- tear-down ---
1929 p = self.ipv4_params
1930 p.tun_if.unconfig_ip4()
1931 super(TestIpsecGreIfEsp, self).tearDown()
# NOTE(review): short lines of this class (method headers, loop headers,
# closing brackets) are not visible in this listing; the comments below
# describe only what the visible lines establish.
1934 class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
1935 """Ipsec GRE ESP - TRA tests"""
# ESP protecting an L3 GRE tunnel; scapy-side SAs are set up with
# config_tra_params(). Adds a test for a non-IP payload.
1937 tun4_encrypt_node_name = "esp4-encrypt-tun"
1938 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1939 encryption_type = ESP
# Builds `count` packets: outer Ether/IP between pg0's remote and local
# addresses carrying an inner IP (pg1 local -> pg1 remote)/UDP/Raw packet.
# The lines that wrap the inner packet are not shown here.
1941 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
1943 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1945 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1947 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
1948 / UDP(sport=1144, dport=2233)
1949 / Raw(b"X" * payload_size)
1951 for i in range(count)
# Like gen_encrypt_pkts but the visible payload is UDP/Raw directly, with no
# inner IP header. Note the signature has no `p` parameter, and `src`/`dst`
# are not used by the visible lines.
1954 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
1956 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1958 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
1960 / UDP(sport=1144, dport=2233)
1961 / Raw(b"X" * payload_size)
1963 for i in range(count)
# Builds `count` plain packets to 1.1.1.2; `src` and `dst` are not used by
# the visible lines.
1966 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
1968 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1969 / IP(src="1.1.1.1", dst="1.1.1.2")
1970 / UDP(sport=1144, dport=2233)
1971 / Raw(b"X" * payload_size)
1972 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IP.
1975 def verify_decrypted(self, p, rxs):
1977 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1978 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypts each received packet with `sa` and checks checksums, GRE presence
# and the inner destination address.
1980 def verify_encrypted(self, p, sa, rxs):
1983 pkt = sa.decrypt(rx[IP])
# Re-parse the raw decrypted bytes as IP when no IP layer was dissected.
1984 if not pkt.haslayer(IP):
1985 pkt = IP(pkt[Raw].load)
1986 self.assert_packet_checksums_valid(pkt)
1987 self.assertTrue(pkt.haslayer(GRE))
# `e` is assigned on a line not shown here (presumably the GRE payload --
# TODO confirm).
1989 self.assertEqual(e[IP].dst, "1.1.1.2")
# On failure both packets are logged at debug level.
1990 except (IndexError, AssertionError):
1991 self.logger.debug(ppp("Unexpected packet:", rx))
1993 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- set-up: SAs, GRE tunnel, protection, host route ---
1999 super(TestIpsecGreIfEspTra, self).setUp()
2001 self.tun_if = self.pg0
2003 p = self.ipv4_params
# Outbound and inbound SAs (most constructor arguments are not visible).
2005 p.tun_sa_out = VppIpsecSA(
2011 p.crypt_algo_vpp_id,
2013 self.vpp_esp_protocol,
2015 p.tun_sa_out.add_vpp_config()
2017 p.tun_sa_in = VppIpsecSA(
2023 p.crypt_algo_vpp_id,
2025 self.vpp_esp_protocol,
2027 p.tun_sa_in.add_vpp_config()
2029 p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
2030 p.tun_if.add_vpp_config()
2032 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2033 p.tun_protect.add_vpp_config()
2036 p.tun_if.config_ip4()
2037 config_tra_params(p, self.encryption_type, p.tun_if)
# Arguments for 1.1.1.2/32 with a path via the tunnel's remote address; the
# call they belong to is on lines not shown (presumably a route -- TODO
# confirm).
2040 self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
# --- tear-down ---
2044 p = self.ipv4_params
2045 p.tun_if.unconfig_ip4()
2046 super(TestIpsecGreIfEspTra, self).tearDown()
# Sends packets built by gen_encrypt_non_ip_pkts and expects no replies, one
# "unsup_payload" error on the first decrypt node, and the same count on the
# inbound SA.
2048 def test_gre_non_ip(self):
2049 p = self.ipv4_params
2050 tx = self.gen_encrypt_non_ip_pkts(
2053 src=p.remote_tun_if_host,
# NOTE(review): an IPv6 address is passed as `dst` in this IPv4 test; the
# visible lines of gen_encrypt_non_ip_pkts do not use `dst`, so it appears
# harmless -- confirm and consider remote_ip4 for clarity.
2054 dst=self.pg1.remote_ip6,
2056 self.send_and_assert_no_replies(self.tun_if, tx)
2057 node_name = "/err/%s/unsup_payload" % self.tun4_decrypt_node_name[0]
2058 self.assertEqual(1, self.statistics.get_err_counter(node_name))
2059 err = p.tun_sa_in.get_err("unsup_payload")
2060 self.assertEqual(err, 1)
# NOTE(review): short lines of this class (method headers, loop headers,
# closing brackets) are not visible in this listing; the comments below
# describe only what the visible lines establish.
2063 class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
# NOTE(review): this description is identical to the one on
# TestIpsecGreIfEspTra although this class uses IPv6 addresses and the
# esp6 nodes -- consider making it distinguishable in test reports.
2064 """Ipsec GRE ESP - TRA tests"""
2066 tun6_encrypt_node_name = "esp6-encrypt-tun"
2067 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2068 encryption_type = ESP
# Builds `count` packets: outer Ether/IPv6 between pg0's remote and local
# addresses carrying an inner IPv6 (pg1 local -> pg1 remote)/UDP/Raw packet.
# The lines that wrap the inner packet are not shown here.
2070 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2072 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2074 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2076 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2077 / UDP(sport=1144, dport=2233)
2078 / Raw(b"X" * payload_size)
2080 for i in range(count)
# Builds `count` plain packets 1::1 -> 1::2; `p`, `src` and `dst` are not
# used by the visible lines.
2083 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2085 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2086 / IPv6(src="1::1", dst="1::2")
2087 / UDP(sport=1144, dport=2233)
2088 / Raw(b"X" * payload_size)
2089 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv6 address.
2092 def verify_decrypted6(self, p, rxs):
2094 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2095 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
# Decrypts each received packet with `sa` and checks checksums, GRE presence
# and the inner destination address.
2097 def verify_encrypted6(self, p, sa, rxs):
2100 pkt = sa.decrypt(rx[IPv6])
# Re-parse the raw decrypted bytes as IPv6 when no IPv6 layer was dissected.
2101 if not pkt.haslayer(IPv6):
2102 pkt = IPv6(pkt[Raw].load)
2103 self.assert_packet_checksums_valid(pkt)
2104 self.assertTrue(pkt.haslayer(GRE))
# `e` is assigned on a line not shown here (presumably the GRE payload --
# TODO confirm).
2106 self.assertEqual(e[IPv6].dst, "1::2")
# On failure both packets are logged at debug level.
2107 except (IndexError, AssertionError):
2108 self.logger.debug(ppp("Unexpected packet:", rx))
2110 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- set-up: SAs, GRE tunnel over IPv6, protection, route ---
2116 super(TestIpsecGre6IfEspTra, self).setUp()
2118 self.tun_if = self.pg0
2120 p = self.ipv6_params
# NOTE(review): bridge domain 1 is created here but is not referenced again
# in the visible lines of this class -- confirm whether it is needed.
2122 bd1 = VppBridgeDomain(self, 1)
2123 bd1.add_vpp_config()
# Outbound and inbound SAs (most constructor arguments are not visible).
2125 p.tun_sa_out = VppIpsecSA(
2131 p.crypt_algo_vpp_id,
2133 self.vpp_esp_protocol,
2135 p.tun_sa_out.add_vpp_config()
2137 p.tun_sa_in = VppIpsecSA(
2143 p.crypt_algo_vpp_id,
2145 self.vpp_esp_protocol,
2147 p.tun_sa_in.add_vpp_config()
2149 p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
2150 p.tun_if.add_vpp_config()
2152 p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
2153 p.tun_protect.add_vpp_config()
2156 p.tun_if.config_ip6()
2157 config_tra_params(p, self.encryption_type, p.tun_if)
# Path arguments via the tunnel's remote IPv6 address; the enclosing call is
# on lines not shown (presumably a route -- TODO confirm).
2165 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
# --- tear-down ---
2172 p = self.ipv6_params
2173 p.tun_if.unconfig_ip6()
2174 super(TestIpsecGre6IfEspTra, self).tearDown()
# NOTE(review): short lines of this class (method headers, loop headers,
# closing brackets, and the definitions of N_NHS / N_PKTS) are not visible
# in this listing; the comments below describe only the visible lines.
2177 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
2178 """Ipsec mGRE ESP v4 TRA tests"""
# One multipoint-mode GRE interface with a separate SA pair and tunnel
# protection per next-hop (N_NHS of them).
2180 tun4_encrypt_node_name = "esp4-encrypt-tun"
2181 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2182 encryption_type = ESP
# Builds `count` packets whose outer IP source is the per-next-hop p.tun_dst
# and whose inner packet is IP (pg1 local -> pg1 remote)/UDP/Raw. The lines
# that wrap the inner packet are not shown here.
2184 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2186 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2188 IP(src=p.tun_dst, dst=self.pg0.local_ip4)
2190 / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
2191 / UDP(sport=1144, dport=2233)
2192 / Raw(b"X" * payload_size)
2194 for i in range(count)
# Builds `count` plain packets from 1.1.1.1 to the caller-supplied `dst`;
# `src` is not used by the visible lines.
2197 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2199 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2200 / IP(src="1.1.1.1", dst=dst)
2201 / UDP(sport=1144, dport=2233)
2202 / Raw(b"X" * payload_size)
2203 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IP.
2206 def verify_decrypted(self, p, rxs):
2208 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2209 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Decrypts each received packet with `sa` and checks checksums, GRE presence
# and that the inner destination is this next-hop's remote_tun_if_host.
2211 def verify_encrypted(self, p, sa, rxs):
2214 pkt = sa.decrypt(rx[IP])
# Re-parse the raw decrypted bytes as IP when no IP layer was dissected.
2215 if not pkt.haslayer(IP):
2216 pkt = IP(pkt[Raw].load)
2217 self.assert_packet_checksums_valid(pkt)
2218 self.assertTrue(pkt.haslayer(GRE))
# `e` is assigned on a line not shown here (presumably the GRE payload --
# TODO confirm).
2220 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
# On failure both packets are logged at debug level.
2221 except (IndexError, AssertionError):
2222 self.logger.debug(ppp("Unexpected packet:", rx))
2224 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- set-up: multipoint GRE interface and per-next-hop state ---
2230 super(TestIpsecMGreIfEspTra4, self).setUp()
2233 self.tun_if = self.pg0
2234 p = self.ipv4_params
2235 p.tun_if = VppGreInterface(
2239 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2241 p.tun_if.add_vpp_config()
2243 p.tun_if.config_ip4()
# Generate N_NHS remote hosts on both the tunnel and pg0, and configure pg0's
# IPv4 neighbours.
2244 p.tun_if.generate_remote_hosts(N_NHS)
2245 self.pg0.generate_remote_hosts(N_NHS)
2246 self.pg0.configure_ipv4_neighbors()
2248 # setup some SAs for several next-hops on the interface
2249 self.multi_params = []
2251 for ii in range(N_NHS):
# Each next-hop works on a shallow copy of the shared parameters, with the
# host address and every SA id / SPI offset by the next-hop index.
2252 p = copy.copy(self.ipv4_params)
2254 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2255 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2256 p.scapy_tun_spi = p.scapy_tun_spi + ii
2257 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2258 p.vpp_tun_spi = p.vpp_tun_spi + ii
2260 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2261 p.scapy_tra_spi = p.scapy_tra_spi + ii
2262 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2263 p.vpp_tra_spi = p.vpp_tra_spi + ii
2264 p.tun_sa_out = VppIpsecSA(
2270 p.crypt_algo_vpp_id,
2272 self.vpp_esp_protocol,
2274 p.tun_sa_out.add_vpp_config()
2276 p.tun_sa_in = VppIpsecSA(
2282 p.crypt_algo_vpp_id,
2284 self.vpp_esp_protocol,
2286 p.tun_sa_in.add_vpp_config()
# Protection is keyed on the tunnel's ii-th remote host as next-hop.
2288 p.tun_protect = VppIpsecTunProtect(
2293 nh=p.tun_if.remote_hosts[ii].ip4,
2295 p.tun_protect.add_vpp_config()
2296 config_tra_params(p, self.encryption_type, p.tun_if)
2297 self.multi_params.append(p)
# Arguments for this next-hop's host address with a path via the tunnel's
# ii-th remote host; the enclosing call is on lines not shown (presumably a
# route -- TODO confirm).
2301 p.remote_tun_if_host,
2303 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
2306 # in this v4 variant add the teibs after the protect
# Arguments pairing the tunnel's ii-th remote host with pg0's ii-th remote
# host; the constructor they belong to is not shown (p.teib, used by
# test_tun_44, is presumably assigned there -- TODO confirm).
2310 p.tun_if.remote_hosts[ii].ip4,
2311 self.pg0.remote_hosts[ii].ip4,
2313 p.tun_dst = self.pg0.remote_hosts[ii].ip4
2314 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
# --- tear-down ---
2317 p = self.ipv4_params
2318 p.tun_if.unconfig_ip4()
2319 super(TestIpsecMGreIfEspTra4, self).tearDown()
# For every next-hop: traffic passes, is dropped once the TEIB entry is
# removed, and passes again after the entry is re-added.
2321 def test_tun_44(self):
2324 for p in self.multi_params:
2325 self.verify_tun_44(p, count=N_PKTS)
2326 p.teib.remove_vpp_config()
2327 self.verify_tun_dropped_44(p, count=N_PKTS)
2328 p.teib.add_vpp_config()
2329 self.verify_tun_44(p, count=N_PKTS)
# NOTE(review): short lines of this class (method headers, loop headers,
# closing brackets, and the definition of N_NHS) are not visible in this
# listing; the comments below describe only the visible lines.
2332 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
2333 """Ipsec mGRE ESP v6 TRA tests"""
# One multipoint-mode GRE interface with a separate SA pair and tunnel
# protection per next-hop (N_NHS of them), over IPv6.
2335 tun6_encrypt_node_name = "esp6-encrypt-tun"
2336 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2337 encryption_type = ESP
# Builds `count` packets whose outer IPv6 source is the per-next-hop
# p.tun_dst and whose inner packet is IPv6 (pg1 local -> pg1 remote)/UDP/Raw.
# The lines that wrap the inner packet are not shown here.
2339 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2341 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2343 IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
2345 / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
2346 / UDP(sport=1144, dport=2233)
2347 / Raw(b"X" * payload_size)
2349 for i in range(count)
# Builds `count` plain packets from 1::1 to the caller-supplied `dst`; `p`
# and `src` are not used by the visible lines.
2352 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2354 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2355 / IPv6(src="1::1", dst=dst)
2356 / UDP(sport=1144, dport=2233)
2357 / Raw(b"X" * payload_size)
2358 for i in range(count)
# Decrypted packets must be addressed to pg1's remote MAC and IPv6 address.
2361 def verify_decrypted6(self, p, rxs):
2363 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2364 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
# Decrypts each received packet with `sa` and checks checksums, GRE presence
# and that the inner destination is this next-hop's remote_tun_if_host.
2366 def verify_encrypted6(self, p, sa, rxs):
2369 pkt = sa.decrypt(rx[IPv6])
# Re-parse the raw decrypted bytes as IPv6 when no IPv6 layer was dissected.
2370 if not pkt.haslayer(IPv6):
2371 pkt = IPv6(pkt[Raw].load)
2372 self.assert_packet_checksums_valid(pkt)
2373 self.assertTrue(pkt.haslayer(GRE))
# `e` is assigned on a line not shown here (presumably the GRE payload --
# TODO confirm).
2375 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
# On failure both packets are logged at debug level.
2376 except (IndexError, AssertionError):
2377 self.logger.debug(ppp("Unexpected packet:", rx))
2379 self.logger.debug(ppp("Decrypted packet:", pkt))
# --- set-up: multipoint GRE interface and per-next-hop state ---
2385 super(TestIpsecMGreIfEspTra6, self).setUp()
# Raises the ipsec logging class to debug for this test.
2387 self.vapi.cli("set logging class ipsec level debug")
2390 self.tun_if = self.pg0
2391 p = self.ipv6_params
2392 p.tun_if = VppGreInterface(
2396 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
2398 p.tun_if.add_vpp_config()
2400 p.tun_if.config_ip6()
# Generate N_NHS remote hosts on both the tunnel and pg0, and configure pg0's
# IPv6 neighbours.
2401 p.tun_if.generate_remote_hosts(N_NHS)
2402 self.pg0.generate_remote_hosts(N_NHS)
2403 self.pg0.configure_ipv6_neighbors()
2405 # setup some SAs for several next-hops on the interface
2406 self.multi_params = []
2408 for ii in range(N_NHS):
# Each next-hop works on a shallow copy of the shared parameters, with the
# host address and every SA id / SPI offset by the next-hop index.
2409 p = copy.copy(self.ipv6_params)
2411 p.remote_tun_if_host = "1::%d" % (ii + 1)
2412 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2413 p.scapy_tun_spi = p.scapy_tun_spi + ii
2414 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2415 p.vpp_tun_spi = p.vpp_tun_spi + ii
2417 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2418 p.scapy_tra_spi = p.scapy_tra_spi + ii
2419 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2420 p.vpp_tra_spi = p.vpp_tra_spi + ii
2421 p.tun_sa_out = VppIpsecSA(
2427 p.crypt_algo_vpp_id,
2429 self.vpp_esp_protocol,
2431 p.tun_sa_out.add_vpp_config()
2433 p.tun_sa_in = VppIpsecSA(
2439 p.crypt_algo_vpp_id,
2441 self.vpp_esp_protocol,
2443 p.tun_sa_in.add_vpp_config()
2445 # in this v6 variant add the teibs first then the protection
2446 p.tun_dst = self.pg0.remote_hosts[ii].ip6
# Arguments pairing the tunnel's ii-th remote host with p.tun_dst; the
# constructor they belong to is on lines not shown.
2448 self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
# Protection is keyed on the tunnel's ii-th remote host as next-hop.
2451 p.tun_protect = VppIpsecTunProtect(
2456 nh=p.tun_if.remote_hosts[ii].ip6,
2458 p.tun_protect.add_vpp_config()
2459 config_tra_params(p, self.encryption_type, p.tun_if)
2460 self.multi_params.append(p)
# Arguments for this next-hop's host address with a path via the tunnel's
# ii-th remote host; the enclosing call is on lines not shown (presumably a
# route -- TODO confirm).
2464 p.remote_tun_if_host,
2466 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
# NOTE(review): p.tun_dst was already set to this same value earlier in the
# loop body; this second assignment looks redundant.
2468 p.tun_dst = self.pg0.remote_hosts[ii].ip6
2470 self.logger.info(self.vapi.cli("sh log"))
2471 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
# NOTE(review): hard-coded adjacency index 41 -- looks like leftover debug
# output that depends on allocation order; confirm it is still useful.
2472 self.logger.info(self.vapi.cli("sh adj 41"))
# --- tear-down ---
2475 p = self.ipv6_params
2476 p.tun_if.unconfig_ip6()
2477 super(TestIpsecMGreIfEspTra6, self).tearDown()
# Runs the 6o6 tunnel verification once per next-hop.
2479 def test_tun_66(self):
2481 for p in self.multi_params:
# NOTE(review): literal 63 here, whereas the v4 mGRE test passes N_PKTS --
# consider using the shared constant.
2482 self.verify_tun_66(p, count=63)
# NOTE(review): short lines of this class (method headers and some
# statements, e.g. where `np` is created) are not visible in this listing;
# the comments below describe only the visible lines.
2485 @tag_fixme_vpp_workers
2486 class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2487 """IPsec IPv4 Tunnel protect - transport mode"""
2490 super(TestIpsec4TunProtect, self).setUp()
2492 self.tun_if = self.pg0
2495 super(TestIpsec4TunProtect, self).tearDown()
# Configures network, transport-mode SAs and protection, verifies traffic
# and interface counters, rekeys, verifies again, then unconfigures.
2497 def test_tun_44(self):
2498 """IPSEC tunnel protect"""
2500 p = self.ipv4_params
2502 self.config_network(p)
2503 self.config_sa_tra(p)
2504 self.config_protect(p)
2506 self.verify_tun_44(p, count=127)
2507 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2508 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# After clearing the SA state a second batch goes through verify_tun_64
# (presumably IPv6 carried over the IPv4 tunnel -- TODO confirm); the
# interface counters are expected to double.
2510 self.vapi.cli("clear ipsec sa")
2511 self.verify_tun_64(p, count=127)
2512 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2513 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2515 # rekey - create new SAs and update the tunnel protection
# `np` is created on a line not shown (presumably a copy of `p` -- TODO
# confirm): first key byte replaced, SPIs +100, SA ids +1.
2517 np.crypt_key = b"X" + p.crypt_key[1:]
2518 np.scapy_tun_spi += 100
2519 np.scapy_tun_sa_id += 1
2520 np.vpp_tun_spi += 100
2521 np.vpp_tun_sa_id += 1
# NOTE(review): the SPIs stored on the tunnel interface are taken from `p`
# (the old values), not from `np` -- confirm this is intended.
2522 np.tun_if.local_spi = p.vpp_tun_spi
2523 np.tun_if.remote_spi = p.scapy_tun_spi
2525 self.config_sa_tra(np)
2526 self.config_protect(np)
# Counters are cumulative across the three batches of 127 packets.
2529 self.verify_tun_44(np, count=127)
2530 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2531 self.assertEqual(p.tun_if.get_tx_stats(), 381)
# Tear down the rekeyed protection and SAs, then the network config.
2534 self.unconfig_protect(np)
2535 self.unconfig_sa(np)
2536 self.unconfig_network(p)
# Combines TemplateIpsec4TunTfc with TestIpsec4TunProtect; no members are
# visible in this listing beyond the description string.
2539 @tag_fixme_vpp_workers
2540 class TestIpsec4TunProtectTfc(TemplateIpsec4TunTfc, TestIpsec4TunProtect):
2541 """IPsec IPv4 Tunnel protect with TFC - transport mode"""
# NOTE(review): short lines of this class (method headers, loop headers) are
# not visible in this listing; the comments below describe only the visible
# lines.
2544 @tag_fixme_vpp_workers
2545 class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2546 """IPsec IPv4 UDP Tunnel protect - transport mode"""
# --- set-up: UDP encapsulation with port 4500 in both directions, then
# network, transport-mode SAs and protection ---
2549 super(TestIpsec4TunProtectUdp, self).setUp()
2551 self.tun_if = self.pg0
2553 p = self.ipv4_params
2554 p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
2555 p.nat_header = UDP(sport=4500, dport=4500)
2556 self.config_network(p)
2557 self.config_sa_tra(p)
2558 self.config_protect(p)
# --- tear-down (a statement between the two unconfig calls is not shown) ---
2561 p = self.ipv4_params
2562 self.unconfig_protect(p)
2564 self.unconfig_network(p)
2565 super(TestIpsec4TunProtectUdp, self).tearDown()
# Adds UDP port checks on top of the inherited verify_encrypted().
2567 def verify_encrypted(self, p, sa, rxs):
2568 # ensure encrypted packets are received with the default UDP ports
2570 self.assertEqual(rx[UDP].sport, 4500)
2571 self.assertEqual(rx[UDP].dport, 4500)
2572 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
# Verifies 127 packets through the tunnel and checks the interface counters.
2574 def test_tun_44(self):
2575 """IPSEC UDP tunnel protect"""
2577 p = self.ipv4_params
2579 self.verify_tun_44(p, count=127)
2580 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2581 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# Delegates to verify_keepalive() with the IPv4 parameters.
2583 def test_keepalive(self):
2584 """IPSEC NAT Keepalive"""
2585 self.verify_keepalive(self.ipv4_params)
# Combines TemplateIpsec4TunTfc with TestIpsec4TunProtectUdp; no members are
# visible in this listing beyond the description string.
2588 @tag_fixme_vpp_workers
2589 class TestIpsec4TunProtectUdpTfc(TemplateIpsec4TunTfc, TestIpsec4TunProtectUdp):
2590 """IPsec IPv4 UDP Tunnel protect with TFC - transport mode"""
# NOTE(review): short lines of this class (method headers, loop headers,
# closing brackets, and where `np` is created) are not visible in this
# listing; the comments below describe only the visible lines.
2593 @tag_fixme_vpp_workers
2594 class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2595 """IPsec IPv4 Tunnel protect - tunnel mode"""
2597 encryption_type = ESP
2598 tun4_encrypt_node_name = "esp4-encrypt-tun"
2599 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2602 super(TestIpsec4TunProtectTun, self).setUp()
2604 self.tun_if = self.pg0
2607 super(TestIpsec4TunProtectTun, self).tearDown()
# Builds `count` packets with two stacked IP headers: sw_intf remote ->
# local, then the caller's src -> dst, followed by UDP/Raw. The wrapping
# line between the Ether header and the first IP header is not shown.
2609 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2611 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2613 IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
2614 / IP(src=src, dst=dst)
2615 / UDP(sport=1144, dport=2233)
2616 / Raw(b"X" * payload_size)
2618 for i in range(count)
# Builds `count` plain packets from `src` to `dst`.
2621 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
2623 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2624 / IP(src=src, dst=dst)
2625 / UDP(sport=1144, dport=2233)
2626 / Raw(b"X" * payload_size)
2627 for i in range(count)
# Decrypted packets must go to pg1's remote address, come from the tunnel
# host address, and have valid checksums.
2630 def verify_decrypted(self, p, rxs):
2632 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2633 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2634 self.assert_packet_checksums_valid(rx)
# Decrypts each received packet with `sa` and checks the tunnel endpoints
# and the innermost destination address.
2636 def verify_encrypted(self, p, sa, rxs):
2639 pkt = sa.decrypt(rx[IP])
# Re-parse the raw decrypted bytes as IP when no IP layer was dissected.
2640 if not pkt.haslayer(IP):
2641 pkt = IP(pkt[Raw].load)
2642 self.assert_packet_checksums_valid(pkt)
2643 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2644 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
# The payload of the outer IP header is expected to contain two further
# nested IP layers; the second one carries the tunnel host destination.
2645 inner = pkt[IP].payload
2646 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
# On failure both packets are logged at debug level.
2648 except (IndexError, AssertionError):
2649 self.logger.debug(ppp("Unexpected packet:", rx))
2651 self.logger.debug(ppp("Decrypted packet:", pkt))
# Configures network, tunnel-mode SAs and protection, adds ACLs on the
# physical and tunnel interfaces, verifies, rekeys and verifies again.
2656 def test_tun_44(self):
2657 """IPSEC tunnel protect"""
2659 p = self.ipv4_params
2661 self.config_network(p)
2662 self.config_sa_tun(p)
2663 self.config_protect(p)
2665 # also add an output features on the tunnel and physical interface
2666 # so we test they still work
2667 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
2668 a = VppAcl(self, [r_all]).add_vpp_config()
2670 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2671 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2673 self.verify_tun_44(p, count=127)
2675 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2676 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2678 # rekey - create new SAs and update the tunnel protection
# `np` is created on a line not shown (presumably a copy of `p` -- TODO
# confirm): first key byte replaced, SPIs +100, SA ids +1.
2680 np.crypt_key = b"X" + p.crypt_key[1:]
2681 np.scapy_tun_spi += 100
2682 np.scapy_tun_sa_id += 1
2683 np.vpp_tun_spi += 100
2684 np.vpp_tun_sa_id += 1
# NOTE(review): the SPIs stored on the tunnel interface are taken from `p`
# (the old values), not from `np` -- confirm this is intended.
2685 np.tun_if.local_spi = p.vpp_tun_spi
2686 np.tun_if.remote_spi = p.scapy_tun_spi
2688 self.config_sa_tun(np)
2689 self.config_protect(np)
# Counters are cumulative across the two batches of 127 packets.
2692 self.verify_tun_44(np, count=127)
2693 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2694 self.assertEqual(p.tun_if.get_tx_stats(), 254)
# Tear down the rekeyed protection and SAs, then the network config.
2697 self.unconfig_protect(np)
2698 self.unconfig_sa(np)
2699 self.unconfig_network(p)
# NOTE(review): short lines of this class (method headers, closing brackets,
# some call arguments) are not visible in this listing; the comments below
# describe only the visible lines.
2702 class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
2703 """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2705 encryption_type = ESP
2706 tun4_encrypt_node_name = "esp4-encrypt-tun"
2707 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2710 super(TestIpsec4TunProtectTunDrop, self).setUp()
2712 self.tun_if = self.pg0
2715 super(TestIpsec4TunProtectTunDrop, self).tearDown()
# Builds `count` packets with two stacked IP headers where the first is
# addressed to the fixed address 5.5.5.5 instead of sw_intf's local address
# (the "bogus tunnel header" the test below refers to).
2717 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2719 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2721 IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
2722 / IP(src=src, dst=dst)
2723 / UDP(sport=1144, dport=2233)
2724 / Raw(b"X" * payload_size)
2726 for i in range(count)
# Sends the packets built above and expects no replies.
2729 def test_tun_drop_44(self):
2730 """IPSEC tunnel protect bogus tunnel header"""
2732 p = self.ipv4_params
2734 self.config_network(p)
2735 self.config_sa_tun(p)
2736 self.config_protect(p)
2738 tx = self.gen_encrypt_pkts(
2742 src=p.remote_tun_if_host,
2743 dst=self.pg1.remote_ip4,
2746 self.send_and_assert_no_replies(self.tun_if, tx)
# Tear-down (a statement between the two unconfig calls is not shown).
2749 self.unconfig_protect(p)
2751 self.unconfig_network(p)
# NOTE(review): short lines of this class (method headers, some statements
# such as where `np` / `np3` are created, and some comments) are not visible
# in this listing; the comments below describe only the visible lines.
2754 @tag_fixme_vpp_workers
2755 class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2756 """IPsec IPv6 Tunnel protect - transport mode"""
2758 encryption_type = ESP
2759 tun6_encrypt_node_name = "esp6-encrypt-tun"
2760 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2763 super(TestIpsec6TunProtect, self).setUp()
2765 self.tun_if = self.pg0
2768 super(TestIpsec6TunProtect, self).tearDown()
# Configure, verify, rekey, bounce the interface, then walk through a staged
# rekey with two inbound SAs, checking counters along the way.
2770 def test_tun_66(self):
2771 """IPSEC tunnel protect 6o6"""
2773 p = self.ipv6_params
2775 self.config_network(p)
2776 self.config_sa_tra(p)
2777 self.config_protect(p)
2779 self.verify_tun_66(p, count=127)
2780 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2781 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2783 # rekey - create new SAs and update the tunnel protection
# `np` is created on a line not shown (presumably a copy of `p` -- TODO
# confirm): first key byte replaced, SPIs +100, SA ids +1.
2785 np.crypt_key = b"X" + p.crypt_key[1:]
2786 np.scapy_tun_spi += 100
2787 np.scapy_tun_sa_id += 1
2788 np.vpp_tun_spi += 100
2789 np.vpp_tun_sa_id += 1
# NOTE(review): the SPIs stored on the tunnel interface are taken from `p`
# (the old values), not from `np` -- confirm this is intended.
2790 np.tun_if.local_spi = p.vpp_tun_spi
2791 np.tun_if.remote_spi = p.scapy_tun_spi
2793 self.config_sa_tra(np)
2794 self.config_protect(np)
2797 self.verify_tun_66(np, count=127)
2798 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2799 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2801 # bounce the interface state
# While the interface is down, 127 packets are expected to be counted on the
# ipsec6-tun-input "disabled" error. The statement that brings the interface
# back up is on a line not shown.
2802 p.tun_if.admin_down()
2803 self.verify_drop_tun_66(np, count=127)
2804 node = "/err/ipsec6-tun-input/disabled"
2805 self.assertEqual(127, self.statistics.get_err_counter(node))
2807 self.verify_tun_66(np, count=127)
2810 # 1) add two input SAs [old, new]
2811 # 2) swap output SA to [new]
2812 # 3) use only [new] input SA
# `np3` is created on a line not shown: first key byte replaced with "Z",
# SPIs +100, SA ids +1 relative to whatever it was copied from.
2814 np3.crypt_key = b"Z" + p.crypt_key[1:]
2815 np3.scapy_tun_spi += 100
2816 np3.scapy_tun_sa_id += 1
2817 np3.vpp_tun_spi += 100
2818 np3.vpp_tun_sa_id += 1
2819 np3.tun_if.local_spi = p.vpp_tun_spi
2820 np3.tun_if.remote_spi = p.scapy_tun_spi
2822 self.config_sa_tra(np3)
# Step 1: outbound SA stays `np`; both `np` and `np3` accepted inbound.
2825 p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2826 self.verify_tun_66(np, np, count=127)
2827 self.verify_tun_66(np3, np, count=127)
# Step 2: outbound SA switches to `np3`; both still accepted inbound.
2830 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
2831 self.verify_tun_66(np, np3, count=127)
2832 self.verify_tun_66(np3, np3, count=127)
# Step 3: only `np3` accepted inbound; traffic on `np` must now be dropped.
2835 p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
2836 self.verify_tun_66(np3, np3, count=127)
2837 self.verify_drop_tun_rx_66(np, count=127)
# Cumulative interface counters over the whole test: 9 batches of 127
# received, 8 batches transmitted.
2839 self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
2840 self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
2841 self.unconfig_sa(np)
# Tear down the final protection and SAs, then the network config.
2844 self.unconfig_protect(np3)
2845 self.unconfig_sa(np3)
2846 self.unconfig_network(p)
# Configure, run the 4o6 verification for 127 packets, check the interface
# counters, then unconfigure.
2848 def test_tun_46(self):
2849 """IPSEC tunnel protect 4o6"""
2851 p = self.ipv6_params
2853 self.config_network(p)
2854 self.config_sa_tra(p)
2855 self.config_protect(p)
2857 self.verify_tun_46(p, count=127)
2858 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2859 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# Tear-down (a statement between the two unconfig calls is not shown).
2862 self.unconfig_protect(p)
2864 self.unconfig_network(p)
# TFC flavour of the IPv6 tunnel-protect transport-mode suite.  The class body
# holds nothing but its docstring, so every attribute and test method is
# inherited from TemplateIpsec6TunTfc and TestIpsec6TunProtect.
# NOTE(review): tag_fixme_vpp_workers comes from asfframework; presumably it
# marks the class as not yet working with VPP worker threads - confirm there.
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTfc(TemplateIpsec6TunTfc, TestIpsec6TunProtect):
    """IPsec IPv6 Tunnel protect with TFC - transport mode"""
2872 @tag_fixme_vpp_workers
2873 class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2874 """IPsec IPv6 Tunnel protect - tunnel mode"""
2876 encryption_type = ESP
2877 tun6_encrypt_node_name = "esp6-encrypt-tun"
2878 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2881 super(TestIpsec6TunProtectTun, self).setUp()
2883 self.tun_if = self.pg0
2886 super(TestIpsec6TunProtectTun, self).tearDown()
2888 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
2890 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2892 IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
2893 / IPv6(src=src, dst=dst)
2894 / UDP(sport=1166, dport=2233)
2895 / Raw(b"X" * payload_size)
2897 for i in range(count)
2900 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
2902 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2903 / IPv6(src=src, dst=dst)
2904 / UDP(sport=1166, dport=2233)
2905 / Raw(b"X" * payload_size)
2906 for i in range(count)
2909 def verify_decrypted6(self, p, rxs):
2911 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2912 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2913 self.assert_packet_checksums_valid(rx)
2915 def verify_encrypted6(self, p, sa, rxs):
2918 pkt = sa.decrypt(rx[IPv6])
2919 if not pkt.haslayer(IPv6):
2920 pkt = IPv6(pkt[Raw].load)
2921 self.assert_packet_checksums_valid(pkt)
2922 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2923 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2924 inner = pkt[IPv6].payload
2925 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2927 except (IndexError, AssertionError):
2928 self.logger.debug(ppp("Unexpected packet:", rx))
2930 self.logger.debug(ppp("Decrypted packet:", pkt))
2935 def test_tun_66(self):
2936 """IPSEC tunnel protect"""
2938 p = self.ipv6_params
2940 self.config_network(p)
2941 self.config_sa_tun(p)
2942 self.config_protect(p)
2944 self.verify_tun_66(p, count=127)
2946 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2947 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2949 # rekey - create new SAs and update the tunnel protection
2951 np.crypt_key = b"X" + p.crypt_key[1:]
2952 np.scapy_tun_spi += 100
2953 np.scapy_tun_sa_id += 1
2954 np.vpp_tun_spi += 100
2955 np.vpp_tun_sa_id += 1
2956 np.tun_if.local_spi = p.vpp_tun_spi
2957 np.tun_if.remote_spi = p.scapy_tun_spi
2959 self.config_sa_tun(np)
2960 self.config_protect(np)
2963 self.verify_tun_66(np, count=127)
2964 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2965 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2968 self.unconfig_protect(np)
2969 self.unconfig_sa(np)
2970 self.unconfig_network(p)
2973 class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
2974 """IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2976 encryption_type = ESP
2977 tun6_encrypt_node_name = "esp6-encrypt-tun"
2978 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2981 super(TestIpsec6TunProtectTunDrop, self).setUp()
2983 self.tun_if = self.pg0
2986 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2988 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
# the IP destination of the revealed packet does not match
2990 # that assigned to the tunnel
2992 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
2994 IPv6(src=sw_intf.remote_ip6, dst="5::5")
2995 / IPv6(src=src, dst=dst)
2996 / UDP(sport=1144, dport=2233)
2997 / Raw(b"X" * payload_size)
2999 for i in range(count)
3002 def test_tun_drop_66(self):
3003 """IPSEC 6 tunnel protect bogus tunnel header"""
3005 p = self.ipv6_params
3007 self.config_network(p)
3008 self.config_sa_tun(p)
3009 self.config_protect(p)
3011 tx = self.gen_encrypt_pkts6(
3015 src=p.remote_tun_if_host,
3016 dst=self.pg1.remote_ip6,
3019 self.send_and_assert_no_replies(self.tun_if, tx)
3021 self.unconfig_protect(p)
3023 self.unconfig_network(p)
3026 class TemplateIpsecItf4(object):
3027 """IPsec Interface IPv4"""
3029 encryption_type = ESP
3030 tun4_encrypt_node_name = "esp4-encrypt-tun"
3031 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3032 tun4_input_node = "ipsec4-tun-input"
3034 def config_sa_tun(self, p, src, dst):
3035 config_tun_params(p, self.encryption_type, None, src, dst)
3037 p.tun_sa_out = VppIpsecSA(
3043 p.crypt_algo_vpp_id,
3045 self.vpp_esp_protocol,
3050 p.tun_sa_out.add_vpp_config()
3052 p.tun_sa_in = VppIpsecSA(
3058 p.crypt_algo_vpp_id,
3060 self.vpp_esp_protocol,
3064 | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
3066 p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
    """Protect the tunnel interface of *p* with its SA pair.

    Builds a VppIpsecTunProtect from ``p.tun_if``, the outbound SA and a
    one-element list holding the inbound SA, records it as
    ``p.tun_protect`` and then pushes it to VPP.
    """
    inbound_sas = [p.tun_sa_in]
    p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, inbound_sas)
    # Call through the attribute so the object stored on p is the one configured.
    p.tun_protect.add_vpp_config()
3072 def config_network(self, p, instance=0xFFFFFFFF):
3073 p.tun_if = VppIpsecInterface(self, instance=instance)
3075 p.tun_if.add_vpp_config()
3077 p.tun_if.config_ip4()
3078 p.tun_if.config_ip6()
3080 p.route = VppIpRoute(
3082 p.remote_tun_if_host,
3084 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3086 p.route.add_vpp_config()
3089 p.remote_tun_if_host6,
3093 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
def unconfig_network(self, p):
    """Remove the network objects stored on *p*.

    The route (``p.route``) is removed first, then the tunnel interface
    (``p.tun_if``).
    """
    # Attributes are looked up one at a time, so the route is already gone
    # before p.tun_if is even read.
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel protection object held in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on *p*: outbound first, then inbound."""
    # Lazy per-attribute lookup keeps the original order of reads and calls.
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
3111 @tag_fixme_vpp_workers
3112 class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3113 """IPsec Interface IPv4"""
3116 super(TestIpsecItf4, self).setUp()
3118 self.tun_if = self.pg0
3121 super(TestIpsecItf4, self).tearDown()
def test_tun_instance_44(self):
    """IPSEC interface IPv4 user-chosen instance"""
    p = self.ipv4_params
    # Request instance number 3 explicitly instead of relying on the
    # config_network() default.
    self.config_network(p, instance=3)

    # An ipsec0 interface is not expected to exist, so the CLI lookup
    # must fail.
    with self.assertRaises(CliFailedCommandError):
        self.vapi.cli("show interface ipsec0")

    # The interface must be reachable under the requested instance number;
    # assertNotIn reports the offending CLI output if the check fails.
    output = self.vapi.cli("show interface ipsec3")
    self.assertNotIn("unknown", output)

    self.unconfig_network(p)
3135 def test_tun_44(self):
3136 """IPSEC interface IPv4"""
3139 p = self.ipv4_params
3141 self.config_network(p)
3143 p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
3145 self.verify_tun_dropped_44(p, count=n_pkts)
3146 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3147 self.config_protect(p)
3149 self.verify_tun_44(p, count=n_pkts)
3150 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3151 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3153 p.tun_if.admin_down()
3154 self.verify_tun_dropped_44(p, count=n_pkts)
3156 self.verify_tun_44(p, count=n_pkts)
3158 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3159 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
# it's a v6 packet when it's encrypted
3162 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
3164 self.verify_tun_64(p, count=n_pkts)
3165 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3166 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3168 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
3170 # update the SA tunnel
3172 p, self.encryption_type, None, self.pg2.local_ip4, self.pg2.remote_ip4
3174 p.tun_sa_in.update_vpp_config(
3175 is_tun=True, tun_src=self.pg2.remote_ip4, tun_dst=self.pg2.local_ip4
3177 p.tun_sa_out.update_vpp_config(
3178 is_tun=True, tun_src=self.pg2.local_ip4, tun_dst=self.pg2.remote_ip4
3180 self.verify_tun_44(p, count=n_pkts)
3181 self.assertEqual(p.tun_if.get_rx_stats(), 5 * n_pkts)
3182 self.assertEqual(p.tun_if.get_tx_stats(), 4 * n_pkts)
3184 self.vapi.cli("clear interfaces")
3186 # rekey - create new SAs and update the tunnel protection
3188 np.crypt_key = b"X" + p.crypt_key[1:]
3189 np.scapy_tun_spi += 100
3190 np.scapy_tun_sa_id += 1
3191 np.vpp_tun_spi += 100
3192 np.vpp_tun_sa_id += 1
3193 np.tun_if.local_spi = p.vpp_tun_spi
3194 np.tun_if.remote_spi = p.scapy_tun_spi
3196 self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
3197 self.config_protect(np)
3200 self.verify_tun_44(np, count=n_pkts)
3201 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3202 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3205 self.unconfig_protect(np)
3206 self.unconfig_sa(np)
3207 self.unconfig_network(p)
3209 def test_tun_44_null(self):
3210 """IPSEC interface IPv4 NULL auth/crypto"""
3213 p = copy.copy(self.ipv4_params)
3215 p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
3216 p.crypt_algo_vpp_id = (
3217 VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
3219 p.crypt_algo = "NULL"
3220 p.auth_algo = "NULL"
3222 self.config_network(p)
3223 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3224 self.config_protect(p)
3226 self.logger.info(self.vapi.cli("sh ipsec sa"))
3227 self.verify_tun_44(p, count=n_pkts)
3230 self.unconfig_protect(p)
3232 self.unconfig_network(p)
3234 def test_tun_44_police(self):
3235 """IPSEC interface IPv4 with input policer"""
3237 p = self.ipv4_params
3239 self.config_network(p)
3240 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3241 self.config_protect(p)
3243 action_tx = PolicerAction(
3244 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3246 policer = VppPolicer(
3253 conform_action=action_tx,
3254 exceed_action=action_tx,
3255 violate_action=action_tx,
3257 policer.add_vpp_config()
3259 # Start policing on tun
3260 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3262 self.verify_tun_44(p, count=n_pkts)
3263 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3264 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3266 stats = policer.get_stats()
3268 # Single rate, 2 colour policer - expect conform, violate but no exceed
3269 self.assertGreater(stats["conform_packets"], 0)
3270 self.assertEqual(stats["exceed_packets"], 0)
3271 self.assertGreater(stats["violate_packets"], 0)
3273 # Stop policing on tun
3274 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3275 self.verify_tun_44(p, count=n_pkts)
3277 # No new policer stats
3278 statsnew = policer.get_stats()
3279 self.assertEqual(stats, statsnew)
3282 policer.remove_vpp_config()
3283 self.unconfig_protect(p)
3285 self.unconfig_network(p)
# TFC flavour of the IPv4 IPsec-interface suite.  The class body holds nothing
# but its docstring, so every attribute and test method is inherited from
# TemplateIpsec4TunTfc and TestIpsecItf4.
# NOTE(review): tag_fixme_vpp_workers comes from asfframework; presumably it
# marks the class as not yet working with VPP worker threads - confirm there.
@tag_fixme_vpp_workers
class TestIpsecItf4Tfc(TemplateIpsec4TunTfc, TestIpsecItf4):
    """IPsec Interface IPv4 with TFC"""
3293 class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
3294 """IPsec Interface MPLSoIPv4"""
3296 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
3299 super(TestIpsecItf4MPLS, self).setUp()
3301 self.tun_if = self.pg0
3304 super(TestIpsecItf4MPLS, self).tearDown()
3306 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3308 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3310 MPLS(label=44, ttl=3)
3311 / IP(src=src, dst=dst)
3312 / UDP(sport=1166, dport=2233)
3313 / Raw(b"X" * payload_size)
3315 for i in range(count)
3318 def verify_encrypted(self, p, sa, rxs):
3321 pkt = sa.decrypt(rx[IP])
3322 if not pkt.haslayer(IP):
3323 pkt = IP(pkt[Raw].load)
3324 self.assert_packet_checksums_valid(pkt)
3325 self.assert_equal(pkt[MPLS].label, 44)
3326 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
3327 except (IndexError, AssertionError):
3328 self.logger.debug(ppp("Unexpected packet:", rx))
3330 self.logger.debug(ppp("Decrypted packet:", pkt))
3335 def test_tun_mpls_o_ip4(self):
3336 """IPSEC interface MPLS over IPv4"""
3339 p = self.ipv4_params
3342 tbl = VppMplsTable(self, 0)
3343 tbl.add_vpp_config()
3345 self.config_network(p)
3346 # deag MPLS routes from the tunnel
3348 self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
3353 p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
3357 p.tun_if.enable_mpls()
3359 self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
3360 self.config_protect(p)
3362 self.verify_tun_44(p, count=n_pkts)
3365 p.tun_if.disable_mpls()
3366 self.unconfig_protect(p)
3368 self.unconfig_network(p)
3371 class TemplateIpsecItf6(object):
3372 """IPsec Interface IPv6"""
3374 encryption_type = ESP
3375 tun6_encrypt_node_name = "esp6-encrypt-tun"
3376 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
3377 tun6_input_node = "ipsec6-tun-input"
3379 def config_sa_tun(self, p, src, dst):
3380 config_tun_params(p, self.encryption_type, None, src, dst)
3382 if not hasattr(p, "tun_flags"):
3384 if not hasattr(p, "hop_limit"):
3387 p.tun_sa_out = VppIpsecSA(
3393 p.crypt_algo_vpp_id,
3395 self.vpp_esp_protocol,
3399 tun_flags=p.tun_flags,
3400 hop_limit=p.hop_limit,
3402 p.tun_sa_out.add_vpp_config()
3404 p.tun_sa_in = VppIpsecSA(
3410 p.crypt_algo_vpp_id,
3412 self.vpp_esp_protocol,
3417 p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
    """Attach the SA pair of *p* to its tunnel interface.

    Creates a VppIpsecTunProtect for ``p.tun_if`` using ``p.tun_sa_out``
    for output and ``[p.tun_sa_in]`` for input, stores it on
    ``p.tun_protect`` and adds it to the VPP configuration.
    """
    sas_in = [p.tun_sa_in]
    p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, sas_in)
    # Configure via the stored attribute, exactly the object kept on p.
    p.tun_protect.add_vpp_config()
3423 def config_network(self, p):
3424 p.tun_if = VppIpsecInterface(self)
3426 p.tun_if.add_vpp_config()
3428 p.tun_if.config_ip4()
3429 p.tun_if.config_ip6()
3433 p.remote_tun_if_host4,
3435 [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
3439 p.route = VppIpRoute(
3441 p.remote_tun_if_host,
3445 p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
3449 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Tear down what is stored on *p*: ``p.route`` first, then ``p.tun_if``."""
    # One attribute is read and removed before the next one is touched.
    for member in ("route", "tun_if"):
        getattr(p, member).remove_vpp_config()
def unconfig_protect(self, p):
    """Withdraw the tunnel protection recorded in ``p.tun_protect``."""
    tun_protect = p.tun_protect
    tun_protect.remove_vpp_config()
def unconfig_sa(self, p):
    """Delete the SA pair kept on *p*, outbound SA before inbound SA."""
    # Per-attribute lookup preserves the original read/remove ordering.
    for member in ("tun_sa_out", "tun_sa_in"):
        getattr(p, member).remove_vpp_config()
3463 @tag_fixme_vpp_workers
3464 class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3465 """IPsec Interface IPv6"""
3468 super(TestIpsecItf6, self).setUp()
3470 self.tun_if = self.pg0
3473 super(TestIpsecItf6, self).tearDown()
3475 def test_tun_66(self):
3476 """IPSEC interface IPv6"""
3478 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3480 p = self.ipv6_params
3481 p.inner_hop_limit = 24
3482 p.outer_hop_limit = 23
3483 p.outer_flow_label = 243224
3484 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3486 self.config_network(p)
3488 p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
3490 self.verify_drop_tun_66(p, count=n_pkts)
3491 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3492 self.config_protect(p)
3494 self.verify_tun_66(p, count=n_pkts)
3495 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3496 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3498 p.tun_if.admin_down()
3499 self.verify_drop_tun_66(p, count=n_pkts)
3501 self.verify_tun_66(p, count=n_pkts)
3503 self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
3504 self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
# it's a v4 packet when it's encrypted
3507 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
3509 self.verify_tun_46(p, count=n_pkts)
3510 self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
3511 self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
3513 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
3515 self.vapi.cli("clear interfaces")
3517 # rekey - create new SAs and update the tunnel protection
3519 np.crypt_key = b"X" + p.crypt_key[1:]
3520 np.scapy_tun_spi += 100
3521 np.scapy_tun_sa_id += 1
3522 np.vpp_tun_spi += 100
3523 np.vpp_tun_sa_id += 1
3524 np.tun_if.local_spi = p.vpp_tun_spi
3525 np.tun_if.remote_spi = p.scapy_tun_spi
3526 np.inner_hop_limit = 24
3527 np.outer_hop_limit = 128
3528 np.inner_flow_label = 0xABCDE
3529 np.outer_flow_label = 0xABCDE
3531 np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
3533 self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
3534 self.config_protect(np)
3537 self.verify_tun_66(np, count=n_pkts)
3538 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3539 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3542 self.unconfig_protect(np)
3543 self.unconfig_sa(np)
3544 self.unconfig_network(p)
3546 def test_tun_66_police(self):
3547 """IPSEC interface IPv6 with input policer"""
3548 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
3550 p = self.ipv6_params
3551 p.inner_hop_limit = 24
3552 p.outer_hop_limit = 23
3553 p.outer_flow_label = 243224
3554 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
3556 self.config_network(p)
3557 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3558 self.config_protect(p)
3560 action_tx = PolicerAction(
3561 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
3563 policer = VppPolicer(
3570 conform_action=action_tx,
3571 exceed_action=action_tx,
3572 violate_action=action_tx,
3574 policer.add_vpp_config()
3576 # Start policing on tun
3577 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
3579 self.verify_tun_66(p, count=n_pkts)
3580 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3581 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3583 stats = policer.get_stats()
3585 # Single rate, 2 colour policer - expect conform, violate but no exceed
3586 self.assertGreater(stats["conform_packets"], 0)
3587 self.assertEqual(stats["exceed_packets"], 0)
3588 self.assertGreater(stats["violate_packets"], 0)
3590 # Stop policing on tun
3591 policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
3592 self.verify_tun_66(p, count=n_pkts)
3594 # No new policer stats
3595 statsnew = policer.get_stats()
3596 self.assertEqual(stats, statsnew)
3599 policer.remove_vpp_config()
3600 self.unconfig_protect(p)
3602 self.unconfig_network(p)
# TFC flavour of the IPv6 IPsec-interface suite.  The class body holds nothing
# but its docstring, so every attribute and test method is inherited from
# TemplateIpsec6TunTfc and TestIpsecItf6.
# NOTE(review): tag_fixme_vpp_workers comes from asfframework; presumably it
# marks the class as not yet working with VPP worker threads - confirm there.
@tag_fixme_vpp_workers
class TestIpsecItf6Tfc(TemplateIpsec6TunTfc, TestIpsecItf6):
    """IPsec Interface IPv6 with TFC"""
3610 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3611 """Ipsec P2MP ESP v4 tests"""
3613 tun4_encrypt_node_name = "esp4-encrypt-tun"
3614 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3615 encryption_type = ESP
3617 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3619 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3621 IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
3622 / UDP(sport=1144, dport=2233)
3623 / Raw(b"X" * payload_size)
3625 for i in range(count)
3628 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
3630 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3631 / IP(src="1.1.1.1", dst=dst)
3632 / UDP(sport=1144, dport=2233)
3633 / Raw(b"X" * payload_size)
3634 for i in range(count)
3637 def verify_decrypted(self, p, rxs):
3639 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3640 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3642 def verify_encrypted(self, p, sa, rxs):
3646 rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
3648 self.assertEqual(rx[IP].ttl, p.hop_limit)
3649 pkt = sa.decrypt(rx[IP])
3650 if not pkt.haslayer(IP):
3651 pkt = IP(pkt[Raw].load)
3652 self.assert_packet_checksums_valid(pkt)
3654 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3655 except (IndexError, AssertionError):
3656 self.logger.debug(ppp("Unexpected packet:", rx))
3658 self.logger.debug(ppp("Decrypted packet:", pkt))
3664 super(TestIpsecMIfEsp4, self).setUp()
3667 self.tun_if = self.pg0
3668 p = self.ipv4_params
3669 p.tun_if = VppIpsecInterface(
3670 self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
3672 p.tun_if.add_vpp_config()
3674 p.tun_if.config_ip4()
3675 p.tun_if.unconfig_ip4()
3676 p.tun_if.config_ip4()
3677 p.tun_if.generate_remote_hosts(N_NHS)
3678 self.pg0.generate_remote_hosts(N_NHS)
3679 self.pg0.configure_ipv4_neighbors()
3681 r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
3682 a = VppAcl(self, [r_all]).add_vpp_config()
3684 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3685 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3687 # setup some SAs for several next-hops on the interface
3688 self.multi_params = []
3690 for ii in range(N_NHS):
3691 p = copy.copy(self.ipv4_params)
3693 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3694 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3695 p.scapy_tun_spi = p.scapy_tun_spi + ii
3696 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3697 p.vpp_tun_spi = p.vpp_tun_spi + ii
3699 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3700 p.scapy_tra_spi = p.scapy_tra_spi + ii
3701 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3702 p.vpp_tra_spi = p.vpp_tra_spi + ii
3703 p.hop_limit = ii + 10
3704 p.tun_sa_out = VppIpsecSA(
3710 p.crypt_algo_vpp_id,
3712 self.vpp_esp_protocol,
3714 self.pg0.remote_hosts[ii].ip4,
3715 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3716 hop_limit=p.hop_limit,
3718 p.tun_sa_out.add_vpp_config()
3720 p.tun_sa_in = VppIpsecSA(
3726 p.crypt_algo_vpp_id,
3728 self.vpp_esp_protocol,
3729 self.pg0.remote_hosts[ii].ip4,
3731 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3732 hop_limit=p.hop_limit,
3734 p.tun_sa_in.add_vpp_config()
3736 p.tun_protect = VppIpsecTunProtect(
3741 nh=p.tun_if.remote_hosts[ii].ip4,
3743 p.tun_protect.add_vpp_config()
3746 self.encryption_type,
3749 self.pg0.remote_hosts[ii].ip4,
3751 self.multi_params.append(p)
3753 p.via_tun_route = VppIpRoute(
3755 p.remote_tun_if_host,
3757 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
3760 p.tun_dst = self.pg0.remote_hosts[ii].ip4
3763 p = self.ipv4_params
3764 p.tun_if.unconfig_ip4()
3765 super(TestIpsecMIfEsp4, self).tearDown()
3767 def test_tun_44(self):
3770 for p in self.multi_params:
3771 self.verify_tun_44(p, count=N_PKTS)
3773 # remove one tunnel protect, the rest should still work
3774 self.multi_params[0].tun_protect.remove_vpp_config()
3775 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3776 self.multi_params[0].via_tun_route.remove_vpp_config()
3777 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3779 for p in self.multi_params[1:]:
3780 self.verify_tun_44(p, count=N_PKTS)
3782 self.multi_params[0].tun_protect.add_vpp_config()
3783 self.multi_params[0].via_tun_route.add_vpp_config()
3785 for p in self.multi_params:
3786 self.verify_tun_44(p, count=N_PKTS)
3789 class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
3790 """IPsec Interface MPLSoIPv6"""
3792 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3795 super(TestIpsecItf6MPLS, self).setUp()
3797 self.tun_if = self.pg0
3800 super(TestIpsecItf6MPLS, self).tearDown()
3802 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
3804 Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
3806 MPLS(label=66, ttl=3)
3807 / IPv6(src=src, dst=dst)
3808 / UDP(sport=1166, dport=2233)
3809 / Raw(b"X" * payload_size)
3811 for i in range(count)
3814 def verify_encrypted6(self, p, sa, rxs):
3817 pkt = sa.decrypt(rx[IPv6])
3818 if not pkt.haslayer(IPv6):
3819 pkt = IP(pkt[Raw].load)
3820 self.assert_packet_checksums_valid(pkt)
3821 self.assert_equal(pkt[MPLS].label, 66)
3822 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3823 except (IndexError, AssertionError):
3824 self.logger.debug(ppp("Unexpected packet:", rx))
3826 self.logger.debug(ppp("Decrypted packet:", pkt))
3831 def test_tun_mpls_o_ip6(self):
3832 """IPSEC interface MPLS over IPv6"""
3835 p = self.ipv6_params
3838 tbl = VppMplsTable(self, 0)
3839 tbl.add_vpp_config()
3841 self.config_network(p)
3842 # deag MPLS routes from the tunnel
3847 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3848 eos_proto=f.FIB_PATH_NH_PROTO_IP6,
3853 p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
3857 p.tun_if.enable_mpls()
3859 self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
3860 self.config_protect(p)
3862 self.verify_tun_66(p, count=n_pkts)
3865 p.tun_if.disable_mpls()
3866 self.unconfig_protect(p)
3868 self.unconfig_network(p)
# Script entry point: when this file is executed directly (not imported),
# hand control to unittest and run the tests through VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)