+ def _ipsec_create_tunnel_interfaces_dut1_papi(
+ nodes: dict,
+ tun_ips: dict,
+ if1_key: str,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ raddr_ip2: Union[IPv4Address, IPv6Address],
+ addr_incr: int,
+ spi_d: dict,
+ existing_tunnels: int = 0,
+ ) -> Tuple[List[bytes], List[bytes]]:
+ """Create multiple IPsec tunnel interfaces on DUT1 node using PAPI.
+
+ Generate random keys and return them (so DUT2 or TG can decrypt).
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_ips: Dictionary with VPP node 1 ipsec tunnel interface
+ IPv4/IPv6 address (ip1) and VPP node 2 ipsec tunnel interface
+ IPv4/IPv6 address (ip2).
+ :param if1_key: VPP node 1 interface key from topology file.
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :param n_tunnels: Number of tunnel interfaces to be there at the end.
+ :param crypto_alg: The encryption algorithm name.
+ :param integ_alg: The integrity algorithm name.
+ :param raddr_ip2: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node2->node1.
+ :param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2.
+ :param addr_incr: IP / IPv6 address incremental step.
+ :param existing_tunnels: Number of tunnel interfaces before creation.
+ Useful mainly for reconf tests. Default 0.
+ :type nodes: dict
+ :type tun_ips: dict
+ :type if1_key: str
+ :type if2_key: str
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type raddr_ip2: Union[IPv4Address, IPv6Address]
+ :type addr_incr: int
+ :type spi_d: dict
+ :type existing_tunnels: int
+ :returns: Generated ckeys and ikeys.
+ :rtype: List[bytes], List[bytes]
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+ if not existing_tunnels:
+ loop_sw_if_idx = IPsecUtil._ipsec_create_loopback_dut1_papi(
+ nodes, tun_ips, if1_key, if2_key
+ )
+ else:
+ loop_sw_if_idx = InterfaceUtil.vpp_get_interface_sw_index(
+ nodes["DUT1"], "loop0"
+ )
+ with PapiSocketExecutor(nodes["DUT1"], is_async=True) as papi_exec:
+ # Configure IP addresses on loop0 interface
+ cmd = "sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=loop_sw_if_idx,
+ is_add=True,
+ del_all=False,
+ prefix=None,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["prefix"] = IPUtil.create_prefix_object(
+ tun_ips["ip1"] + i * addr_incr,
+ 128 if tun_ips["ip1"].version == 6 else 32,
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IPIP tunnel interfaces
+ cmd = "ipip_add_tunnel"
+ ipip_tunnel = dict(
+ instance=Constants.BITWISE_NON_ZERO,
+ src=None,
+ dst=None,
+ table_id=0,
+ flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ mode=int(TunnelMode.TUNNEL_API_MODE_P2P),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ )
+ args = dict(tunnel=ipip_tunnel)
+ ipip_tunnels = [None] * existing_tunnels
+ for i in range(existing_tunnels, n_tunnels):
+ ipip_tunnel["src"] = IPAddress.create_ip_address_object(
+ tun_ips["ip1"] + i * addr_incr
+ )
+ ipip_tunnel["dst"] = IPAddress.create_ip_address_object(
+ tun_ips["ip2"]
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPIP tunnel interfaces on host"
+ f" {nodes['DUT1']['host']}"
+ )
+ ipip_tunnels.extend(
+ [
+ reply["sw_if_index"]
+ for reply in papi_exec.get_replies(err_msg)
+ if "sw_if_index" in reply
+ ]
+ )
+ # Configure IPSec SAD entries
+ ckeys = [bytes()] * existing_tunnels
+ ikeys = [bytes()] * existing_tunnels
+ cmd = "ipsec_sad_entry_add_v2"
+ c_key = dict(length=0, data=None)
+ i_key = dict(length=0, data=None)
+ common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ sad_entry = dict(
+ sad_id=None,
+ spi=None,
+ protocol=IPsecProto.ESP,
+ crypto_algorithm=crypto_alg.alg_int_repr,
+ crypto_key=c_key,
+ integrity_algorithm=integ_alg.alg_int_repr,
+ integrity_key=i_key,
+ flags=common_flags,
+ tunnel=dict(
+ src=0,
+ dst=0,
+ table_id=0,
+ encap_decap_flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ ),
+ salt=0,
+ udp_src_port=IPSEC_UDP_PORT_DEFAULT,
+ udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
+ anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
+ )
+ args = dict(entry=sad_entry)
+ for i in range(existing_tunnels, n_tunnels):
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
+ # SAD entry for outband / tx path
+ sad_entry["sad_id"] = i
+ sad_entry["spi"] = spi_d["spi_1"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ sad_entry["flags"] |= IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND
+ for i in range(existing_tunnels, n_tunnels):
+ # SAD entry for inband / rx path
+ sad_entry["sad_id"] = 100000 + i
+ sad_entry["spi"] = spi_d["spi_2"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPsec SAD entries on host"
+ f" {nodes['DUT1']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+ # Add protection for tunnels with IPSEC
+ cmd = "ipsec_tunnel_protect_update"
+ n_hop = dict(
+ address=0,
+ via_label=MPLS_LABEL_INVALID,
+ obj_id=Constants.BITWISE_NON_ZERO,
+ )
+ ipsec_tunnel_protect = dict(
+ sw_if_index=None, nh=n_hop, sa_out=None, n_sa_in=1, sa_in=None
+ )
+ args = dict(tunnel=ipsec_tunnel_protect)
+ for i in range(existing_tunnels, n_tunnels):
+ args["tunnel"]["sw_if_index"] = ipip_tunnels[i]
+ args["tunnel"]["sa_out"] = i
+ args["tunnel"]["sa_in"] = [100000 + i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add protection for tunnels with IPSEC"
+ f" on host {nodes['DUT1']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+
+ # Configure unnumbered interfaces
+ cmd = "sw_interface_set_unnumbered"
+ args = dict(
+ is_add=True,
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT1"], if1_key
+ ),
+ unnumbered_sw_if_index=0,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["unnumbered_sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Set interfaces up
+ cmd = "sw_interface_set_flags"
+ args = dict(
+ sw_if_index=0,
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IP routes
+ cmd = "ip_route_add_del"
+ args = dict(is_add=1, is_multipath=0, route=None)
+ for i in range(existing_tunnels, n_tunnels):
+ args["route"] = IPUtil.compose_vpp_route_structure(
+ nodes["DUT1"],
+ (raddr_ip2 + i).compressed,
+ prefix_len=128 if raddr_ip2.version == 6 else 32,
+ interface=ipip_tunnels[i],
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = f"Failed to add IP routes on host {nodes['DUT1']['host']}"
+ papi_exec.get_replies(err_msg)
+
+ return ckeys, ikeys
+
+ @staticmethod
+ def _ipsec_create_tunnel_interfaces_dut2_papi(
+ nodes: dict,
+ tun_ips: dict,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ ckeys: Sequence[bytes],
+ integ_alg: IntegAlg.InputType,
+ ikeys: Sequence[bytes],
+ raddr_ip1: Union[IPv4Address, IPv6Address],
+ addr_incr: int,
+ spi_d: dict,
+ existing_tunnels: int = 0,
+ ) -> None:
+ """Create multiple IPsec tunnel interfaces on DUT2 node using PAPI.
+
+ This method accesses keys generated by DUT1 method
+ and does not return anything.
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_ips: Dictionary with VPP node 1 ipsec tunnel interface
+ IPv4/IPv6 address (ip1) and VPP node 2 ipsec tunnel interface
+ IPv4/IPv6 address (ip2).
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :param n_tunnels: Number of tunnel interfaces to be there at the end.
+ :param crypto_alg: The encryption algorithm name.
+ :param ckeys: List of encryption keys.
+ :param integ_alg: The integrity algorithm name.
+ :param ikeys: List of integrity keys.
+ :param raddr_ip1: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node1->node2.
+ :param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2.
+ :param addr_incr: IP / IPv6 address incremental step.
+ :param existing_tunnels: Number of tunnel interfaces before creation.
+ Useful mainly for reconf tests. Default 0.
+ :type nodes: dict
+ :type tun_ips: dict
+ :type if2_key: str
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type ckeys: Sequence[bytes]
+ :type integ_alg: IntegAlg.InputType
+ :type ikeys: Sequence[bytes]
+ :type raddr_ip1: Union[IPv4Address, IPv6Address]
+ :type addr_incr: int
+ :type spi_d: dict
+ :type existing_tunnels: int
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+ with PapiSocketExecutor(nodes["DUT2"], is_async=True) as papi_exec:
+ if not existing_tunnels:
+ # Set IP address on VPP node 2 interface
+ cmd = "sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT2"], if2_key
+ ),
+ is_add=True,
+ del_all=False,
+ prefix=IPUtil.create_prefix_object(
+ tun_ips["ip2"],
+ 96 if tun_ips["ip2"].version == 6 else 24,
+ ),
+ )
+ err_msg = (
+ f"Failed to set IP address on interface {if2_key}"
+ f" on host {nodes['DUT2']['host']}"
+ )
+ papi_exec.add(cmd, **args).get_replies(err_msg)
+ # Configure IPIP tunnel interfaces
+ cmd = "ipip_add_tunnel"
+ ipip_tunnel = dict(
+ instance=Constants.BITWISE_NON_ZERO,
+ src=None,
+ dst=None,
+ table_id=0,
+ flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ mode=int(TunnelMode.TUNNEL_API_MODE_P2P),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ )
+ args = dict(tunnel=ipip_tunnel)
+ ipip_tunnels = [None] * existing_tunnels
+ for i in range(existing_tunnels, n_tunnels):
+ ipip_tunnel["src"] = IPAddress.create_ip_address_object(
+ tun_ips["ip2"]
+ )
+ ipip_tunnel["dst"] = IPAddress.create_ip_address_object(
+ tun_ips["ip1"] + i * addr_incr
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPIP tunnel interfaces on host"
+ f" {nodes['DUT2']['host']}"
+ )
+ ipip_tunnels.extend(
+ [
+ reply["sw_if_index"]
+ for reply in papi_exec.get_replies(err_msg)
+ if "sw_if_index" in reply
+ ]
+ )
+ # Configure IPSec SAD entries
+ cmd = "ipsec_sad_entry_add_v2"
+ c_key = dict(length=0, data=None)
+ i_key = dict(length=0, data=None)
+ common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ sad_entry = dict(
+ sad_id=None,
+ spi=None,
+ protocol=IPsecProto.ESP,
+ crypto_algorithm=crypto_alg.alg_int_repr,
+ crypto_key=c_key,
+ integrity_algorithm=integ_alg.alg_int_repr,
+ integrity_key=i_key,
+ flags=common_flags,
+ tunnel=dict(
+ src=0,
+ dst=0,
+ table_id=0,
+ encap_decap_flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ ),
+ salt=0,
+ udp_src_port=IPSEC_UDP_PORT_DEFAULT,
+ udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
+ anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
+ )
+ args = dict(entry=sad_entry)
+ for i in range(existing_tunnels, n_tunnels):
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
+ # SAD entry for outband / tx path
+ sad_entry["sad_id"] = 100000 + i
+ sad_entry["spi"] = spi_d["spi_2"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ sad_entry["flags"] |= IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND
+ for i in range(existing_tunnels, n_tunnels):
+ # SAD entry for inband / rx path
+ sad_entry["sad_id"] = i
+ sad_entry["spi"] = spi_d["spi_1"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ f"Failed to add IPsec SAD entries on host"
+ f" {nodes['DUT2']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+ # Add protection for tunnels with IPSEC
+ cmd = "ipsec_tunnel_protect_update"
+ n_hop = dict(
+ address=0,
+ via_label=MPLS_LABEL_INVALID,
+ obj_id=Constants.BITWISE_NON_ZERO,
+ )
+ ipsec_tunnel_protect = dict(
+ sw_if_index=None, nh=n_hop, sa_out=None, n_sa_in=1, sa_in=None
+ )
+ args = dict(tunnel=ipsec_tunnel_protect)
+ for i in range(existing_tunnels, n_tunnels):
+ args["tunnel"]["sw_if_index"] = ipip_tunnels[i]
+ args["tunnel"]["sa_out"] = 100000 + i
+ args["tunnel"]["sa_in"] = [i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add protection for tunnels with IPSEC"
+ f" on host {nodes['DUT2']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+
+ if not existing_tunnels:
+ # Configure IP route
+ cmd = "ip_route_add_del"
+ route = IPUtil.compose_vpp_route_structure(
+ nodes["DUT2"],
+ tun_ips["ip1"].compressed,
+ prefix_len=32 if tun_ips["ip1"].version == 6 else 8,
+ interface=if2_key,
+ gateway=(tun_ips["ip2"] - 1).compressed,
+ )
+ args = dict(is_add=1, is_multipath=0, route=route)
+ papi_exec.add(cmd, **args)
+ # Configure unnumbered interfaces
+ cmd = "sw_interface_set_unnumbered"
+ args = dict(
+ is_add=True,
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT2"], if2_key
+ ),
+ unnumbered_sw_if_index=0,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["unnumbered_sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Set interfaces up
+ cmd = "sw_interface_set_flags"
+ args = dict(
+ sw_if_index=0,
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IP routes
+ cmd = "ip_route_add_del"
+ args = dict(is_add=1, is_multipath=0, route=None)
+ for i in range(existing_tunnels, n_tunnels):
+ args["route"] = IPUtil.compose_vpp_route_structure(
+ nodes["DUT1"],
+ (raddr_ip1 + i).compressed,
+ prefix_len=128 if raddr_ip1.version == 6 else 32,
+ interface=ipip_tunnels[i],
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = f"Failed to add IP routes on host {nodes['DUT2']['host']}"
+ papi_exec.get_replies(err_msg)
+
+ @staticmethod
+ def vpp_ipsec_create_tunnel_interfaces(
+ nodes: dict,
+ tun_if1_ip_addr: str,
+ tun_if2_ip_addr: str,
+ if1_key: str,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ raddr_ip1: str,
+ raddr_ip2: str,
+ raddr_range: int,
+ existing_tunnels: int = 0,
+ return_keys: bool = False,
+ ) -> Optional[Tuple[List[bytes], List[bytes], int, int]]:
+ """Create multiple IPsec tunnel interfaces between two VPP nodes.
+
+ Some deployments (e.g. devicetest) need to know the generated keys.
+ But other deployments (e.g. scale perf test) would get spammed
+ if we returned keys every time.
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_if1_ip_addr: VPP node 1 ipsec tunnel interface IPv4/IPv6
+ address.
+ :param tun_if2_ip_addr: VPP node 2 ipsec tunnel interface IPv4/IPv6
+ address.
+ :param if1_key: VPP node 1 interface key from topology file.
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :param n_tunnels: Number of tunnel interfaces to be there at the end.
+ :param crypto_alg: The encryption algorithm name.
+ :param integ_alg: The integrity algorithm name.
+ :param raddr_ip1: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node1->node2.
+ :param raddr_ip2: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node2->node1.
+ :param raddr_range: Mask specifying range of Policy selector Remote
+ IPv4/IPv6 addresses. Valid values are from 1 to 32 in case of IPv4
+ and to 128 in case of IPv6.
+ :param existing_tunnels: Number of tunnel interfaces before creation.
+ Useful mainly for reconf tests. Default 0.
+ :param return_keys: Whether generated keys should be returned.
+ :type nodes: dict
+ :type tun_if1_ip_addr: str
+ :type tun_if2_ip_addr: str
+ :type if1_key: str
+ :type if2_key: str
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type raddr_ip1: str
+ :type raddr_ip2: str
+ :type raddr_range: int
+ :type existing_tunnels: int
+ :type return_keys: bool
+ :returns: Ckeys, ikeys, spi_1, spi_2.
+ :rtype: Optional[Tuple[List[bytes], List[bytes], int, int]]
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+ n_tunnels = int(n_tunnels)
+ existing_tunnels = int(existing_tunnels)
+ spi_d = dict(spi_1=100000, spi_2=200000)
+ tun_ips = dict(
+ ip1=ip_address(tun_if1_ip_addr), ip2=ip_address(tun_if2_ip_addr)
+ )
+ raddr_ip1 = ip_address(raddr_ip1)
+ raddr_ip2 = ip_address(raddr_ip2)
+ addr_incr = (
+ 1 << (128 - raddr_range)
+ if tun_ips["ip1"].version == 6
+ else 1 << (32 - raddr_range)
+ )
+
+ ckeys, ikeys = IPsecUtil._ipsec_create_tunnel_interfaces_dut1_papi(
+ nodes,
+ tun_ips,
+ if1_key,
+ if2_key,
+ n_tunnels,
+ crypto_alg,
+ integ_alg,
+ raddr_ip2,
+ addr_incr,
+ spi_d,
+ existing_tunnels,
+ )
+ if "DUT2" in nodes.keys():
+ IPsecUtil._ipsec_create_tunnel_interfaces_dut2_papi(
+ nodes,
+ tun_ips,
+ if2_key,
+ n_tunnels,
+ crypto_alg,
+ ckeys,
+ integ_alg,
+ ikeys,
+ raddr_ip1,
+ addr_incr,
+ spi_d,
+ existing_tunnels,
+ )
+
+ if return_keys:
+ return ckeys, ikeys, spi_d["spi_1"], spi_d["spi_2"]
+ return None
+
+ @staticmethod
+ def _create_ipsec_script_files(
+ dut: str, instances: int
+ ) -> List[TextIOWrapper]:
+ """Create script files for configuring IPsec in containers
+
+ :param dut: DUT node on which to create the script files
+ :param instances: number of containers on DUT node
+ :type dut: str
+ :type instances: int
+ :returns: Created opened file handles.
+ :rtype: List[TextIOWrapper]
+ """
+ scripts = []
+ for cnf in range(0, instances):
+ script_filename = (
+ f"/tmp/ipsec_create_tunnel_cnf_{dut}_{cnf + 1}.config"
+ )
+ scripts.append(open(script_filename, "w", encoding="utf-8"))
+ return scripts
+
+ @staticmethod
+ def _close_and_copy_ipsec_script_files(
+ dut: str, nodes: dict, instances: int, scripts: Sequence[TextIOWrapper]
+ ) -> None:
+ """Close created scripts and copy them to containers
+
+ :param dut: DUT node on which to create the script files
+ :param nodes: VPP nodes
+ :param instances: number of containers on DUT node
+ :param scripts: dictionary holding the script files
+ :type dut: str
+ :type nodes: dict
+ :type instances: int
+ :type scripts: dict
+ """
+ for cnf in range(0, instances):
+ scripts[cnf].close()
+ script_filename = (
+ f"/tmp/ipsec_create_tunnel_cnf_{dut}_{cnf + 1}.config"
+ )
+ scp_node(nodes[dut], script_filename, script_filename)
+
+ @staticmethod
+ def vpp_ipsec_add_multiple_tunnels(
+ nodes: dict,
+ interface1: Union[str, int],
+ interface2: Union[str, int],
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ tunnel_ip1: str,
+ tunnel_ip2: str,
+ raddr_ip1: str,
+ raddr_ip2: str,
+ raddr_range: int,
+ tunnel_addr_incr: bool = True,
+ ) -> None:
+ """Create multiple IPsec tunnels between two VPP nodes.
+
+ :param nodes: VPP nodes to create tunnels.
+ :param interface1: Interface name or sw_if_index on node 1.
+ :param interface2: Interface name or sw_if_index on node 2.
+ :param n_tunnels: Number of tunnels to create.
+ :param crypto_alg: The encryption algorithm name.
+ :param integ_alg: The integrity algorithm name.
+ :param tunnel_ip1: Tunnel node1 IPv4 address.
+ :param tunnel_ip2: Tunnel node2 IPv4 address.
+ :param raddr_ip1: Policy selector remote IPv4 start address for the
+ first tunnel in direction node1->node2.
+ :param raddr_ip2: Policy selector remote IPv4 start address for the
+ first tunnel in direction node2->node1.
+ :param raddr_range: Mask specifying range of Policy selector Remote
+ IPv4 addresses. Valid values are from 1 to 32.
+ :param tunnel_addr_incr: Enable or disable tunnel IP address
+ incremental step.
+ :type nodes: dict
+ :type interface1: Union[str, int]
+ :type interface2: Union[str, int]
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type tunnel_ip1: str
+ :type tunnel_ip2: str
+ :type raddr_ip1: str
+ :type raddr_ip2: str
+ :type raddr_range: int
+ :type tunnel_addr_incr: bool
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+
+ spd_id = 1
+ p_hi = 100
+ p_lo = 10
+ sa_id_1 = 100000
+ sa_id_2 = 200000
+ spi_1 = 300000
+ spi_2 = 400000
+
+ crypto_key = gen_key(crypto_alg.key_len).decode()
+ integ_key = gen_key(integ_alg.key_len).decode()
+ rmac = (
+ Topology.get_interface_mac(nodes["DUT2"], interface2)
+ if "DUT2" in nodes.keys()
+ else Topology.get_interface_mac(nodes["TG"], interface2)
+ )
+ IPsecUtil.vpp_ipsec_set_ip_route(
+ nodes["DUT1"],
+ n_tunnels,
+ tunnel_ip1,
+ raddr_ip2,
+ tunnel_ip2,
+ interface1,
+ raddr_range,
+ rmac,
+ )
+
+ IPsecUtil.vpp_ipsec_add_spd(nodes["DUT1"], spd_id)
+ IPsecUtil.vpp_ipsec_spd_add_if(nodes["DUT1"], spd_id, interface1)
+
+ addr_incr = (
+ 1 << (128 - 96)
+ if ip_address(tunnel_ip1).version == 6
+ else 1 << (32 - 24)
+ )
+ for i in range(n_tunnels // (addr_incr**2) + 1):
+ dut1_local_outbound_range = ip_network(
+ f"{ip_address(tunnel_ip1) + i*(addr_incr**3)}/8", False
+ ).with_prefixlen
+ dut1_remote_outbound_range = ip_network(
+ f"{ip_address(tunnel_ip2) + i*(addr_incr**3)}/8", False
+ ).with_prefixlen
+
+ IPsecUtil.vpp_ipsec_add_spd_entry(
+ nodes["DUT1"],
+ spd_id,
+ p_hi,
+ IpsecSpdAction.BYPASS,
+ inbound=False,
+ proto=IPsecProto.ESP,
+ laddr_range=dut1_local_outbound_range,
+ raddr_range=dut1_remote_outbound_range,
+ )
+ IPsecUtil.vpp_ipsec_add_spd_entry(
+ nodes["DUT1"],
+ spd_id,
+ p_hi,
+ IpsecSpdAction.BYPASS,
+ inbound=True,
+ proto=IPsecProto.ESP,
+ laddr_range=dut1_remote_outbound_range,
+ raddr_range=dut1_local_outbound_range,
+ )
+
+ IPsecUtil.vpp_ipsec_add_sad_entries(
+ nodes["DUT1"],
+ n_tunnels,
+ sa_id_1,
+ spi_1,
+ crypto_alg,
+ crypto_key,
+ integ_alg,
+ integ_key,
+ tunnel_ip1,
+ tunnel_ip2,
+ tunnel_addr_incr,
+ )
+
+ IPsecUtil.vpp_ipsec_add_spd_entries(
+ nodes["DUT1"],
+ n_tunnels,
+ spd_id,
+ priority=ObjIncrement(p_lo, 0),
+ action=IpsecSpdAction.PROTECT,
+ inbound=False,
+ sa_id=ObjIncrement(sa_id_1, 1),
+ raddr_range=NetworkIncrement(ip_network(raddr_ip2)),
+ )
+
+ IPsecUtil.vpp_ipsec_add_sad_entries(
+ nodes["DUT1"],
+ n_tunnels,
+ sa_id_2,
+ spi_2,
+ crypto_alg,
+ crypto_key,
+ integ_alg,
+ integ_key,
+ tunnel_ip2,
+ tunnel_ip1,
+ tunnel_addr_incr,
+ )
+ IPsecUtil.vpp_ipsec_add_spd_entries(
+ nodes["DUT1"],
+ n_tunnels,
+ spd_id,
+ priority=ObjIncrement(p_lo, 0),
+ action=IpsecSpdAction.PROTECT,
+ inbound=True,
+ sa_id=ObjIncrement(sa_id_2, 1),
+ raddr_range=NetworkIncrement(ip_network(raddr_ip1)),
+ )
+
+ if "DUT2" in nodes.keys():
+ rmac = Topology.get_interface_mac(nodes["DUT1"], interface1)
+ IPsecUtil.vpp_ipsec_set_ip_route(
+ nodes["DUT2"],
+ n_tunnels,
+ tunnel_ip2,
+ raddr_ip1,
+ tunnel_ip1,
+ interface2,
+ raddr_range,
+ rmac,
+ )
+
+ IPsecUtil.vpp_ipsec_add_spd(nodes["DUT2"], spd_id)
+ IPsecUtil.vpp_ipsec_spd_add_if(nodes["DUT2"], spd_id, interface2)
+ for i in range(n_tunnels // (addr_incr**2) + 1):
+ dut2_local_outbound_range = ip_network(
+ f"{ip_address(tunnel_ip1) + i*(addr_incr**3)}/8", False
+ ).with_prefixlen
+ dut2_remote_outbound_range = ip_network(
+ f"{ip_address(tunnel_ip2) + i*(addr_incr**3)}/8", False
+ ).with_prefixlen
+
+ IPsecUtil.vpp_ipsec_add_spd_entry(
+ nodes["DUT2"],
+ spd_id,
+ p_hi,
+ IpsecSpdAction.BYPASS,
+ inbound=False,
+ proto=IPsecProto.ESP,
+ laddr_range=dut2_remote_outbound_range,
+ raddr_range=dut2_local_outbound_range,
+ )
+ IPsecUtil.vpp_ipsec_add_spd_entry(
+ nodes["DUT2"],
+ spd_id,
+ p_hi,
+ IpsecSpdAction.BYPASS,
+ inbound=True,
+ proto=IPsecProto.ESP,
+ laddr_range=dut2_local_outbound_range,
+ raddr_range=dut2_remote_outbound_range,
+ )
+
+ IPsecUtil.vpp_ipsec_add_sad_entries(
+ nodes["DUT2"],
+ n_tunnels,
+ sa_id_1,
+ spi_1,
+ crypto_alg,
+ crypto_key,
+ integ_alg,
+ integ_key,
+ tunnel_ip1,
+ tunnel_ip2,
+ tunnel_addr_incr,
+ )
+ IPsecUtil.vpp_ipsec_add_spd_entries(
+ nodes["DUT2"],
+ n_tunnels,
+ spd_id,
+ priority=ObjIncrement(p_lo, 0),
+ action=IpsecSpdAction.PROTECT,
+ inbound=True,
+ sa_id=ObjIncrement(sa_id_1, 1),
+ raddr_range=NetworkIncrement(ip_network(raddr_ip2)),
+ )
+
+ IPsecUtil.vpp_ipsec_add_sad_entries(
+ nodes["DUT2"],
+ n_tunnels,
+ sa_id_2,
+ spi_2,
+ crypto_alg,
+ crypto_key,
+ integ_alg,
+ integ_key,
+ tunnel_ip2,
+ tunnel_ip1,
+ tunnel_addr_incr,
+ )
+ IPsecUtil.vpp_ipsec_add_spd_entries(
+ nodes["DUT2"],
+ n_tunnels,
+ spd_id,
+ priority=ObjIncrement(p_lo, 0),
+ action=IpsecSpdAction.PROTECT,
+ inbound=False,
+ sa_id=ObjIncrement(sa_id_2, 1),
+ raddr_range=NetworkIncrement(ip_network(raddr_ip1)),
+ )
+
+ @staticmethod
+ def vpp_ipsec_show_all(node: dict) -> None:
+ """Run "show ipsec all" debug CLI command.