"""Python API executor library.
"""
-import binascii
+import copy
import glob
import json
import shutil
+import struct # vpp-papi can raise struct.error
import subprocess
import sys
import tempfile
from resources.libraries.python.Constants import Constants
from resources.libraries.python.LocalExecution import run
from resources.libraries.python.FilteredLogger import FilteredLogger
-from resources.libraries.python.PythonThree import raise_from
from resources.libraries.python.PapiHistory import PapiHistory
from resources.libraries.python.ssh import (
SSH, SSHTimeout, exec_cmd_no_error, scp_node)
+from resources.libraries.python.topology import Topology, SocketType
+from resources.libraries.python.VppApiCrc import VppApiCrcChecker
-__all__ = ["PapiExecutor", "PapiSocketExecutor"]
+__all__ = [u"PapiExecutor", u"PapiSocketExecutor"]
def dictize(obj):
from tuple, including its read-only __getitem__ attribute,
so we cannot monkey-patch it.
- TODO: Create a proxy for namedtuple to allow that.
+ TODO: Create a proxy for named tuple to allow that.
:param obj: Arbitrary object to dictize.
:type obj: object
:returns: Dictized object.
:rtype: same as obj type or collections.OrderedDict
"""
- if not hasattr(obj, "_asdict"):
+ if not hasattr(obj, u"_asdict"):
return obj
ret = obj._asdict()
old_get = ret.__getitem__
ret.__getitem__ = new_get
return ret
-class PapiSocketExecutor(object):
+
+class PapiSocketExecutor:
"""Methods for executing VPP Python API commands on forwarded socket.
The current implementation connects for the duration of the resource manager.
Note: Use only with "with" statement, e.g.:
+ cmd = 'show_version'
with PapiSocketExecutor(node) as papi_exec:
- reply = papi_exec.add('show_version').get_reply(err_msg)
+ reply = papi_exec.add(cmd).get_reply(err_msg)
This class processes two classes of VPP PAPI methods:
1. Simple request / reply: method='request'.
a. One request with no arguments:
+ cmd = 'show_version'
with PapiSocketExecutor(node) as papi_exec:
- reply = papi_exec.add('show_version').get_reply(err_msg)
+ reply = papi_exec.add(cmd).get_reply(err_msg)
b. Three requests with arguments, the second and the third ones are the same
but with different arguments.
"""
# Class cache for reuse between instances.
- cached_vpp_instance = None
+ vpp_instance = None
+ """Takes a long time to create, stores all PAPI functions and types."""
+ crc_checker = None
+ """Accesses .api.json files at creation, caching allows deleting them."""
- def __init__(self, node, remote_vpp_socket="/run/vpp-api.sock"):
+ def __init__(self, node, remote_vpp_socket=Constants.SOCKSVR_PATH):
"""Store the given arguments, declare managed variables.
:param node: Node to connect to and forward unix domain socket from.
self._temp_dir = None
self._ssh_control_socket = None
self._local_vpp_socket = None
+ self.initialize_vpp_instance()
- @property
- def vpp_instance(self):
- """Return VPP instance with bindings to all API calls.
-
- The returned instance is initialized for unix domain socket access,
- it has initialized all the bindings, but it is not connected
- (to local socket) yet.
+ def initialize_vpp_instance(self):
+ """Create VPP instance with bindings to API calls, store as class field.
- First invocation downloads .api.json files from self._node
- into a temporary directory.
+ No-op if the instance had been stored already.
- After first invocation, the result is cached, so other calls are quick.
- Class variable is used as the cache, but this property is defined as
- an instance method, so that _node (for api files) is known.
+ The instance is initialized for unix domain socket access,
+ it has initialized all the bindings, but it is not connected
+ (to a local socket) yet.
- :returns: Initialized but not connected VPP instance.
- :rtype: vpp_papi.VPPApiClient
+ This method downloads .api.json files from self._node
+ into a temporary directory, deletes them finally.
"""
- cls = self.__class__
- if cls.cached_vpp_instance is not None:
- return cls.cached_vpp_instance
- tmp_dir = tempfile.mkdtemp(dir="/tmp")
- package_path = "Not set yet."
+ if self.vpp_instance:
+ return
+ cls = self.__class__ # Shorthand for setting class fields.
+ package_path = None
+ tmp_dir = tempfile.mkdtemp(dir=u"/tmp")
try:
# Pack, copy and unpack Python part of VPP installation from _node.
# TODO: Use rsync or recursive version of ssh.scp_node instead?
node = self._node
- exec_cmd_no_error(node, ["rm", "-rf", "/tmp/papi.txz"])
+ exec_cmd_no_error(node, [u"rm", u"-rf", u"/tmp/papi.txz"])
# Papi python version depends on OS (and time).
# Python 2.7 or 3.4, site-packages or dist-packages.
- installed_papi_glob = "/usr/lib/python*/*-packages/vpp_papi"
+ installed_papi_glob = u"/usr/lib/python3*/*-packages/vpp_papi"
# We need to wrap this command in bash, in order to expand globs,
# and as ssh does join, the inner command has to be quoted.
- inner_cmd = " ".join([
- "tar", "cJf", "/tmp/papi.txz", "--exclude=*.pyc",
- installed_papi_glob, "/usr/share/vpp/api"])
- exec_cmd_no_error(node, ["bash", "-c", "'" + inner_cmd + "'"])
- scp_node(node, tmp_dir + "/papi.txz", "/tmp/papi.txz", get=True)
- run(["tar", "xf", tmp_dir + "/papi.txz", "-C", tmp_dir])
+ inner_cmd = u" ".join([
+ u"tar", u"cJf", u"/tmp/papi.txz", u"--exclude=*.pyc",
+ installed_papi_glob, u"/usr/share/vpp/api"
+ ])
+ exec_cmd_no_error(node, [u"bash", u"-c", u"'" + inner_cmd + u"'"])
+ scp_node(node, tmp_dir + u"/papi.txz", u"/tmp/papi.txz", get=True)
+ run([u"tar", u"xf", tmp_dir + u"/papi.txz", u"-C", tmp_dir])
+ api_json_directory = tmp_dir + u"/usr/share/vpp/api"
+ # Perform initial checks before .api.json files are gone,
+ # by creating the checker instance.
+ cls.crc_checker = VppApiCrcChecker(api_json_directory)
# When present locally, we finally can find the installation path.
package_path = glob.glob(tmp_dir + installed_papi_glob)[0]
# Package path has to be one level above the vpp_papi directory.
- package_path = package_path.rsplit('/', 1)[0]
+ package_path = package_path.rsplit(u"/", 1)[0]
sys.path.append(package_path)
+ # pylint: disable=import-outside-toplevel, import-error
from vpp_papi.vpp_papi import VPPApiClient as vpp_class
- vpp_class.apidir = tmp_dir + "/usr/share/vpp/api"
+ vpp_class.apidir = api_json_directory
# We need to create instance before removing from sys.path.
- cls.cached_vpp_instance = vpp_class(
- use_socket=True, server_address="TBD", async_thread=False,
- read_timeout=6, logger=FilteredLogger(logger, "INFO"))
+ cls.vpp_instance = vpp_class(
+ use_socket=True, server_address=u"TBD", async_thread=False,
+ read_timeout=14, logger=FilteredLogger(logger, u"INFO"))
# Cannot use loglevel parameter, robot.api.logger lacks support.
# TODO: Stop overriding read_timeout when VPP-1722 is fixed.
finally:
shutil.rmtree(tmp_dir)
if sys.path[-1] == package_path:
sys.path.pop()
- return cls.cached_vpp_instance
def __enter__(self):
"""Create a tunnel, connect VPP instance.
# Parsing takes longer than connecting, prepare instance before tunnel.
vpp_instance = self.vpp_instance
node = self._node
- self._temp_dir = tempfile.mkdtemp(dir="/tmp")
- self._local_vpp_socket = self._temp_dir + "/vpp-api.sock"
- self._ssh_control_socket = self._temp_dir + "/ssh.sock"
+ self._temp_dir = tempfile.mkdtemp(dir=u"/tmp")
+ self._local_vpp_socket = self._temp_dir + u"/vpp-api.sock"
+ self._ssh_control_socket = self._temp_dir + u"/ssh.sock"
ssh_socket = self._ssh_control_socket
# Cleanup possibilities.
- ret_code, _ = run(["ls", ssh_socket], check=False)
+ ret_code, _ = run([u"ls", ssh_socket], check=False)
if ret_code != 2:
# This branch never seems to be hit in CI,
# but may be useful when testing manually.
- run(["ssh", "-S", ssh_socket, "-O", "exit", "0.0.0.0"],
- check=False, log=True)
+ run(
+ [u"ssh", u"-S", ssh_socket, u"-O", u"exit", u"0.0.0.0"],
+ check=False, log=True
+ )
# TODO: Is any sleep necessary? How to prove if not?
- run(["sleep", "0.1"])
- run(["rm", "-vrf", ssh_socket])
+ run([u"sleep", u"0.1"])
+ run([u"rm", u"-vrf", ssh_socket])
# Even if ssh can perhaps reuse this file,
# we need to remove it for readiness detection to work correctly.
- run(["rm", "-rvf", self._local_vpp_socket])
+ run([u"rm", u"-rvf", self._local_vpp_socket])
# On VIRL, the ssh user is not added to "vpp" group,
# so we need to change remote socket file access rights.
exec_cmd_no_error(
- node, "chmod o+rwx " + self._remote_vpp_socket, sudo=True)
+ node, u"chmod o+rwx " + self._remote_vpp_socket, sudo=True
+ )
# We use the sleep command. The ssh command will exit in 10 seconds,
# unless a local socket connection is established,
# in which case the ssh command will exit only when
# the ssh connection is closed again (via control socket).
# The log level is to suppress "Warning: Permanently added" messages.
ssh_cmd = [
- "ssh", "-S", ssh_socket, "-M",
- "-o", "LogLevel=ERROR", "-o", "UserKnownHostsFile=/dev/null",
- "-o", "StrictHostKeyChecking=no", "-o", "ExitOnForwardFailure=yes",
- "-L", self._local_vpp_socket + ':' + self._remote_vpp_socket,
- "-p", str(node['port']), node['username'] + "@" + node['host'],
- "sleep", "10"]
- priv_key = node.get("priv_key")
+ u"ssh", u"-S", ssh_socket, u"-M",
+ u"-o", u"LogLevel=ERROR", u"-o", u"UserKnownHostsFile=/dev/null",
+ u"-o", u"StrictHostKeyChecking=no",
+ u"-o", u"ExitOnForwardFailure=yes",
+ u"-L", self._local_vpp_socket + u":" + self._remote_vpp_socket,
+ u"-p", str(node[u"port"]), node[u"username"] + u"@" + node[u"host"],
+ u"sleep", u"10"
+ ]
+ priv_key = node.get(u"priv_key")
if priv_key:
# This is tricky. We need a file to pass the value to ssh command.
- # And we need ssh command, because paramiko does not suport sockets
+ # And we need ssh command, because paramiko does not support sockets
# (neither ssh_socket, nor _remote_vpp_socket).
key_file = tempfile.NamedTemporaryFile()
key_file.write(priv_key)
# Make sure the content is written, but do not close yet.
key_file.flush()
- ssh_cmd[1:1] = ["-i", key_file.name]
- password = node.get("password")
+ ssh_cmd[1:1] = [u"-i", key_file.name]
+ password = node.get(u"password")
if password:
# Prepend sshpass command to set password.
- ssh_cmd[:0] = ["sshpass", "-p", password]
+ ssh_cmd[:0] = [u"sshpass", u"-p", password]
time_stop = time.time() + 10.0
# subprocess.Popen seems to be the best way to run commands
# on background. Other ways (shell=True with "&" and ssh with -f)
# Check socket presence on local side.
while time.time() < time_stop:
# It can take a moment for ssh to create the socket file.
- ret_code, _ = run(["ls", "-l", self._local_vpp_socket], check=False)
+ ret_code, _ = run(
+ [u"ls", u"-l", self._local_vpp_socket], check=False
+ )
if not ret_code:
break
time.sleep(0.1)
else:
- raise RuntimeError("Local side socket has not appeared.")
+ raise RuntimeError(u"Local side socket has not appeared.")
if priv_key:
# Socket up means the key has been read. Delete file by closing it.
key_file.close()
vpp_instance.transport.server_address = self._local_vpp_socket
# It seems we can get read error even if every preceding check passed.
# Single retry seems to help.
- for _ in xrange(2):
+ for _ in range(2):
try:
- vpp_instance.connect_sync("csit_socket")
- except IOError as err:
- logger.warn("Got initial connect error {err!r}".format(err=err))
+ vpp_instance.connect_sync(u"csit_socket")
+ except (IOError, struct.error) as err:
+ logger.warn(f"Got initial connect error {err!r}")
vpp_instance.disconnect()
else:
break
else:
- raise RuntimeError("Failed to connect to VPP over a socket.")
+ raise RuntimeError(u"Failed to connect to VPP over a socket.")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
Arguments related to possible exception are entirely ignored.
"""
self.vpp_instance.disconnect()
- run(["ssh", "-S", self._ssh_control_socket, "-O", "exit", "0.0.0.0"],
- check=False)
+ run([
+ u"ssh", u"-S", self._ssh_control_socket, u"-O", u"exit", u"0.0.0.0"
+ ], check=False)
shutil.rmtree(self._temp_dir)
- return
def add(self, csit_papi_command, history=True, **kwargs):
"""Add next command to internal command list; return self.
+ Unless disabled, new entry to papi history is also added at this point.
The argument name 'csit_papi_command' must be unique enough as it cannot
be repeated in kwargs.
- Unless disabled, new entry to papi history is also added at this point.
+ The kwargs dict is deep-copied, so it is safe to use the original
+ with partial modifications for subsequent commands.
+
+ Any pending conflicts from .api.json processing are raised.
+ Then the command name is checked for known CRCs.
+ Unsupported commands raise an exception, as CSIT change
+ should not start using messages without making sure which CRCs
+ are supported.
+ Each CRC issue is raised only once, so subsequent tests
+ can raise other issues.
:param csit_papi_command: VPP API command.
:param history: Enable/disable adding command to PAPI command history.
:type kwargs: dict
:returns: self, so that method chaining is possible.
:rtype: PapiSocketExecutor
+ :raises RuntimeError: If unverified or conflicting CRC is encountered.
"""
+ self.crc_checker.report_initial_conflicts()
if history:
PapiHistory.add_to_papi_history(
- self._node, csit_papi_command, **kwargs)
+ self._node, csit_papi_command, **kwargs
+ )
+ self.crc_checker.check_api_name(csit_papi_command)
self._api_command_list.append(
- dict(api_name=csit_papi_command, api_args=kwargs))
+ dict(
+ api_name=csit_papi_command,
+ api_args=copy.deepcopy(kwargs)
+ )
+ )
return self
def get_replies(self, err_msg="Failed to get replies."):
"""
return self._execute(err_msg=err_msg)
- def get_reply(self, err_msg="Failed to get reply."):
+ def get_reply(self, err_msg=u"Failed to get reply."):
"""Get reply from VPP Python API.
The reply is parsed into dict-like object,
"""
replies = self.get_replies(err_msg=err_msg)
if len(replies) != 1:
- raise RuntimeError("Expected single reply, got {replies!r}".format(
- replies=replies))
+ raise RuntimeError(f"Expected single reply, got {replies!r}")
return replies[0]
- def get_sw_if_index(self, err_msg="Failed to get reply."):
+ def get_sw_if_index(self, err_msg=u"Failed to get reply."):
"""Get sw_if_index from reply from VPP Python API.
Frequently, the caller is only interested in sw_if_index field
:raises AssertionError: If retval is nonzero, parsing or ssh error.
"""
reply = self.get_reply(err_msg=err_msg)
- logger.info("Getting index from {reply!r}".format(reply=reply))
- return reply["sw_if_index"]
+ logger.trace(f"Getting index from {reply!r}")
+ return reply[u"sw_if_index"]
def get_details(self, err_msg="Failed to get dump details."):
"""Get dump details from VPP Python API.
return self._execute(err_msg)
@staticmethod
- def run_cli_cmd(node, cmd, log=True):
+ def run_cli_cmd(
+ node, cli_cmd, log=True, remote_vpp_socket=Constants.SOCKSVR_PATH):
"""Run a CLI command as cli_inband, return the "reply" field of reply.
Optionally, log the field value.
:param node: Node to run command on.
- :param cmd: The CLI command to be run on the node.
+ :param cli_cmd: The CLI command to be run on the node.
+ :param remote_vpp_socket: Path to remote socket to tunnel to.
:param log: If True, the response is logged.
:type node: dict
- :type cmd: str
+ :type remote_vpp_socket: str
+ :type cli_cmd: str
:type log: bool
:returns: CLI output.
:rtype: str
"""
- cli = 'cli_inband'
- args = dict(cmd=cmd)
- err_msg = "Failed to run 'cli_inband {cmd}' PAPI command on host " \
- "{host}".format(host=node['host'], cmd=cmd)
- with PapiSocketExecutor(node) as papi_exec:
- reply = papi_exec.add(cli, **args).get_reply(err_msg)["reply"]
+ cmd = u"cli_inband"
+ args = dict(
+ cmd=cli_cmd
+ )
+ err_msg = f"Failed to run 'cli_inband {cli_cmd}' PAPI command " \
+ f"on host {node[u'host']}"
+
+ with PapiSocketExecutor(node, remote_vpp_socket) as papi_exec:
+ reply = papi_exec.add(cmd, **args).get_reply(err_msg)["reply"]
if log:
- logger.info("{cmd}:\n{reply}".format(cmd=cmd, reply=reply))
+ logger.info(
+ f"{cmd} ({node[u'host']} - {remote_vpp_socket}):\n"
+ f"{reply.strip()}"
+ )
return reply
+ @staticmethod
+ def run_cli_cmd_on_all_sockets(node, cli_cmd, log=True):
+ """Run a CLI command as cli_inband, on all sockets in topology file.
+
+ :param node: Node to run command on.
+ :param cli_cmd: The CLI command to be run on the node.
+ :param log: If True, the response is logged.
+ :type node: dict
+ :type cli_cmd: str
+ :type log: bool
+ """
+ sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
+ if sockets:
+ for socket in sockets.values():
+ PapiSocketExecutor.run_cli_cmd(
+ node, cli_cmd, log=log, remote_vpp_socket=socket
+ )
+
@staticmethod
def dump_and_log(node, cmds):
"""Dump and log requested information, return None.
with PapiSocketExecutor(node) as papi_exec:
for cmd in cmds:
dump = papi_exec.add(cmd).get_details()
- logger.debug("{cmd}:\n{data}".format(
- cmd=cmd, data=pformat(dump)))
+ logger.debug(f"{cmd}:\n{pformat(dump)}")
- def _execute(self, err_msg="Undefined error message"):
+ def _execute(self, err_msg=u"Undefined error message", exp_rv=0):
"""Turn internal command list into data and execute; return replies.
This method also clears the internal command list.
self._api_command_list = list()
replies = list()
for command in local_list:
- api_name = command["api_name"]
+ api_name = command[u"api_name"]
papi_fn = getattr(vpp_instance.api, api_name)
try:
try:
- reply = papi_fn(**command["api_args"])
- except IOError as err:
- # Ocassionally an error happens, try reconnect.
- logger.warn("Reconnect after error: {err!r}".format(
- err=err))
+ reply = papi_fn(**command[u"api_args"])
+ except (IOError, struct.error) as err:
+ # Occasionally an error happens, try reconnect.
+ logger.warn(f"Reconnect after error: {err!r}")
self.vpp_instance.disconnect()
- # Testing showes immediate reconnect fails.
+ # Testing shows immediate reconnect fails.
time.sleep(1)
- self.vpp_instance.connect_sync("csit_socket")
- logger.trace("Reconnected.")
- reply = papi_fn(**command["api_args"])
- except (AttributeError, IOError) as err:
- raise_from(AssertionError(err_msg), err, level="INFO")
+ self.vpp_instance.connect_sync(u"csit_socket")
+ logger.trace(u"Reconnected.")
+ reply = papi_fn(**command[u"api_args"])
+ except (AttributeError, IOError, struct.error) as err:
+ raise AssertionError(err_msg) from err
# *_dump commands return list of objects, convert, ordinary reply.
if not isinstance(reply, list):
reply = [reply]
for item in reply:
+ self.crc_checker.check_api_name(item.__class__.__name__)
dict_item = dictize(item)
- if "retval" in dict_item.keys():
+ if u"retval" in dict_item.keys():
# *_details messages do not contain retval.
- retval = dict_item["retval"]
- if retval != 0:
+ retval = dict_item[u"retval"]
+ if retval != exp_rv:
# TODO: What exactly to log and raise here?
- err = AssertionError("Retval {rv!r}".format(rv=retval))
- # Lowering log level, some retval!=0 calls are expected.
- # TODO: Expose level argument so callers can decide?
- raise_from(AssertionError(err_msg), err, level="DEBUG")
+ raise AssertionError(
+ f"Retval {retval!r} does not match expected "
+ f"retval {exp_rv!r}"
+ )
replies.append(dict_item)
return replies
-class PapiExecutor(object):
+class PapiExecutor:
"""Contains methods for executing VPP Python API commands on DUTs.
TODO: Remove .add step, make get_stats accept paths directly.
:param node: Node to run command(s) on.
:type node: dict
"""
-
# Node to run command(s) on.
self._node = node
try:
self._ssh.connect(self._node)
except IOError:
- raise RuntimeError("Cannot open SSH connection to host {host} to "
- "execute PAPI command(s)".
- format(host=self._node["host"]))
+ raise RuntimeError(
+ f"Cannot open SSH connection to host {self._node[u'host']} "
+ f"to execute PAPI command(s)"
+ )
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._ssh.disconnect(self._node)
- def add(self, csit_papi_command="vpp-stats", history=True, **kwargs):
+ def add(self, csit_papi_command=u"vpp-stats", history=True, **kwargs):
"""Add next command to internal command list; return self.
The argument name 'csit_papi_command' must be unique enough as it cannot
be repeated in kwargs.
+ The kwargs dict is deep-copied, so it is safe to use the original
+ with partial modifications for subsequent commands.
:param csit_papi_command: VPP API command.
:param history: Enable/disable adding command to PAPI command history.
"""
if history:
PapiHistory.add_to_papi_history(
- self._node, csit_papi_command, **kwargs)
- self._api_command_list.append(dict(api_name=csit_papi_command,
- api_args=kwargs))
+ self._node, csit_papi_command, **kwargs
+ )
+ self._api_command_list.append(
+ dict(
+ api_name=csit_papi_command, api_args=copy.deepcopy(kwargs)
+ )
+ )
return self
- def get_stats(self, err_msg="Failed to get statistics.", timeout=120):
+ def get_stats(
+ self, err_msg=u"Failed to get statistics.", timeout=120,
+ socket=Constants.SOCKSTAT_PATH):
"""Get VPP Stats from VPP Python API.
:param err_msg: The message used if the PAPI command(s) execution fails.
:param timeout: Timeout in seconds.
+ :param socket: Path to Stats socket to tunnel to.
:type err_msg: str
:type timeout: int
+ :type socket: str
:returns: Requested VPP statistics.
:rtype: list of dict
"""
-
- paths = [cmd['api_args']['path'] for cmd in self._api_command_list]
+ paths = [cmd[u"api_args"][u"path"] for cmd in self._api_command_list]
self._api_command_list = list()
stdout = self._execute_papi(
- paths, method='stats', err_msg=err_msg, timeout=timeout)
+ paths, method=u"stats", err_msg=err_msg, timeout=timeout,
+ socket=socket
+ )
return json.loads(stdout)
:rtype: dict or str or int
"""
if isinstance(val, dict):
- for val_k, val_v in val.iteritems():
+ for val_k, val_v in val.items():
val[str(val_k)] = process_value(val_v)
- return val
+ retval = val
elif isinstance(val, list):
for idx, val_l in enumerate(val):
val[idx] = process_value(val_l)
- return val
+ retval = val
else:
- return binascii.hexlify(val) if isinstance(val, str) else val
+ retval = val.encode().hex() if isinstance(val, str) else val
+ return retval
api_data_processed = list()
for api in api_d:
api_args_processed = dict()
- for a_k, a_v in api["api_args"].iteritems():
+ for a_k, a_v in api[u"api_args"].items():
api_args_processed[str(a_k)] = process_value(a_v)
- api_data_processed.append(dict(api_name=api["api_name"],
- api_args=api_args_processed))
+ api_data_processed.append(
+ dict(
+ api_name=api[u"api_name"],
+ api_args=api_args_processed
+ )
+ )
return api_data_processed
- def _execute_papi(self, api_data, method='request', err_msg="",
- timeout=120):
+ def _execute_papi(
+ self, api_data, method=u"request", err_msg=u"", timeout=120,
+ socket=None):
"""Execute PAPI command(s) on remote node and store the result.
:param api_data: List of APIs with their arguments.
:raises RuntimeError: If PAPI executor failed due to another reason.
:raises AssertionError: If PAPI command(s) execution has failed.
"""
-
if not api_data:
- raise RuntimeError("No API data provided.")
+ raise RuntimeError(u"No API data provided.")
json_data = json.dumps(api_data) \
- if method in ("stats", "stats_request") \
+ if method in (u"stats", u"stats_request") \
else json.dumps(self._process_api_data(api_data))
- cmd = "{fw_dir}/{papi_provider} --method {method} --data '{json}'".\
- format(
- fw_dir=Constants.REMOTE_FW_DIR, method=method, json=json_data,
- papi_provider=Constants.RESOURCES_PAPI_PROVIDER)
+ sock = f" --socket {socket}" if socket else u""
+ cmd = f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_PAPI_PROVIDER}" \
+ f" --method {method} --data '{json_data}'{sock}"
try:
ret_code, stdout, _ = self._ssh.exec_command_sudo(
- cmd=cmd, timeout=timeout, log_stdout_err=False)
+ cmd=cmd, timeout=timeout, log_stdout_err=False
+ )
# TODO: Fail on non-empty stderr?
except SSHTimeout:
- logger.error("PAPI command(s) execution timeout on host {host}:"
- "\n{apis}".format(host=self._node["host"],
- apis=api_data))
+ logger.error(
+ f"PAPI command(s) execution timeout on host "
+ f"{self._node[u'host']}:\n{api_data}"
+ )
raise
except Exception as exc:
- raise_from(RuntimeError(
- "PAPI command(s) execution on host {host} "
- "failed: {apis}".format(
- host=self._node["host"], apis=api_data)), exc)
+ raise RuntimeError(
+ f"PAPI command(s) execution on host {self._node[u'host']} "
+ f"failed: {api_data}"
+ ) from exc
if ret_code != 0:
raise AssertionError(err_msg)