+ err_msg = f"Failed to add IP neighbor on interface {if1_key}"
+ papi_exec.add(cmd2, **args2).get_reply(err_msg)
+
+ return loop_sw_if_idx
+
+ @staticmethod
+ def _ipsec_create_tunnel_interfaces_dut1_papi(
+ nodes: dict,
+ tun_ips: dict,
+ if1_key: str,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ raddr_ip2: Union[IPv4Address, IPv6Address],
+ addr_incr: int,
+ spi_d: dict,
+ existing_tunnels: int = 0,
+ ) -> Tuple[List[bytes], List[bytes]]:
+ """Create multiple IPsec tunnel interfaces on DUT1 node using PAPI.
+
+ Generate random keys and return them (so DUT2 or TG can decrypt).
+
+ :param nodes: VPP nodes to create tunnel interfaces.
+ :param tun_ips: Dictionary with VPP node 1 ipsec tunnel interface
+ IPv4/IPv6 address (ip1) and VPP node 2 ipsec tunnel interface
+ IPv4/IPv6 address (ip2).
+ :param if1_key: VPP node 1 interface key from topology file.
+ :param if2_key: VPP node 2 / TG node (in case of 2-node topology)
+ interface key from topology file.
+ :param n_tunnels: Number of tunnel interfaces to be there at the end.
+ :param crypto_alg: The encryption algorithm name.
+ :param integ_alg: The integrity algorithm name.
+ :param raddr_ip2: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node2->node1.
+ :param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2.
+ :param addr_incr: IP / IPv6 address incremental step.
+ :param existing_tunnels: Number of tunnel interfaces before creation.
+ Useful mainly for reconf tests. Default 0.
+ :type nodes: dict
+ :type tun_ips: dict
+ :type if1_key: str
+ :type if2_key: str
+ :type n_tunnels: int
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type raddr_ip2: Union[IPv4Address, IPv6Address]
+ :type addr_incr: int
+ :type spi_d: dict
+ :type existing_tunnels: int
+ :returns: Generated ckeys and ikeys.
+ :rtype: List[bytes], List[bytes]
+ """
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+ if not existing_tunnels:
+ loop_sw_if_idx = IPsecUtil._ipsec_create_loopback_dut1_papi(
+ nodes, tun_ips, if1_key, if2_key
+ )
+ else:
+ loop_sw_if_idx = InterfaceUtil.vpp_get_interface_sw_index(
+ nodes["DUT1"], "loop0"
+ )
+ with PapiSocketExecutor(nodes["DUT1"], is_async=True) as papi_exec:
+ # Configure IP addresses on loop0 interface
+ cmd = "sw_interface_add_del_address"
+ args = dict(
+ sw_if_index=loop_sw_if_idx,
+ is_add=True,
+ del_all=False,
+ prefix=None,
+ )
+ for i in range(existing_tunnels, n_tunnels):
+ args["prefix"] = IPUtil.create_prefix_object(
+ tun_ips["ip1"] + i * addr_incr,
+ 128 if tun_ips["ip1"].version == 6 else 32,
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ # Configure IPIP tunnel interfaces
+ cmd = "ipip_add_tunnel"
+ ipip_tunnel = dict(
+ instance=Constants.BITWISE_NON_ZERO,
+ src=None,
+ dst=None,
+ table_id=0,
+ flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ mode=int(TunnelMode.TUNNEL_API_MODE_P2P),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ )
+ args = dict(tunnel=ipip_tunnel)
+ ipip_tunnels = [None] * existing_tunnels
+ for i in range(existing_tunnels, n_tunnels):
+ ipip_tunnel["src"] = IPAddress.create_ip_address_object(
+ tun_ips["ip1"] + i * addr_incr
+ )
+ ipip_tunnel["dst"] = IPAddress.create_ip_address_object(
+ tun_ips["ip2"]
+ )
+ papi_exec.add(
+ cmd, history=bool(not 1 < i < n_tunnels - 2), **args
+ )
+ err_msg = (
+ "Failed to add IPIP tunnel interfaces on host"
+ f" {nodes['DUT1']['host']}"
+ )
+ ipip_tunnels.extend(
+ [
+ reply["sw_if_index"]
+ for reply in papi_exec.get_replies(err_msg)
+ if "sw_if_index" in reply
+ ]
+ )
+ # Configure IPSec SAD entries
+ ckeys = [bytes()] * existing_tunnels
+ ikeys = [bytes()] * existing_tunnels
+ cmd = "ipsec_sad_entry_add_v2"
+ c_key = dict(length=0, data=None)
+ i_key = dict(length=0, data=None)
+ common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ sad_entry = dict(
+ sad_id=None,
+ spi=None,
+ protocol=IPsecProto.ESP,
+ crypto_algorithm=crypto_alg.alg_int_repr,
+ crypto_key=c_key,
+ integrity_algorithm=integ_alg.alg_int_repr,
+ integrity_key=i_key,
+ flags=common_flags,
+ tunnel=dict(
+ src=0,
+ dst=0,
+ table_id=0,
+ encap_decap_flags=int(
+ TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
+ ),
+ dscp=int(IpDscp.IP_API_DSCP_CS0),
+ ),
+ salt=0,
+ udp_src_port=IPSEC_UDP_PORT_DEFAULT,
+ udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
+ anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
+ )
+ args = dict(entry=sad_entry)
+ for i in range(existing_tunnels, n_tunnels):
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
+ # SAD entry for outband / tx path
+ sad_entry["sad_id"] = i
+ sad_entry["spi"] = spi_d["spi_1"] + i
+
+ sad_entry["crypto_key"]["length"] = len(ckeys[i])
+ sad_entry["crypto_key"]["data"] = ckeys[i]