+ :param client: VPP client instance in connected state.
+ :type client: vpp_papi.VPPApiClient
+ :raises RuntimeError: If related key already has a cached client.
+ """
+ key = self.key_for_self()
+ cache = self.__class__.conn_cache
+ if key in cache:
+ raise RuntimeError(f"Caching client with existing key: {key}")
+ cache[key] = client
+
+ def get_connected_client(self, check_connected=True):
+ """Return None or cached connected client.
+
+ If check_connected, RuntimeError is raised when the client is
+ not in cache. None is returned if client is not in cache
+ (and the check is disabled).
+ Successful retrieval from cache is logged only when check_connected.
+
+ This hides details of what the node key is.
+
+ :param check_connected: Whether cache miss raises (and success logs).
+ :type check_connected: bool
+ :returns: Connected client instance, or None if uncached and no check.
+ :rtype: Optional[vpp_papi.VPPApiClient]
+ :raises RuntimeError: If cache miss and check enabled.
+ """
+ key = self.key_for_self()
+ ret = self.__class__.conn_cache.get(key, None)
+ if check_connected:
+ if ret is None:
+ raise RuntimeError(f"Client not cached for key: {key}")
+ # When reading logs, it is good to see which VPP is accessed.
+ logger.debug(f"Activated cached PAPI client for key: {key}")
+ return ret
+
+ def __enter__(self):
+ """Create a tunnel, connect VPP instance.
+
+ If the connected client is in cache, return it.
+ Only if not, create a new (or reuse a disconnected) client instance.
+
+ Only at this point a local socket names are created
+ in a temporary directory, as CSIT can connect to multiple VPPs.
+
+ The following attributes are added to the client instance
+ to simplify caching and cleanup:
+ csit_temp_dir
+ - Temporary socket files are created here.
+ csit_control_socket
+ - This socket controls the local ssh process doing the forwarding.
+ csit_local_vpp_socket
+ - This is the forwarded socket to talk with remote VPP.
+ csit_deque
+ - Queue for responses.
+
+ The attribute names do not start with underscore,
+ so pylint does not complain about accessing private attribute.
+ The attribute names start with csit_ to avoid naming conflicts
+ with "real" attributes from VPP Python code.
+
+ :returns: self
+ :rtype: PapiSocketExecutor
+ """
+ # Do we have the connected instance in the cache?
+ vpp_instance = self.get_connected_client(check_connected=False)
+ if vpp_instance is not None:
+ return self
+ # No luck, create and connect a new instance.
+ time_enter = time.monotonic()
+ node = self._node
+ # Parsing takes longer than connecting, prepare instance before tunnel.
+ vpp_instance = self.ensure_vpp_instance()
+ # Store into cache as soon as possible.
+ # If connection fails, it is better to attempt disconnect anyway.
+ self.set_connected_client(vpp_instance)
+ # Set additional attributes.
+ vpp_instance.csit_temp_dir = tempfile.TemporaryDirectory(dir="/tmp")
+ temp_path = vpp_instance.csit_temp_dir.name
+ api_socket = temp_path + "/vpp-api.sock"
+ vpp_instance.csit_local_vpp_socket = api_socket
+ ssh_socket = temp_path + "/ssh.sock"
+ vpp_instance.csit_control_socket = ssh_socket
+ # Cleanup possibilities.
+ ret_code, _ = run(["ls", ssh_socket], check=False)
+ if ret_code != 2:
+ # This branch never seems to be hit in CI,
+ # but may be useful when testing manually.
+ run(
+ ["ssh", "-S", ssh_socket, "-O", "exit", "0.0.0.0"],
+ check=False,
+ log=True,
+ )
+ # TODO: Is any sleep necessary? How to prove if not?
+ run(["sleep", "0.1"])
+ run(["rm", "-vrf", ssh_socket])
+ # Even if ssh can perhaps reuse this file,
+ # we need to remove it for readiness detection to work correctly.
+ run(["rm", "-rvf", api_socket])
+ # We use sleep command. The ssh command will exit in 30 second,
+ # unless a local socket connection is established,
+ # in which case the ssh command will exit only when
+ # the ssh connection is closed again (via control socket).
+ # The log level is to suppress "Warning: Permanently added" messages.
+ ssh_cmd = [
+ "ssh",
+ "-S",
+ ssh_socket,
+ "-M",
+ "-L",
+ f"{api_socket}:{self._remote_vpp_socket}",
+ "-p",
+ str(node["port"]),
+ "-o",
+ "LogLevel=ERROR",
+ "-o",
+ "UserKnownHostsFile=/dev/null",
+ "-o",
+ "StrictHostKeyChecking=no",
+ "-o",
+ "ExitOnForwardFailure=yes",
+ f"{node['username']}@{node['host']}",
+ "sleep",
+ "30",
+ ]
+ priv_key = node.get("priv_key")
+ if priv_key:
+ # This is tricky. We need a file to pass the value to ssh command.
+ # And we need ssh command, because paramiko does not support sockets
+ # (neither ssh_socket, nor _remote_vpp_socket).
+ key_file = tempfile.NamedTemporaryFile()
+ key_file.write(priv_key)
+ # Make sure the content is written, but do not close yet.
+ key_file.flush()
+ ssh_cmd[1:1] = ["-i", key_file.name]
+ password = node.get("password")
+ if password:
+ # Prepend sshpass command to set password.
+ ssh_cmd[:0] = ["sshpass", "-p", password]
+ time_stop = time.monotonic() + 10.0
+ # subprocess.Popen seems to be the best way to run commands
+ # on background. Other ways (shell=True with "&" and ssh with -f)
+ # seem to be too dependent on shell behavior.
+ # In particular, -f does NOT return values for run().
+ subprocess.Popen(ssh_cmd)
+ # Check socket presence on local side.
+ while time.monotonic() < time_stop:
+ # It can take a moment for ssh to create the socket file.
+ ret_code, _ = run(["ls", "-l", api_socket], check=False)
+ if not ret_code:
+ break
+ time.sleep(0.01)
+ else:
+ raise RuntimeError("Local side socket has not appeared.")
+ if priv_key:
+ # Socket up means the key has been read. Delete file by closing it.
+ key_file.close()
+ # Everything is ready, set the local socket address and connect.
+ vpp_instance.transport.server_address = api_socket
+ # It seems we can get read error even if every preceding check passed.
+ # Single retry seems to help. TODO: Confirm this is still needed.
+ for _ in range(2):
+ try:
+ vpp_instance.connect("csit_socket", do_async=True)
+ except (IOError, struct.error) as err:
+ logger.warn(f"Got initial connect error {err!r}")
+ vpp_instance.disconnect()
+ else:
+ break
+ else:
+ raise RuntimeError("Failed to connect to VPP over a socket.")
+ # Only after rls2302 all relevant VPP builds should have do_async.
+ if hasattr(vpp_instance.transport, "do_async"):
+ deq = deque()
+ vpp_instance.csit_deque = deq
+ vpp_instance.register_event_callback(lambda x, y: deq.append(y))
+ else:
+ vpp_instance.csit_deque = None
+ duration_conn = time.monotonic() - time_enter
+ logger.trace(f"Establishing socket connection took {duration_conn}s.")
+ return self