+ def _ipsec_create_loopback_dut1_papi(
+ nodes: dict, tun_ips: dict, if1_key: str, if2_key: str
+ ) -> int:
+ """Create loopback interface and set IP address on VPP node 1 interface
+ using PAPI.
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_ips: Dictionary with VPP node 1 ipsec tunnel interface
+ IPv4/IPv6 address (ip1) and VPP node 2 ipsec tunnel interface
+ IPv4/IPv6 address (ip2).
+ :param if1_key: VPP node 1 interface key from topology file.
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :type nodes: dict
+ :type tun_ips: dict
+ :type if1_key: str
+ :type if2_key: str
+ :returns: sw_if_idx Of the created loopback interface.
+ :rtype: int
+ """
+ with PapiSocketExecutor(nodes["DUT1"]) as papi_exec:
+ # Create loopback interface on DUT1, set it to up state
+ cmd = "create_loopback_instance"
+ args = dict(
+ mac_address=0,
+ is_specified=False,
+ user_instance=0,
+ )
+ err_msg = (
+ "Failed to create loopback interface"
+ f" on host {nodes['DUT1']['host']}"
+ )
+ papi_exec.add(cmd, **args)
+ loop_sw_if_idx = papi_exec.get_sw_if_index(err_msg)
+ cmd = "sw_interface_set_flags"
+ args = dict(
+ sw_if_index=loop_sw_if_idx,
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value,
+ )
+ err_msg = (
+ "Failed to set loopback interface state up"
+ f" on host {nodes['DUT1']['host']}"
+ )
+ papi_exec.add(cmd, **args).get_reply(err_msg)
+ # Set IP address on VPP node 1 interface
+ cmd = "sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT1"], if1_key
+ ),
+ is_add=True,
+ del_all=False,
+ prefix=IPUtil.create_prefix_object(
+ tun_ips["ip2"] - 1,
+ 96 if tun_ips["ip2"].version == 6 else 24,
+ ),
+ )
+ err_msg = (
+ f"Failed to set IP address on interface {if1_key}"
+ f" on host {nodes['DUT1']['host']}"
+ )
+ papi_exec.add(cmd, **args).get_reply(err_msg)
+ cmd2 = "ip_neighbor_add_del"
+ args2 = dict(
+ is_add=1,
+ neighbor=dict(
+ sw_if_index=Topology.get_interface_sw_index(
+ nodes["DUT1"], if1_key
+ ),
+ flags=1,
+ mac_address=str(
+ Topology.get_interface_mac(nodes["DUT2"], if2_key)
+ if "DUT2" in nodes.keys()
+ else Topology.get_interface_mac(nodes["TG"], if2_key)
+ ),
+ ip_address=tun_ips["ip2"].compressed,
+ ),
+ )
+ err_msg = f"Failed to add IP neighbor on interface {if1_key}"
+ papi_exec.add(cmd2, **args2).get_reply(err_msg)
+
+ return loop_sw_if_idx
+
+ @staticmethod
+ def _ipsec_create_tunnel_interfaces_dut1_papi(
+ nodes: dict,
+ tun_ips: dict,
+ if1_key: str,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ raddr_ip2: Union[IPv4Address, IPv6Address],
+ addr_incr: int,
+ spi_d: dict,
+ existing_tunnels: int = 0,
+ udp_encap: bool = False,
+ anti_replay: bool = False,
+ ) -> Tuple[List[bytes], List[bytes]]:
+ """Create multiple IPsec tunnel interfaces on DUT1 node using PAPI.
+
+ Generate random keys and return them (so DUT2 or TG can decrypt).
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_ips: Dictionary with VPP node 1 ipsec tunnel interface
+ IPv4/IPv6 address (ip1) and VPP node 2 ipsec tunnel interface
+ IPv4/IPv6 address (ip2).
+ :param if1_key: VPP node 1 interface key from topology file.
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :param n_tunnels: Number of tunnel interfaces to be there at the end.
+ :param crypto_alg: The encryption algorithm name.
+ :param integ_alg: The integrity algorithm name.
+ :param raddr_ip2: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node2->node1.
+ :param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2.
+ :param addr_incr: IP / IPv6 address incremental step.
+ :param existing_tunnels: Number of tunnel interfaces before creation.
+ Useful mainly for reconf tests. Default 0.
+ :param udp_encap: Whether to apply UDP_ENCAP flag.
+ :param anti_replay: Whether to apply USE_ANTI_REPLAY flag.
+ :type nodes: dict
+ :type tun_ips: dict
+ :type if1_key: str
+ :type if2_key: str
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type raddr_ip2: Union[IPv4Address, IPv6Address]
+ :type addr_incr: int
+ :type spi_d: dict
+ :type existing_tunnels: int
+ :type udp_encap: bool
+ :type anti_replay: bool
+ :returns: Generated ckeys and ikeys.
+ :rtype: List[bytes], List[bytes]
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+ if not existing_tunnels:
+ loop_sw_if_idx = IPsecUtil._ipsec_create_loopback_dut1_papi(
+ nodes, tun_ips, if1_key, if2_key
+ )
+ else:
+ loop_sw_if_idx = InterfaceUtil.vpp_get_interface_sw_index(
+ nodes["DUT1"], "loop0"
+ )
+ with PapiSocketExecutor(nodes["DUT1"], is_async=True) as papi_exec:
+ # Configure IP addresses on loop0 interface
+ cmd = "sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=loop_sw_if_idx,
+ is_add=True,
+ del_all=False,
+ prefix=None,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["prefix"] = IPUtil.create_prefix_object(
+ tun_ips["ip1"] + i * addr_incr,
+ 128 if tun_ips["ip1"].version == 6 else 32,
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IPIP tunnel interfaces
+ cmd = "ipip_add_tunnel"
+ ipip_tunnel = dict(
+ instance=Constants.BITWISE_NON_ZERO,
+ src=None,
+ dst=None,
+ table_id=0,
+ flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ mode=int(TunnelMode.TUNNEL_API_MODE_P2P),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ )
+ args = dict(tunnel=ipip_tunnel)
+ ipip_tunnels = [None] * existing_tunnels
+ for i in range(existing_tunnels, n_tunnels):
+ ipip_tunnel["src"] = IPAddress.create_ip_address_object(
+ tun_ips["ip1"] + i * addr_incr
+ )
+ ipip_tunnel["dst"] = IPAddress.create_ip_address_object(
+ tun_ips["ip2"]
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPIP tunnel interfaces on host"
+ f" {nodes['DUT1']['host']}"
+ )
+ ipip_tunnels.extend(
+ [
+ reply["sw_if_index"]
+ for reply in papi_exec.get_replies(err_msg)
+ if "sw_if_index" in reply
+ ]
+ )
+ # Configure IPSec SAD entries
+ ckeys = [bytes()] * existing_tunnels
+ ikeys = [bytes()] * existing_tunnels
+ cmd = "ipsec_sad_entry_add_v2"
+ c_key = dict(length=0, data=None)
+ i_key = dict(length=0, data=None)
+ common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ if udp_encap:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_UDP_ENCAP
+ if anti_replay:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
+ sad_entry = dict(
+ sad_id=None,
+ spi=None,
+ protocol=IPsecProto.ESP,
+ crypto_algorithm=crypto_alg.alg_int_repr,
+ crypto_key=c_key,
+ integrity_algorithm=integ_alg.alg_int_repr,
+ integrity_key=i_key,
+ flags=common_flags,
+ tunnel=dict(
+ src=0,
+ dst=0,
+ table_id=0,
+ encap_decap_flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ ),
+ salt=0,
+ udp_src_port=IPSEC_UDP_PORT_DEFAULT,
+ udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
+ anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
+ )
+ args = dict(entry=sad_entry)
+ for i in range(existing_tunnels, n_tunnels):
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
+ # SAD entry for outband / tx path
+ sad_entry["sad_id"] = i
+ sad_entry["spi"] = spi_d["spi_1"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ sad_entry["flags"] |= IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND
+ for i in range(existing_tunnels, n_tunnels):
+ # SAD entry for inband / rx path
+ sad_entry["sad_id"] = 100000 + i
+ sad_entry["spi"] = spi_d["spi_2"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPsec SAD entries on host"
+ f" {nodes['DUT1']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+ # Add protection for tunnels with IPSEC
+ cmd = "ipsec_tunnel_protect_update"
+ n_hop = dict(
+ address=0,
+ via_label=MPLS_LABEL_INVALID,
+ obj_id=Constants.BITWISE_NON_ZERO,
+ )
+ ipsec_tunnel_protect = dict(
+ sw_if_index=None, nh=n_hop, sa_out=None, n_sa_in=1, sa_in=None
+ )
+ args = dict(tunnel=ipsec_tunnel_protect)
+ for i in range(existing_tunnels, n_tunnels):
+ args["tunnel"]["sw_if_index"] = ipip_tunnels[i]
+ args["tunnel"]["sa_out"] = i
+ args["tunnel"]["sa_in"] = [100000 + i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add protection for tunnels with IPSEC"
+ f" on host {nodes['DUT1']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+
+ # Configure unnumbered interfaces
+ cmd = "sw_interface_set_unnumbered"
+ args = dict(
+ is_add=True,
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT1"], if1_key
+ ),
+ unnumbered_sw_if_index=0,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["unnumbered_sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Set interfaces up
+ cmd = "sw_interface_set_flags"
+ args = dict(
+ sw_if_index=0,
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IP routes
+ cmd = "ip_route_add_del"
+ args = dict(is_add=1, is_multipath=0, route=None)
+ for i in range(existing_tunnels, n_tunnels):
+ args["route"] = IPUtil.compose_vpp_route_structure(
+ nodes["DUT1"],
+ (raddr_ip2 + i).compressed,
+ prefix_len=128 if raddr_ip2.version == 6 else 32,
+ interface=ipip_tunnels[i],
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = f"Failed to add IP routes on host {nodes['DUT1']['host']}"
+ papi_exec.get_replies(err_msg)
+
+ return ckeys, ikeys
+
+ @staticmethod
+ def _ipsec_create_tunnel_interfaces_dut2_papi(
+ nodes: dict,
+ tun_ips: dict,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ ckeys: Sequence[bytes],
+ integ_alg: IntegAlg.InputType,
+ ikeys: Sequence[bytes],
+ raddr_ip1: Union[IPv4Address, IPv6Address],
+ addr_incr: int,
+ spi_d: dict,
+ existing_tunnels: int = 0,
+ udp_encap: bool = False,
+ anti_replay: bool = False,
+ ) -> None:
+ """Create multiple IPsec tunnel interfaces on DUT2 node using PAPI.
+
+ This method accesses keys generated by DUT1 method
+ and does not return anything.
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_ips: Dictionary with VPP node 1 ipsec tunnel interface
+ IPv4/IPv6 address (ip1) and VPP node 2 ipsec tunnel interface
+ IPv4/IPv6 address (ip2).
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :param n_tunnels: Number of tunnel interfaces to be there at the end.
+ :param crypto_alg: The encryption algorithm name.
+ :param ckeys: List of encryption keys.
+ :param integ_alg: The integrity algorithm name.
+ :param ikeys: List of integrity keys.
+ :param raddr_ip1: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node1->node2.
+ :param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2.
+ :param addr_incr: IP / IPv6 address incremental step.
+ :param existing_tunnels: Number of tunnel interfaces before creation.
+ Useful mainly for reconf tests. Default 0.
+ :param udp_encap: Whether to apply UDP_ENCAP flag.
+ :param anti_replay: Whether to apply USE_ANTI_REPLAY flag.
+ :type nodes: dict
+ :type tun_ips: dict
+ :type if2_key: str
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type ckeys: Sequence[bytes]
+ :type integ_alg: IntegAlg.InputType
+ :type ikeys: Sequence[bytes]
+ :type raddr_ip1: Union[IPv4Address, IPv6Address]
+ :type addr_incr: int
+ :type spi_d: dict
+ :type existing_tunnels: int
+ :type udp_encap: bool
+ :type anti_replay: bool
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+ with PapiSocketExecutor(nodes["DUT2"], is_async=True) as papi_exec:
+ if not existing_tunnels:
+ # Set IP address on VPP node 2 interface
+ cmd = "sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT2"], if2_key
+ ),
+ is_add=True,
+ del_all=False,
+ prefix=IPUtil.create_prefix_object(
+ tun_ips["ip2"],
+ 96 if tun_ips["ip2"].version == 6 else 24,
+ ),
+ )
+ err_msg = (
+ f"Failed to set IP address on interface {if2_key}"
+ f" on host {nodes['DUT2']['host']}"
+ )
+ papi_exec.add(cmd, **args).get_replies(err_msg)
+ # Configure IPIP tunnel interfaces
+ cmd = "ipip_add_tunnel"
+ ipip_tunnel = dict(
+ instance=Constants.BITWISE_NON_ZERO,
+ src=None,
+ dst=None,
+ table_id=0,
+ flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ mode=int(TunnelMode.TUNNEL_API_MODE_P2P),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ )
+ args = dict(tunnel=ipip_tunnel)
+ ipip_tunnels = [None] * existing_tunnels
+ for i in range(existing_tunnels, n_tunnels):
+ ipip_tunnel["src"] = IPAddress.create_ip_address_object(
+ tun_ips["ip2"]
+ )
+ ipip_tunnel["dst"] = IPAddress.create_ip_address_object(
+ tun_ips["ip1"] + i * addr_incr
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPIP tunnel interfaces on host"
+ f" {nodes['DUT2']['host']}"
+ )
+ ipip_tunnels.extend(
+ [
+ reply["sw_if_index"]
+ for reply in papi_exec.get_replies(err_msg)
+ if "sw_if_index" in reply
+ ]
+ )
+ # Configure IPSec SAD entries
+ cmd = "ipsec_sad_entry_add_v2"
+ c_key = dict(length=0, data=None)
+ i_key = dict(length=0, data=None)
+ common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ if udp_encap:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_UDP_ENCAP
+ if anti_replay:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
+ sad_entry = dict(
+ sad_id=None,
+ spi=None,
+ protocol=IPsecProto.ESP,
+ crypto_algorithm=crypto_alg.alg_int_repr,
+ crypto_key=c_key,
+ integrity_algorithm=integ_alg.alg_int_repr,
+ integrity_key=i_key,
+ flags=common_flags,
+ tunnel=dict(
+ src=0,
+ dst=0,
+ table_id=0,
+ encap_decap_flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ ),
+ salt=0,
+ udp_src_port=IPSEC_UDP_PORT_DEFAULT,
+ udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
+ anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
+ )
+ args = dict(entry=sad_entry)
+ for i in range(existing_tunnels, n_tunnels):
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
+ # SAD entry for outband / tx path
+ sad_entry["sad_id"] = 100000 + i
+ sad_entry["spi"] = spi_d["spi_2"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ sad_entry["flags"] |= IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND
+ for i in range(existing_tunnels, n_tunnels):
+ # SAD entry for inband / rx path
+ sad_entry["sad_id"] = i
+ sad_entry["spi"] = spi_d["spi_1"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]
+ if integ_alg:
+ sad_entry["integrity_key"]["length"] = len(ikeys[i])
+ sad_entry["integrity_key"]["data"] = ikeys[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ f"Failed to add IPsec SAD entries on host"
+ f" {nodes['DUT2']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+ # Add protection for tunnels with IPSEC
+ cmd = "ipsec_tunnel_protect_update"
+ n_hop = dict(
+ address=0,
+ via_label=MPLS_LABEL_INVALID,
+ obj_id=Constants.BITWISE_NON_ZERO,
+ )
+ ipsec_tunnel_protect = dict(
+ sw_if_index=None, nh=n_hop, sa_out=None, n_sa_in=1, sa_in=None
+ )
+ args = dict(tunnel=ipsec_tunnel_protect)
+ for i in range(existing_tunnels, n_tunnels):
+ args["tunnel"]["sw_if_index"] = ipip_tunnels[i]
+ args["tunnel"]["sa_out"] = 100000 + i
+ args["tunnel"]["sa_in"] = [i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add protection for tunnels with IPSEC"
+ f" on host {nodes['DUT2']['host']}"
+ )
+ papi_exec.get_replies(err_msg)
+
+ if not existing_tunnels:
+ # Configure IP route
+ cmd = "ip_route_add_del"
+ route = IPUtil.compose_vpp_route_structure(
+ nodes["DUT2"],
+ tun_ips["ip1"].compressed,
+ prefix_len=32 if tun_ips["ip1"].version == 6 else 8,
+ interface=if2_key,
+ gateway=(tun_ips["ip2"] - 1).compressed,
+ )
+ args = dict(is_add=1, is_multipath=0, route=route)
+ papi_exec.add(cmd, **args)
+ # Configure unnumbered interfaces
+ cmd = "sw_interface_set_unnumbered"
+ args = dict(
+ is_add=True,
+ sw_if_index=InterfaceUtil.get_interface_index(
+ nodes["DUT2"], if2_key
+ ),
+ unnumbered_sw_if_index=0,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["unnumbered_sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Set interfaces up
+ cmd = "sw_interface_set_flags"
+ args = dict(
+ sw_if_index=0,
+ flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["sw_if_index"] = ipip_tunnels[i]
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IP routes
+ cmd = "ip_route_add_del"
+ args = dict(is_add=1, is_multipath=0, route=None)
+ for i in range(existing_tunnels, n_tunnels):
+ args["route"] = IPUtil.compose_vpp_route_structure(
+ nodes["DUT1"],
+ (raddr_ip1 + i).compressed,
+ prefix_len=128 if raddr_ip1.version == 6 else 32,
+ interface=ipip_tunnels[i],
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = f"Failed to add IP routes on host {nodes['DUT2']['host']}"
+ papi_exec.get_replies(err_msg)
+
+ @staticmethod
+ def vpp_ipsec_create_tunnel_interfaces(
+ nodes: dict,
+ tun_if1_ip_addr: str,
+ tun_if2_ip_addr: str,
+ if1_key: str,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ raddr_ip1: str,
+ raddr_ip2: str,
+ raddr_range: int,
+ existing_tunnels: int = 0,
+ udp_encap: bool = False,
+ anti_replay: bool = False,
+ return_keys: bool = False,
+ ) -> Optional[Tuple[List[bytes], List[bytes], int, int]]: