+ :type sa_id: Optional[ObjIncrement]
+ :type proto: IPsecProto.InputType
+ :type laddr_range: Optional[NetworkIncrement]
+ :type raddr_range: Optional[NetworkIncrement]
+ :type lport_range: Optional[str]
+ :type rport_range: Optional[str]
+ :type is_ipv6: bool
+ """
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
+ if laddr_range is None:
+ laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0"
+ laddr_range = NetworkIncrement(ip_network(laddr_range), 0)
+
+ if raddr_range is None:
+ raddr_range = "::/0" if is_ipv6 else "0.0.0.0/0"
+ raddr_range = NetworkIncrement(ip_network(raddr_range), 0)
+
+ err_msg = (
+ "Failed to add entry to Security Policy Database"
+ f" {spd_id} on host {node['host']}"
+ )
+ with PapiSocketExecutor(node, is_async=True) as papi_exec:
+ for _ in range(n_entries):
+ IPsecUtil._vpp_ipsec_add_spd_entry_internal(
+ papi_exec,
+ spd_id,
+ next(priority),
+ action,
+ inbound,
+ next(sa_id) if sa_id is not None else sa_id,
+ proto,
+ next(laddr_range),
+ next(raddr_range),
+ lport_range,
+ rport_range,
+ is_ipv6,
+ )
+ papi_exec.get_replies(err_msg)
+
+ @staticmethod
+ def _ipsec_create_loopback_dut1_papi(
+ nodes: dict, tun_ips: dict, if1_key: str, if2_key: str
+ ) -> int:
+ """Create loopback interface and set IP address on VPP node 1 interface
+ using PAPI.
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_ips: Dictionary with VPP node 1 ipsec tunnel interface
+ IPv4/IPv6 address (ip1) and VPP node 2 ipsec tunnel interface
+ IPv4/IPv6 address (ip2).
+ :param if1_key: VPP node 1 interface key from topology file.
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :type nodes: dict
+ :type tun_ips: dict
+ :type if1_key: str
+ :type if2_key: str
+ :returns: sw_if_idx Of the created loopback interface.
+ :rtype: int
+ """
+ with PapiSocketExecutor(nodes["DUT1"]) as papi_exec:
+ # Create loopback interface on DUT1, set it to up state
+ cmd = "create_loopback_instance"
+ args = dict(
+ mac_address=0,
+ is_specified=False,
+ user_instance=0,
+ )
+ err_msg = (
+ "Failed to create loopback interface"
+ f" on host {nodes['DUT1']['host']}"
+ )
+ papi_exec.add(cmd, **args)
+ loop_sw_if_idx = papi_exec.get_sw_if_index(err_msg)
+ cmd = "sw_interface_set_flags"
+ args = dict(
+ sw_if_index=loop_sw_if_idx,
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value,
+ )
+ err_msg = (
+ "Failed to set loopback interface state up"
+ f" on host {nodes['DUT1']['host']}"
+ )
+ papi_exec.add(cmd, **args).get_reply(err_msg)
+ # Set IP address on VPP node 1 interface
+ cmd = "sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT1"], if1_key
+ ),
+ is_add=True,
+ del_all=False,
+ prefix=IPUtil.create_prefix_object(
+ tun_ips["ip2"] - 1,
+ 96 if tun_ips["ip2"].version == 6 else 24,
+ ),
+ )
+ err_msg = (
+ f"Failed to set IP address on interface {if1_key}"
+ f" on host {nodes['DUT1']['host']}"
+ )
+ papi_exec.add(cmd, **args).get_reply(err_msg)
+ cmd2 = "ip_neighbor_add_del"
+ args2 = dict(
+ is_add=1,
+ neighbor=dict(
+ sw_if_index=Topology.get_interface_sw_index(
+ nodes["DUT1"], if1_key
+ ),
+ flags=1,
+ mac_address=str(
+ Topology.get_interface_mac(nodes["DUT2"], if2_key)
+ if "DUT2" in nodes.keys()
+ else Topology.get_interface_mac(nodes["TG"], if2_key)
+ ),
+ ip_address=tun_ips["ip2"].compressed,
+ ),
+ )
+ err_msg = f"Failed to add IP neighbor on interface {if1_key}"
+ papi_exec.add(cmd2, **args2).get_reply(err_msg)
+
+ return loop_sw_if_idx
+
+ @staticmethod
+ def _ipsec_create_tunnel_interfaces_dut1_papi(
+ nodes: dict,
+ tun_ips: dict,
+ if1_key: str,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ raddr_ip2: Union[IPv4Address, IPv6Address],
+ addr_incr: int,
+ spi_d: dict,
+ existing_tunnels: int = 0,
+ ) -> Tuple[List[bytes], List[bytes]]:
+ """Create multiple IPsec tunnel interfaces on DUT1 node using PAPI.
+
+ Generate random keys and return them (so DUT2 or TG can decrypt).
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_ips: Dictionary with VPP node 1 ipsec tunnel interface
+ IPv4/IPv6 address (ip1) and VPP node 2 ipsec tunnel interface
+ IPv4/IPv6 address (ip2).
+ :param if1_key: VPP node 1 interface key from topology file.
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :param n_tunnels: Number of tunnel interfaces to be there at the end.
+ :param crypto_alg: The encryption algorithm name.
+ :param integ_alg: The integrity algorithm name.
+ :param raddr_ip2: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node2->node1.
+ :param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2.
+ :param addr_incr: IP / IPv6 address incremental step.
+ :param existing_tunnels: Number of tunnel interfaces before creation.
+ Useful mainly for reconf tests. Default 0.
+ :type nodes: dict
+ :type tun_ips: dict
+ :type if1_key: str
+ :type if2_key: str
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type raddr_ip2: Union[IPv4Address, IPv6Address]
+ :type addr_incr: int
+ :type spi_d: dict
+ :type existing_tunnels: int
+ :returns: Generated ckeys and ikeys.
+ :rtype: List[bytes], List[bytes]
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+ if not existing_tunnels:
+ loop_sw_if_idx = IPsecUtil._ipsec_create_loopback_dut1_papi(
+ nodes, tun_ips, if1_key, if2_key
+ )
+ else:
+ loop_sw_if_idx = InterfaceUtil.vpp_get_interface_sw_index(
+ nodes["DUT1"], "loop0"
+ )
+ with PapiSocketExecutor(nodes["DUT1"], is_async=True) as papi_exec:
+ # Configure IP addresses on loop0 interface
+ cmd = "sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=loop_sw_if_idx,
+ is_add=True,
+ del_all=False,
+ prefix=None,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["prefix"] = IPUtil.create_prefix_object(
+ tun_ips["ip1"] + i * addr_incr,
+ 128 if tun_ips["ip1"].version == 6 else 32,
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IPIP tunnel interfaces
+ cmd = "ipip_add_tunnel"
+ ipip_tunnel = dict(
+ instance=Constants.BITWISE_NON_ZERO,
+ src=None,
+ dst=None,
+ table_id=0,
+ flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ mode=int(TunnelMode.TUNNEL_API_MODE_P2P),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ )
+ args = dict(tunnel=ipip_tunnel)
+ ipip_tunnels = [None] * existing_tunnels
+ for i in range(existing_tunnels, n_tunnels):
+ ipip_tunnel["src"] = IPAddress.create_ip_address_object(
+ tun_ips["ip1"] + i * addr_incr
+ )
+ ipip_tunnel["dst"] = IPAddress.create_ip_address_object(
+ tun_ips["ip2"]
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPIP tunnel interfaces on host"
+ f" {nodes['DUT1']['host']}"
+ )
+ ipip_tunnels.extend(
+ [
+ reply["sw_if_index"]
+ for reply in papi_exec.get_replies(err_msg)
+ if "sw_if_index" in reply
+ ]
+ )
+ # Configure IPSec SAD entries
+ ckeys = [bytes()] * existing_tunnels
+ ikeys = [bytes()] * existing_tunnels
+ cmd = "ipsec_sad_entry_add_v2"
+ c_key = dict(length=0, data=None)
+ i_key = dict(length=0, data=None)
+ common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ sad_entry = dict(
+ sad_id=None,
+ spi=None,
+ protocol=IPsecProto.ESP,
+ crypto_algorithm=crypto_alg.alg_int_repr,
+ crypto_key=c_key,
+ integrity_algorithm=integ_alg.alg_int_repr,
+ integrity_key=i_key,
+ flags=common_flags,
+ tunnel=dict(
+ src=0,
+ dst=0,
+ table_id=0,
+ encap_decap_flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ ),
+ salt=0,
+ udp_src_port=IPSEC_UDP_PORT_DEFAULT,
+ udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
+ anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
+ )
+ args = dict(entry=sad_entry)
+ for i in range(existing_tunnels, n_tunnels):
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
+ # SAD entry for outband / tx path
+ sad_entry["sad_id"] = i
+ sad_entry["spi"] = spi_d["spi_1"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ sad_entry["flags"] |= IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND
+ for i in range(existing_tunnels, n_tunnels):
+ # SAD entry for inband / rx path
+ sad_entry["sad_id"] = 100000 + i
+ sad_entry["spi"] = spi_d["spi_2"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPsec SAD entries on host"
+ f" {nodes['DUT1']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+ # Add protection for tunnels with IPSEC
+ cmd = "ipsec_tunnel_protect_update"
+ n_hop = dict(
+ address=0,
+ via_label=MPLS_LABEL_INVALID,
+ obj_id=Constants.BITWISE_NON_ZERO,
+ )
+ ipsec_tunnel_protect = dict(
+ sw_if_index=None, nh=n_hop, sa_out=None, n_sa_in=1, sa_in=None
+ )
+ args = dict(tunnel=ipsec_tunnel_protect)
+ for i in range(existing_tunnels, n_tunnels):
+ args["tunnel"]["sw_if_index"] = ipip_tunnels[i]
+ args["tunnel"]["sa_out"] = i
+ args["tunnel"]["sa_in"] = [100000 + i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add protection for tunnels with IPSEC"
+ f" on host {nodes['DUT1']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+
+ # Configure unnumbered interfaces
+ cmd = "sw_interface_set_unnumbered"
+ args = dict(
+ is_add=True,
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT1"], if1_key
+ ),
+ unnumbered_sw_if_index=0,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["unnumbered_sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Set interfaces up
+ cmd = "sw_interface_set_flags"
+ args = dict(
+ sw_if_index=0,
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IP routes
+ cmd = "ip_route_add_del"
+ args = dict(is_add=1, is_multipath=0, route=None)
+ for i in range(existing_tunnels, n_tunnels):
+ args["route"] = IPUtil.compose_vpp_route_structure(
+ nodes["DUT1"],
+ (raddr_ip2 + i).compressed,
+ prefix_len=128 if raddr_ip2.version == 6 else 32,
+ interface=ipip_tunnels[i],
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = f"Failed to add IP routes on host {nodes['DUT1']['host']}"
+ papi_exec.get_replies(err_msg)
+
+ return ckeys, ikeys
+
+ @staticmethod
+ def _ipsec_create_tunnel_interfaces_dut2_papi(
+ nodes: dict,
+ tun_ips: dict,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ ckeys: Sequence[bytes],
+ integ_alg: IntegAlg.InputType,
+ ikeys: Sequence[bytes],
+ raddr_ip1: Union[IPv4Address, IPv6Address],
+ addr_incr: int,
+ spi_d: dict,
+ existing_tunnels: int = 0,
+ ) -> None:
+ """Create multiple IPsec tunnel interfaces on DUT2 node using PAPI.
+
+ This method accesses keys generated by DUT1 method
+ and does not return anything.
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_ips: Dictionary with VPP node 1 ipsec tunnel interface
+ IPv4/IPv6 address (ip1) and VPP node 2 ipsec tunnel interface
+ IPv4/IPv6 address (ip2).
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :param n_tunnels: Number of tunnel interfaces to be there at the end.
+ :param crypto_alg: The encryption algorithm name.
+ :param ckeys: List of encryption keys.
+ :param integ_alg: The integrity algorithm name.
+ :param ikeys: List of integrity keys.
+ :param raddr_ip1: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node1->node2.
+ :param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2.
+ :param addr_incr: IP / IPv6 address incremental step.
+ :param existing_tunnels: Number of tunnel interfaces before creation.
+ Useful mainly for reconf tests. Default 0.
+ :type nodes: dict
+ :type tun_ips: dict
+ :type if2_key: str
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type ckeys: Sequence[bytes]
+ :type integ_alg: IntegAlg.InputType
+ :type ikeys: Sequence[bytes]
+ :type raddr_ip1: Union[IPv4Address, IPv6Address]
+ :type addr_incr: int
+ :type spi_d: dict
+ :type existing_tunnels: int
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+ with PapiSocketExecutor(nodes["DUT2"], is_async=True) as papi_exec:
+ if not existing_tunnels:
+ # Set IP address on VPP node 2 interface
+ cmd = "sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT2"], if2_key
+ ),
+ is_add=True,
+ del_all=False,
+ prefix=IPUtil.create_prefix_object(
+ tun_ips["ip2"],
+ 96 if tun_ips["ip2"].version == 6 else 24,
+ ),
+ )
+ err_msg = (
+ f"Failed to set IP address on interface {if2_key}"
+ f" on host {nodes['DUT2']['host']}"
+ )
+ papi_exec.add(cmd, **args).get_replies(err_msg)
+ # Configure IPIP tunnel interfaces
+ cmd = "ipip_add_tunnel"
+ ipip_tunnel = dict(
+ instance=Constants.BITWISE_NON_ZERO,
+ src=None,
+ dst=None,
+ table_id=0,
+ flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ mode=int(TunnelMode.TUNNEL_API_MODE_P2P),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ )
+ args = dict(tunnel=ipip_tunnel)
+ ipip_tunnels = [None] * existing_tunnels
+ for i in range(existing_tunnels, n_tunnels):
+ ipip_tunnel["src"] = IPAddress.create_ip_address_object(
+ tun_ips["ip2"]
+ )
+ ipip_tunnel["dst"] = IPAddress.create_ip_address_object(
+ tun_ips["ip1"] + i * addr_incr
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPIP tunnel interfaces on host"
+ f" {nodes['DUT2']['host']}"
+ )
+ ipip_tunnels.extend(
+ [
+ reply["sw_if_index"]
+ for reply in papi_exec.get_replies(err_msg)
+ if "sw_if_index" in reply
+ ]
+ )
+ # Configure IPSec SAD entries
+ cmd = "ipsec_sad_entry_add_v2"
+ c_key = dict(length=0, data=None)
+ i_key = dict(length=0, data=None)
+ common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ sad_entry = dict(
+ sad_id=None,
+ spi=None,
+ protocol=IPsecProto.ESP,
+ crypto_algorithm=crypto_alg.alg_int_repr,
+ crypto_key=c_key,
+ integrity_algorithm=integ_alg.alg_int_repr,
+ integrity_key=i_key,
+ flags=common_flags,
+ tunnel=dict(
+ src=0,
+ dst=0,
+ table_id=0,
+ encap_decap_flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ ),
+ salt=0,
+ udp_src_port=IPSEC_UDP_PORT_DEFAULT,
+ udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
+ anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
+ )
+ args = dict(entry=sad_entry)
+ for i in range(existing_tunnels, n_tunnels):
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
+ # SAD entry for outband / tx path
+ sad_entry["sad_id"] = 100000 + i
+ sad_entry["spi"] = spi_d["spi_2"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ sad_entry["flags"] |= IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND
+ for i in range(existing_tunnels, n_tunnels):
+ # SAD entry for inband / rx path
+ sad_entry["sad_id"] = i
+ sad_entry["spi"] = spi_d["spi_1"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ f"Failed to add IPsec SAD entries on host"
+ f" {nodes['DUT2']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+ # Add protection for tunnels with IPSEC
+ cmd = "ipsec_tunnel_protect_update"
+ n_hop = dict(
+ address=0,
+ via_label=MPLS_LABEL_INVALID,
+ obj_id=Constants.BITWISE_NON_ZERO,
+ )
+ ipsec_tunnel_protect = dict(
+ sw_if_index=None, nh=n_hop, sa_out=None, n_sa_in=1, sa_in=None
+ )
+ args = dict(tunnel=ipsec_tunnel_protect)
+ for i in range(existing_tunnels, n_tunnels):
+ args["tunnel"]["sw_if_index"] = ipip_tunnels[i]
+ args["tunnel"]["sa_out"] = 100000 + i
+ args["tunnel"]["sa_in"] = [i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add protection for tunnels with IPSEC"
+ f" on host {nodes['DUT2']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+
+ if not existing_tunnels:
+ # Configure IP route
+ cmd = "ip_route_add_del"
+ route = IPUtil.compose_vpp_route_structure(
+ nodes["DUT2"],
+ tun_ips["ip1"].compressed,
+ prefix_len=32 if tun_ips["ip1"].version == 6 else 8,
+ interface=if2_key,
+ gateway=(tun_ips["ip2"] - 1).compressed,
+ )
+ args = dict(is_add=1, is_multipath=0, route=route)
+ papi_exec.add(cmd, **args)
+ # Configure unnumbered interfaces
+ cmd = "sw_interface_set_unnumbered"
+ args = dict(
+ is_add=True,
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT2"], if2_key
+ ),
+ unnumbered_sw_if_index=0,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["unnumbered_sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Set interfaces up
+ cmd = "sw_interface_set_flags"
+ args = dict(
+ sw_if_index=0,
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IP routes
+ cmd = "ip_route_add_del"
+ args = dict(is_add=1, is_multipath=0, route=None)
+ for i in range(existing_tunnels, n_tunnels):
+ args["route"] = IPUtil.compose_vpp_route_structure(
+ nodes["DUT1"],
+ (raddr_ip1 + i).compressed,
+ prefix_len=128 if raddr_ip1.version == 6 else 32,
+ interface=ipip_tunnels[i],
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = f"Failed to add IP routes on host {nodes['DUT2']['host']}"
+ papi_exec.get_replies(err_msg)
+
+ @staticmethod
+ def vpp_ipsec_create_tunnel_interfaces(
+ nodes: dict,
+ tun_if1_ip_addr: str,
+ tun_if2_ip_addr: str,
+ if1_key: str,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ raddr_ip1: str,
+ raddr_ip2: str,
+ raddr_range: int,
+ existing_tunnels: int = 0,
+ return_keys: bool = False,
+ ) -> Optional[Tuple[List[bytes], List[bytes], int, int]]:
+ """Create multiple IPsec tunnel interfaces between two VPP nodes.
+
+ Some deployments (e.g. devicetest) need to know the generated keys.
+ But other deployments (e.g. scale perf test) would get spammed
+ if we returned keys every time.
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_if1_ip_addr: VPP node 1 ipsec tunnel interface IPv4/IPv6
+ address.
+ :param tun_if2_ip_addr: VPP node 2 ipsec tunnel interface IPv4/IPv6
+ address.
+ :param if1_key: VPP node 1 interface key from topology file.
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :param n_tunnels: Number of tunnel interfaces to be there at the end.
+ :param crypto_alg: The encryption algorithm name.
+ :param integ_alg: The integrity algorithm name.
+ :param raddr_ip1: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node1->node2.
+ :param raddr_ip2: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node2->node1.
+ :param raddr_range: Mask specifying range of Policy selector Remote
+ IPv4/IPv6 addresses. Valid values are from 1 to 32 in case of IPv4
+ and to 128 in case of IPv6.
+ :param existing_tunnels: Number of tunnel interfaces before creation.
+ Useful mainly for reconf tests. Default 0.
+ :param return_keys: Whether generated keys should be returned.
+ :type nodes: dict
+ :type tun_if1_ip_addr: str
+ :type tun_if2_ip_addr: str
+ :type if1_key: str
+ :type if2_key: str
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type raddr_ip1: str
+ :type raddr_ip2: str
+ :type raddr_range: int
+ :type existing_tunnels: int
+ :type return_keys: bool
+ :returns: Ckeys, ikeys, spi_1, spi_2.
+ :rtype: Optional[Tuple[List[bytes], List[bytes], int, int]]
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+ n_tunnels = int(n_tunnels)
+ existing_tunnels = int(existing_tunnels)
+ spi_d = dict(spi_1=100000, spi_2=200000)
+ tun_ips = dict(
+ ip1=ip_address(tun_if1_ip_addr), ip2=ip_address(tun_if2_ip_addr)
+ )
+ raddr_ip1 = ip_address(raddr_ip1)
+ raddr_ip2 = ip_address(raddr_ip2)
+ addr_incr = (
+ 1 << (128 - raddr_range)
+ if tun_ips["ip1"].version == 6
+ else 1 << (32 - raddr_range)
+ )
+
+ ckeys, ikeys = IPsecUtil._ipsec_create_tunnel_interfaces_dut1_papi(
+ nodes,
+ tun_ips,
+ if1_key,
+ if2_key,
+ n_tunnels,
+ crypto_alg,
+ integ_alg,
+ raddr_ip2,
+ addr_incr,
+ spi_d,
+ existing_tunnels,
+ )
+ if "DUT2" in nodes.keys():
+ IPsecUtil._ipsec_create_tunnel_interfaces_dut2_papi(
+ nodes,
+ tun_ips,
+ if2_key,
+ n_tunnels,
+ crypto_alg,
+ ckeys,
+ integ_alg,
+ ikeys,
+ raddr_ip1,
+ addr_incr,
+ spi_d,
+ existing_tunnels,
+ )
+
+ if return_keys:
+ return ckeys, ikeys, spi_d["spi_1"], spi_d["spi_2"]
+ return None
+
+ @staticmethod
+ def _create_ipsec_script_files(
+ dut: str, instances: int
+ ) -> List[TextIOWrapper]:
+ """Create script files for configuring IPsec in containers
+
+ :param dut: DUT node on which to create the script files
+ :param instances: number of containers on DUT node
+ :type dut: str
+ :type instances: int
+ :returns: Created opened file handles.
+ :rtype: List[TextIOWrapper]
+ """
+ scripts = []
+ for cnf in range(0, instances):
+ script_filename = (
+ f"/tmp/ipsec_create_tunnel_cnf_{dut}_{cnf + 1}.config"
+ )
+ scripts.append(open(script_filename, "w", encoding="utf-8"))
+ return scripts
+
+ @staticmethod
+ def _close_and_copy_ipsec_script_files(
+ dut: str, nodes: dict, instances: int, scripts: Sequence[TextIOWrapper]
+ ) -> None:
+ """Close created scripts and copy them to containers
+
+ :param dut: DUT node on which to create the script files
+ :param nodes: VPP nodes
+ :param instances: number of containers on DUT node
+ :param scripts: dictionary holding the script files
+ :type dut: str
+ :type nodes: dict
+ :type instances: int
+ :type scripts: dict
+ """
+ for cnf in range(0, instances):
+ scripts[cnf].close()
+ script_filename = (
+ f"/tmp/ipsec_create_tunnel_cnf_{dut}_{cnf + 1}.config"
+ )
+ scp_node(nodes[dut], script_filename, script_filename)
+
+ @staticmethod
+ def vpp_ipsec_add_multiple_tunnels(
+ nodes: dict,
+ interface1: Union[str, int],
+ interface2: Union[str, int],
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ tunnel_ip1: str,
+ tunnel_ip2: str,
+ raddr_ip1: str,
+ raddr_ip2: str,
+ raddr_range: int,
+ tunnel_addr_incr: bool = True,
+ ) -> None:
+ """Create multiple IPsec tunnels between two VPP nodes.
+
+ :param nodes: VPP nodes to create tunnels.
+ :param interface1: Interface name or sw_if_index on node 1.
+ :param interface2: Interface name or sw_if_index on node 2.
+ :param n_tunnels: Number of tunnels to create.
+ :param crypto_alg: The encryption algorithm name.
+ :param integ_alg: The integrity algorithm name.
+ :param tunnel_ip1: Tunnel node1 IPv4 address.
+ :param tunnel_ip2: Tunnel node2 IPv4 address.
+ :param raddr_ip1: Policy selector remote IPv4 start address for the
+ first tunnel in direction node1->node2.
+ :param raddr_ip2: Policy selector remote IPv4 start address for the
+ first tunnel in direction node2->node1.
+ :param raddr_range: Mask specifying range of Policy selector Remote
+ IPv4 addresses. Valid values are from 1 to 32.
+ :param tunnel_addr_incr: Enable or disable tunnel IP address
+ incremental step.
+ :type nodes: dict
+ :type interface1: Union[str, int]
+ :type interface2: Union[str, int]
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type tunnel_ip1: str
+ :type tunnel_ip2: str
+ :type raddr_ip1: str
+ :type raddr_ip2: str
+ :type raddr_range: int
+ :type tunnel_addr_incr: bool