"""
import binascii
+import glob
import json
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+from pprint import pformat
from robot.api import logger
from resources.libraries.python.Constants import Constants
-from resources.libraries.python.ssh import SSH, SSHTimeout
+from resources.libraries.python.LocalExecution import run
+from resources.libraries.python.FilteredLogger import FilteredLogger
+from resources.libraries.python.PythonThree import raise_from
from resources.libraries.python.PapiHistory import PapiHistory
+from resources.libraries.python.ssh import (
+ SSH, SSHTimeout, exec_cmd_no_error, scp_node)
+from resources.libraries.python.VppApiCrc import VppApiCrcChecker
-__all__ = ["PapiExecutor", "PapiResponse"]
+__all__ = ["PapiExecutor", "PapiSocketExecutor"]
-class PapiResponse(object):
- """Class for metadata specifying the Papi reply, stdout, stderr and return
- code.
+def dictize(obj):
+ """A helper method, to make namedtuple-like object accessible as dict.
+
+ If the object is namedtuple-like, its _asdict() form is returned,
+ but in the returned object __getitem__ method is wrapped
+ to dictize also any items returned.
+ If the object does not have _asdict, it will be returned without any change.
+ Integer keys still access the object as tuple.
+
+ A more useful version would be to keep obj mostly as a namedtuple,
+ just add getitem for string keys. Unfortunately, namedtuple inherits
+ from tuple, including its read-only __getitem__ attribute,
+ so we cannot monkey-patch it.
+
+ TODO: Create a proxy for namedtuple to allow that.
+
+ :param obj: Arbitrary object to dictize.
+ :type obj: object
+ :returns: Dictized object.
+ :rtype: same as obj type or collections.OrderedDict
"""
+ if not hasattr(obj, "_asdict"):
+ return obj
+ ret = obj._asdict()
+ old_get = ret.__getitem__
+ new_get = lambda self, key: dictize(old_get(self, key))
+ ret.__getitem__ = new_get
+ return ret
- def __init__(self, papi_reply=None, stdout="", stderr="", requests=None):
- """Construct the Papi response by setting the values needed.
-
- TODO:
- Implement 'dump' analogue of verify_replies that would concatenate
- the values, so that call sites do not have to do that themselves.
-
- :param papi_reply: API reply from last executed PAPI command(s).
- :param stdout: stdout from last executed PAPI command(s).
- :param stderr: stderr from last executed PAPI command(s).
- :param requests: List of used PAPI requests. It is used while verifying
- replies. If None, expected replies must be provided for verify_reply
- and verify_replies methods.
- :type papi_reply: list or None
- :type stdout: str
- :type stderr: str
- :type requests: list
- """
- # API reply from last executed PAPI command(s).
- self.reply = papi_reply
+class PapiSocketExecutor(object):
+ """Methods for executing VPP Python API commands on forwarded socket.
- # stdout from last executed PAPI command(s).
- self.stdout = stdout
+ The current implementation connects for the duration of resource manager.
+ Delay for accepting connection is 10s, and disconnect is explicit.
+ TODO: Decrease 10s to value that is long enough for creating connection
+ and short enough to not affect performance.
- # stderr from last executed PAPI command(s).
- self.stderr = stderr
+ The current implementation downloads and parses .api.json files only once
+ and stores a VPPApiClient instance (disconnected) as a class variable.
+ Accessing multiple nodes with different APIs is therefore not supported.
- # List of used PAPI requests.
- self.requests = requests
+ The current implementation seems to run into read error occasionally.
+ Not sure if the error is in Python code on Robot side, ssh forwarding,
+ or socket handling at VPP side. Anyway, reconnect after some sleep
+ seems to help, hoping repeated command execution does not lead to surprises.
+ The reconnection is logged at WARN level, so it is prominently shown
+ in log.html, so we can see how frequently it happens.
- # List of expected PAPI replies. It is used while verifying replies.
- if self.requests:
- self.expected_replies = \
- ["{rqst}_reply".format(rqst=rqst) for rqst in self.requests]
+ TODO: Support sockets in NFs somehow.
+ TODO: Support handling of retval!=0 without try/except in caller.
- def __str__(self):
- """Return string with human readable description of the PapiResponse.
+ Note: Use only with "with" statement, e.g.:
- :returns: Readable description.
- :rtype: str
- """
- return (
- "papi_reply={papi_reply},stdout={stdout},stderr={stderr},"
- "requests={requests}").format(
- papi_reply=self.reply, stdout=self.stdout, stderr=self.stderr,
- requests=self.requests)
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add('show_version').get_reply(err_msg)
- def __repr__(self):
- """Return string executable as Python constructor call.
+ This class processes two classes of VPP PAPI methods:
+ 1. Simple request / reply: method='request'.
+ 2. Dump functions: method='dump'.
- :returns: Executable constructor call.
- :rtype: str
- """
- return "PapiResponse({str})".format(str=str(self))
+ Note that access to VPP stats over socket is not supported yet.
- def verify_reply(self, cmd_reply=None, idx=0,
- err_msg="Failed to verify PAPI reply."):
- """Verify and return data from the PAPI response.
+ The recommended ways of use are (examples):
- Note: Use only with a simple request / reply command. In this case the
- PAPI reply includes 'retval' which is checked in this method.
+ 1. Simple request / reply
- Do not use with 'dump' and 'vpp-stats' methods.
+ a. One request with no arguments:
- Use if PAPI response includes only one command reply.
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add('show_version').get_reply(err_msg)
+
+ b. Three requests with arguments, the second and the third ones are the same
+ but with different arguments.
- Use it this way (preferred):
+ with PapiSocketExecutor(node) as papi_exec:
+ replies = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
+ add(cmd2, **args3).get_replies(err_msg)
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add('show_version').get_replies().verify_reply()
+ 2. Dump functions
- or if you must provide the expected reply (not recommended):
+ cmd = 'sw_interface_rx_placement_dump'
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd, sw_if_index=ifc['vpp_sw_index']).\
+ get_details(err_msg)
+ """
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add('show_version').get_replies().\
- verify_reply('show_version_reply')
+ # Class cache for reuse between instances.
+ cached_vpp_instance = None
+ api_json_directory = None
+ crc_checker_instance = None
- :param cmd_reply: PAPI reply. If None, list of 'requests' should have
- been provided to the __init__ method as pre-generated list of
- replies is used in this method in this case.
- The PapiExecutor._execute() method provides the requests
- automatically.
- :param idx: Index to PapiResponse.reply list.
- :param err_msg: The message used if the verification fails.
- :type cmd_reply: str
- :type idx: int
- :type err_msg: str or None
- :returns: Verified data from PAPI response.
- :rtype: dict
- :raises AssertionError: If the PAPI return value is not 0, so the reply
- is not valid.
- :raises KeyError, IndexError: If the reply does not have expected
- structure.
+ def __init__(self, node, remote_vpp_socket="/run/vpp-api.sock"):
+ """Store the given arguments, declare managed variables.
+
+ :param node: Node to connect to and forward unix domain socket from.
+ :param remote_vpp_socket: Path to remote socket to tunnel to.
+ :type node: dict
+ :type remote_vpp_socket: str
+ """
+ self._node = node
+ self._remote_vpp_socket = remote_vpp_socket
+ # The list of PAPI commands to be executed on the node.
+ self._api_command_list = list()
+ # The following values are set on enter, reset on exit.
+ self._temp_dir = None
+ self._ssh_control_socket = None
+ self._local_vpp_socket = None
+
+ def create_crc_checker(self):
+ """Return the cached instance or create new one from directory.
+
+ It is assumed self.api_json_directory is set, as a class variable.
+
+ :returns: Cached or newly created instance aware of .api.json content.
+ :rtype: VppApiCrc.VppApiCrcChecker
"""
- cmd_rpl = self.expected_replies[idx] if cmd_reply is None else cmd_reply
+ cls = self.__class__
+ if cls.crc_checker_instance is None:
+ cls.crc_checker_instance = VppApiCrcChecker(cls.api_json_directory)
+ return cls.crc_checker_instance
+
+ @property
+ def vpp_instance(self):
+ """Return VPP instance with bindings to all API calls.
- data = self.reply[idx]['api_reply'][cmd_rpl]
- if data['retval'] != 0:
- raise AssertionError("{msg}\nidx={idx}, cmd_reply={reply}".
- format(msg=err_msg, idx=idx, reply=cmd_rpl))
+ The returned instance is initialized for unix domain socket access,
+ it has initialized all the bindings, but it is not connected
+ (to local socket) yet.
- return data
+ First invocation downloads .api.json files from self._node
+ into a temporary directory.
- def verify_replies(self, cmd_replies=None,
- err_msg="Failed to verify PAPI reply."):
- """Verify and return data from the PAPI response.
+ After first invocation, the result is cached, so other calls are quick.
+ Class variable is used as the cache, but this property is defined as
+ an instance method, so that _node (for api files) is known.
- Note: Use only with request / reply commands. In this case each
- PAPI reply includes 'retval' which is checked.
+ :returns: Initialized but not connected VPP instance.
+ :rtype: vpp_papi.VPPApiClient
+ """
+ cls = self.__class__
+ if cls.cached_vpp_instance is not None:
+ return cls.cached_vpp_instance
+ tmp_dir = tempfile.mkdtemp(dir="/tmp")
+ package_path = "Not set yet."
+ try:
+ # Pack, copy and unpack Python part of VPP installation from _node.
+ # TODO: Use rsync or recursive version of ssh.scp_node instead?
+ node = self._node
+ exec_cmd_no_error(node, ["rm", "-rf", "/tmp/papi.txz"])
+ # Papi python version depends on OS (and time).
+ # Python 2.7 or 3.4, site-packages or dist-packages.
+ installed_papi_glob = "/usr/lib/python*/*-packages/vpp_papi"
+ # We need to wrap this command in bash, in order to expand globs,
+ # and as ssh does join, the inner command has to be quoted.
+ inner_cmd = " ".join([
+ "tar", "cJf", "/tmp/papi.txz", "--exclude=*.pyc",
+ installed_papi_glob, "/usr/share/vpp/api"])
+ exec_cmd_no_error(node, ["bash", "-c", "'" + inner_cmd + "'"])
+ scp_node(node, tmp_dir + "/papi.txz", "/tmp/papi.txz", get=True)
+ run(["tar", "xf", tmp_dir + "/papi.txz", "-C", tmp_dir])
+ cls.api_json_directory = tmp_dir + "/usr/share/vpp/api"
+ # Perform initial checks before .api.json files are gone,
+ # by accessing the property (which also creates its instance).
+ self.create_crc_checker()
+ # When present locally, we finally can find the installation path.
+ package_path = glob.glob(tmp_dir + installed_papi_glob)[0]
+ # Package path has to be one level above the vpp_papi directory.
+ package_path = package_path.rsplit('/', 1)[0]
+ sys.path.append(package_path)
+ from vpp_papi.vpp_papi import VPPApiClient as vpp_class
+ vpp_class.apidir = cls.api_json_directory
+ # We need to create instance before removing from sys.path.
+ cls.cached_vpp_instance = vpp_class(
+ use_socket=True, server_address="TBD", async_thread=False,
+ read_timeout=14, logger=FilteredLogger(logger, "INFO"))
+ # Cannot use loglevel parameter, robot.api.logger lacks support.
+ # TODO: Stop overriding read_timeout when VPP-1722 is fixed.
+ finally:
+ shutil.rmtree(tmp_dir)
+ if sys.path[-1] == package_path:
+ sys.path.pop()
+ return cls.cached_vpp_instance
+
+ def __enter__(self):
+ """Create a tunnel, connect VPP instance.
+
+ Only at this point a local socket names are created
+ in a temporary directory, because VIRL runs 3 pybots at once,
+ so harcoding local filenames does not work.
+
+ :returns: self
+ :rtype: PapiSocketExecutor
+ """
+ # Parsing takes longer than connecting, prepare instance before tunnel.
+ vpp_instance = self.vpp_instance
+ node = self._node
+ self._temp_dir = tempfile.mkdtemp(dir="/tmp")
+ self._local_vpp_socket = self._temp_dir + "/vpp-api.sock"
+ self._ssh_control_socket = self._temp_dir + "/ssh.sock"
+ ssh_socket = self._ssh_control_socket
+ # Cleanup possibilities.
+ ret_code, _ = run(["ls", ssh_socket], check=False)
+ if ret_code != 2:
+ # This branch never seems to be hit in CI,
+ # but may be useful when testing manually.
+ run(["ssh", "-S", ssh_socket, "-O", "exit", "0.0.0.0"],
+ check=False, log=True)
+ # TODO: Is any sleep necessary? How to prove if not?
+ run(["sleep", "0.1"])
+ run(["rm", "-vrf", ssh_socket])
+ # Even if ssh can perhaps reuse this file,
+ # we need to remove it for readiness detection to work correctly.
+ run(["rm", "-rvf", self._local_vpp_socket])
+ # On VIRL, the ssh user is not added to "vpp" group,
+ # so we need to change remote socket file access rights.
+ exec_cmd_no_error(
+ node, "chmod o+rwx " + self._remote_vpp_socket, sudo=True)
+ # We use sleep command. The ssh command will exit in 10 second,
+ # unless a local socket connection is established,
+ # in which case the ssh command will exit only when
+ # the ssh connection is closed again (via control socket).
+ # The log level is to supress "Warning: Permanently added" messages.
+ ssh_cmd = [
+ "ssh", "-S", ssh_socket, "-M",
+ "-o", "LogLevel=ERROR", "-o", "UserKnownHostsFile=/dev/null",
+ "-o", "StrictHostKeyChecking=no", "-o", "ExitOnForwardFailure=yes",
+ "-L", self._local_vpp_socket + ':' + self._remote_vpp_socket,
+ "-p", str(node['port']), node['username'] + "@" + node['host'],
+ "sleep", "10"]
+ priv_key = node.get("priv_key")
+ if priv_key:
+ # This is tricky. We need a file to pass the value to ssh command.
+ # And we need ssh command, because paramiko does not suport sockets
+ # (neither ssh_socket, nor _remote_vpp_socket).
+ key_file = tempfile.NamedTemporaryFile()
+ key_file.write(priv_key)
+ # Make sure the content is written, but do not close yet.
+ key_file.flush()
+ ssh_cmd[1:1] = ["-i", key_file.name]
+ password = node.get("password")
+ if password:
+ # Prepend sshpass command to set password.
+ ssh_cmd[:0] = ["sshpass", "-p", password]
+ time_stop = time.time() + 10.0
+ # subprocess.Popen seems to be the best way to run commands
+ # on background. Other ways (shell=True with "&" and ssh with -f)
+ # seem to be too dependent on shell behavior.
+ # In particular, -f does NOT return values for run().
+ subprocess.Popen(ssh_cmd)
+ # Check socket presence on local side.
+ while time.time() < time_stop:
+ # It can take a moment for ssh to create the socket file.
+ ret_code, _ = run(["ls", "-l", self._local_vpp_socket], check=False)
+ if not ret_code:
+ break
+ time.sleep(0.1)
+ else:
+ raise RuntimeError("Local side socket has not appeared.")
+ if priv_key:
+ # Socket up means the key has been read. Delete file by closing it.
+ key_file.close()
+ # Everything is ready, set the local socket address and connect.
+ vpp_instance.transport.server_address = self._local_vpp_socket
+ # It seems we can get read error even if every preceding check passed.
+ # Single retry seems to help.
+ for _ in xrange(2):
+ try:
+ vpp_instance.connect_sync("csit_socket")
+ except IOError as err:
+ logger.warn("Got initial connect error {err!r}".format(err=err))
+ vpp_instance.disconnect()
+ else:
+ break
+ else:
+ raise RuntimeError("Failed to connect to VPP over a socket.")
+ return self
- Do not use with 'dump' and 'vpp-stats' methods.
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """Disconnect the vpp instance, tear down the SHH tunnel.
- Use if PAPI response includes more than one command reply.
+ Also remove the local sockets by deleting the temporary directory.
+ Arguments related to possible exception are entirely ignored.
+ """
+ self.vpp_instance.disconnect()
+ run(["ssh", "-S", self._ssh_control_socket, "-O", "exit", "0.0.0.0"],
+ check=False)
+ shutil.rmtree(self._temp_dir)
+ return
- Use it this way:
+ def add(self, csit_papi_command, history=True, **kwargs):
+ """Add next command to internal command list; return self.
- with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd1, **args1).add(cmd2, **args2).add(cmd2, **args3).\
- get_replies(err_msg).verify_replies()
+ The argument name 'csit_papi_command' must be unique enough as it cannot
+ be repeated in kwargs.
+ Unless disabled, new entry to papi history is also added at this point.
- or if you need the data from the PAPI response:
+ Any pending conflicts from .api.json processing are raised.
+ Then the command name is checked for known CRCs.
+ Unsupported commands raise an exception, as CSIT change
+ should not start using messages without making sure which CRCs
+ are supported.
+ Each CRC issue is raised only once, so subsequent tests
+ can raise other issues.
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
- add(cmd2, **args3).get_replies(err_msg).verify_replies()
+ :param csit_papi_command: VPP API command.
+ :param history: Enable/disable adding command to PAPI command history.
+ :param kwargs: Optional key-value arguments.
+ :type csit_papi_command: str
+ :type history: bool
+ :type kwargs: dict
+ :returns: self, so that method chaining is possible.
+ :rtype: PapiSocketExecutor
+ :raises RuntimeError: If unverified or conflicting CRC is encountered.
+ """
+ if history:
+ PapiHistory.add_to_papi_history(
+ self._node, csit_papi_command, **kwargs)
+ self._api_command_list.append(
+ dict(api_name=csit_papi_command, api_args=kwargs))
+ return self
- or if you must provide the list of expected replies (not recommended):
+ def get_replies(self, err_msg="Failed to get replies."):
+ """Get replies from VPP Python API.
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
- add(cmd2, **args3).get_replies(err_msg).\
- verify_replies(cmd_replies=cmd_replies)
+ The replies are parsed into dict-like objects,
+ "retval" field is guaranteed to be zero on success.
- :param cmd_replies: List of PAPI command replies. If None, list of
- 'requests' should have been provided to the __init__ method as
- pre-generated list of replies is used in this method in this case.
- The PapiExecutor._execute() method provides the requests
- automatically.
- :param err_msg: The message used if the verification fails.
- :type cmd_replies: list of str or None
+ :param err_msg: The message used if the PAPI command(s) execution fails.
:type err_msg: str
- :returns: List of verified data from PAPI response.
- :rtype list
- :raises AssertionError: If the PAPI response does not include at least
- one of specified command replies.
+ :returns: Responses, dict objects with fields due to API and "retval".
+ :rtype: list of dict
+ :raises RuntimeError: If retval is nonzero, parsing or ssh error.
"""
- data = list()
+ return self._execute(err_msg=err_msg)
- cmd_rpls = self.expected_replies if cmd_replies is None else cmd_replies
+ def get_reply(self, err_msg="Failed to get reply."):
+ """Get reply from VPP Python API.
- if len(self.reply) != len(cmd_rpls):
- raise AssertionError(err_msg)
- for idx, cmd_reply in enumerate(cmd_rpls):
- data.append(self.verify_reply(cmd_reply, idx, err_msg))
+ The reply is parsed into dict-like object,
+ "retval" field is guaranteed to be zero on success.
- return data
+ TODO: Discuss exception types to raise, unify with inner methods.
+ :param err_msg: The message used if the PAPI command(s) execution fails.
+ :type err_msg: str
+ :returns: Response, dict object with fields due to API and "retval".
+ :rtype: dict
+ :raises AssertionError: If retval is nonzero, parsing or ssh error.
+ """
+ replies = self.get_replies(err_msg=err_msg)
+ if len(replies) != 1:
+ raise RuntimeError("Expected single reply, got {replies!r}".format(
+ replies=replies))
+ return replies[0]
-class PapiExecutor(object):
- """Contains methods for executing VPP Python API commands on DUTs.
+ def get_sw_if_index(self, err_msg="Failed to get reply."):
+ """Get sw_if_index from reply from VPP Python API.
- Note: Use only with "with" statement, e.g.:
+ Frequently, the caller is only interested in sw_if_index field
+ of the reply, this wrapper makes such call sites shorter.
- with PapiExecutor(node) as papi_exec:
- papi_resp = papi_exec.add('show_version').get_replies(err_msg)
+ TODO: Discuss exception types to raise, unify with inner methods.
- This class processes three classes of VPP PAPI methods:
- 1. simple request / reply: method='request',
- 2. dump functions: method='dump',
- 3. vpp-stats: method='stats'.
+ :param err_msg: The message used if the PAPI command(s) execution fails.
+ :type err_msg: str
+ :returns: Response, sw_if_index value of the reply.
+ :rtype: int
+ :raises AssertionError: If retval is nonzero, parsing or ssh error.
+ """
+ reply = self.get_reply(err_msg=err_msg)
+ logger.info("Getting index from {reply!r}".format(reply=reply))
+ return reply["sw_if_index"]
- The recommended ways of use are (examples):
+ def get_details(self, err_msg="Failed to get dump details."):
+ """Get dump details from VPP Python API.
- 1. Simple request / reply
+ The details are parsed into dict-like objects.
+ The number of details per single dump command can vary,
+ and all association between details and dumps is lost,
+ so if you care about the association (as opposed to
+ logging everything at once for debugging purposes),
+ it is recommended to call get_details for each dump (type) separately.
- a. One request with no arguments:
+ :param err_msg: The message used if the PAPI command(s) execution fails.
+ :type err_msg: str
+ :returns: Details, dict objects with fields due to API without "retval".
+ :rtype: list of dict
+ """
+ return self._execute(err_msg)
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add('show_version').get_replies().\
- verify_reply()
+ @staticmethod
+ def run_cli_cmd(node, cmd, log=True):
+ """Run a CLI command as cli_inband, return the "reply" field of reply.
- b. Three requests with arguments, the second and the third ones are the same
- but with different arguments.
+ Optionally, log the field value.
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
- add(cmd2, **args3).get_replies(err_msg).verify_replies()
+ :param node: Node to run command on.
+ :param cmd: The CLI command to be run on the node.
+ :param log: If True, the response is logged.
+ :type node: dict
+ :type cmd: str
+ :type log: bool
+ :returns: CLI output.
+ :rtype: str
+ """
+ cli = 'cli_inband'
+ args = dict(cmd=cmd)
+ err_msg = "Failed to run 'cli_inband {cmd}' PAPI command on host " \
+ "{host}".format(host=node['host'], cmd=cmd)
+ with PapiSocketExecutor(node) as papi_exec:
+ reply = papi_exec.add(cli, **args).get_reply(err_msg)["reply"]
+ if log:
+ logger.info("{cmd}:\n{reply}".format(cmd=cmd, reply=reply))
+ return reply
- 2. Dump functions
+ @staticmethod
+ def dump_and_log(node, cmds):
+ """Dump and log requested information, return None.
- cmd = 'sw_interface_rx_placement_dump'
- with PapiExecutor(node) as papi_exec:
- papi_resp = papi_exec.add(cmd, sw_if_index=ifc['vpp_sw_index']).\
- get_dump(err_msg)
+ :param node: DUT node.
+ :param cmds: Dump commands to be executed.
+ :type node: dict
+ :type cmds: list of str
+ """
+ with PapiSocketExecutor(node) as papi_exec:
+ for cmd in cmds:
+ dump = papi_exec.add(cmd).get_details()
+ logger.debug("{cmd}:\n{data}".format(
+ cmd=cmd, data=pformat(dump)))
- 3. vpp-stats
+ def _execute(self, err_msg="Undefined error message"):
+ """Turn internal command list into data and execute; return replies.
- path = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
+ This method also clears the internal command list.
+
+ IMPORTANT!
+ Do not use this method in L1 keywords. Use:
+ - get_replies()
+ - get_reply()
+ - get_sw_if_index()
+ - get_details()
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add(api_name='vpp-stats', path=path).get_stats()
+ :param err_msg: The message used if the PAPI command(s) execution fails.
+ :type err_msg: str
+ :returns: Papi responses parsed into a dict-like object,
+ with fields due to API (possibly including retval).
+ :rtype: list of dict
+ :raises RuntimeError: If the replies are not all correct.
+ """
+ vpp_instance = self.vpp_instance
+ local_list = self._api_command_list
+ # Clear first as execution may fail.
+ self._api_command_list = list()
+ replies = list()
+ for command in local_list:
+ api_name = command["api_name"]
+ papi_fn = getattr(vpp_instance.api, api_name)
+ try:
+ try:
+ reply = papi_fn(**command["api_args"])
+ except IOError as err:
+ # Ocassionally an error happens, try reconnect.
+ logger.warn("Reconnect after error: {err!r}".format(
+ err=err))
+ self.vpp_instance.disconnect()
+ # Testing showes immediate reconnect fails.
+ time.sleep(1)
+ self.vpp_instance.connect_sync("csit_socket")
+ logger.trace("Reconnected.")
+ reply = papi_fn(**command["api_args"])
+ except (AttributeError, IOError) as err:
+ raise_from(AssertionError(err_msg), err, level="INFO")
+ # *_dump commands return list of objects, convert, ordinary reply.
+ if not isinstance(reply, list):
+ reply = [reply]
+ for item in reply:
+ dict_item = dictize(item)
+ if "retval" in dict_item.keys():
+ # *_details messages do not contain retval.
+ retval = dict_item["retval"]
+ if retval != 0:
+ # TODO: What exactly to log and raise here?
+ err = AssertionError("Retval {rv!r}".format(rv=retval))
+ # Lowering log level, some retval!=0 calls are expected.
+ # TODO: Expose level argument so callers can decide?
+ raise_from(AssertionError(err_msg), err, level="DEBUG")
+ replies.append(dict_item)
+ return replies
- print('RX interface core 0, sw_if_index 0:\n{0}'.\
- format(data[0]['/if/rx'][0][0]))
- or
+class PapiExecutor(object):
+ """Contains methods for executing VPP Python API commands on DUTs.
- path_1 = ['^/if', ]
- path_2 = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
+ TODO: Remove .add step, make get_stats accept paths directly.
- with PapiExecutor(node) as papi_exec:
- data = papi_exec.add('vpp-stats', path=path_1).\
- add('vpp-stats', path=path_2).get_stats()
+ This class processes only one type of VPP PAPI methods: vpp-stats.
- print('RX interface core 0, sw_if_index 0:\n{0}'.\
- format(data[1]['/if/rx'][0][0]))
+ The recommended ways of use are (examples):
- Note: In this case, when PapiExecutor method 'add' is used:
- - its parameter 'csit_papi_command' is used only to keep information
- that vpp-stats are requested. It is not further processed but it is
- included in the PAPI history this way:
- vpp-stats(path=['^/if', '/err/ip4-input', '/sys/node/ip4-input'])
- Always use csit_papi_command="vpp-stats" if the VPP PAPI method
- is "stats".
- - the second parameter must be 'path' as it is used by PapiExecutor
- method 'add'.
+ path = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
+ with PapiExecutor(node) as papi_exec:
+ stats = papi_exec.add(api_name='vpp-stats', path=path).get_stats()
+
+ print('RX interface core 0, sw_if_index 0:\n{0}'.\
+ format(stats[0]['/if/rx'][0][0]))
+
+ or
+
+ path_1 = ['^/if', ]
+ path_2 = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
+ with PapiExecutor(node) as papi_exec:
+ stats = papi_exec.add('vpp-stats', path=path_1).\
+ add('vpp-stats', path=path_2).get_stats()
+
+ print('RX interface core 0, sw_if_index 0:\n{0}'.\
+ format(stats[1]['/if/rx'][0][0]))
+
+ Note: In this case, when PapiExecutor method 'add' is used:
+ - its parameter 'csit_papi_command' is used only to keep information
+ that vpp-stats are requested. It is not further processed but it is
+ included in the PAPI history this way:
+ vpp-stats(path=['^/if', '/err/ip4-input', '/sys/node/ip4-input'])
+ Always use csit_papi_command="vpp-stats" if the VPP PAPI method
+ is "stats".
+ - the second parameter must be 'path' as it is used by PapiExecutor
+ method 'add'.
"""
def __init__(self, node):
def __exit__(self, exc_type, exc_val, exc_tb):
self._ssh.disconnect(self._node)
- def add(self, csit_papi_command="vpp-stats", **kwargs):
+ def add(self, csit_papi_command="vpp-stats", history=True, **kwargs):
"""Add next command to internal command list; return self.
The argument name 'csit_papi_command' must be unique enough as it cannot
be repeated in kwargs.
:param csit_papi_command: VPP API command.
+ :param history: Enable/disable adding command to PAPI command history.
:param kwargs: Optional key-value arguments.
:type csit_papi_command: str
+ :type history: bool
:type kwargs: dict
:returns: self, so that method chaining is possible.
:rtype: PapiExecutor
"""
- PapiHistory.add_to_papi_history(self._node, csit_papi_command, **kwargs)
+ if history:
+ PapiHistory.add_to_papi_history(
+ self._node, csit_papi_command, **kwargs)
self._api_command_list.append(dict(api_name=csit_papi_command,
api_args=kwargs))
return self
:type err_msg: str
:type timeout: int
:returns: Requested VPP statistics.
- :rtype: list
+ :rtype: list of dict
"""
paths = [cmd['api_args']['path'] for cmd in self._api_command_list]
self._api_command_list = list()
- stdout, _ = self._execute_papi(
+ stdout = self._execute_papi(
paths, method='stats', err_msg=err_msg, timeout=timeout)
return json.loads(stdout)
- def get_replies(self, err_msg="Failed to get replies.",
- process_reply=True, ignore_errors=False, timeout=120):
- """Get reply/replies from VPP Python API.
-
- :param err_msg: The message used if the PAPI command(s) execution fails.
- :param process_reply: Process PAPI reply if True.
- :param ignore_errors: If true, the errors in the reply are ignored.
- :param timeout: Timeout in seconds.
- :type err_msg: str
- :type process_reply: bool
- :type ignore_errors: bool
- :type timeout: int
- :returns: Papi response including: papi reply, stdout, stderr and
- return code.
- :rtype: PapiResponse
- """
- return self._execute(
- method='request', process_reply=process_reply,
- ignore_errors=ignore_errors, err_msg=err_msg, timeout=timeout)
-
- def get_dump(self, err_msg="Failed to get dump.",
- process_reply=True, ignore_errors=False, timeout=120):
- """Get dump from VPP Python API.
-
- :param err_msg: The message used if the PAPI command(s) execution fails.
- :param process_reply: Process PAPI reply if True.
- :param ignore_errors: If true, the errors in the reply are ignored.
- :param timeout: Timeout in seconds.
- :type err_msg: str
- :type process_reply: bool
- :type ignore_errors: bool
- :type timeout: int
- :returns: Papi response including: papi reply, stdout, stderr and
- return code.
- :rtype: PapiResponse
- """
- return self._execute(
- method='dump', process_reply=process_reply,
- ignore_errors=ignore_errors, err_msg=err_msg, timeout=timeout)
-
- def execute_should_pass(self, err_msg="Failed to execute PAPI command.",
- process_reply=True, ignore_errors=False,
- timeout=120):
- """Execute the PAPI commands and check the return code.
- Raise exception if the PAPI command(s) failed.
-
- IMPORTANT!
- Do not use this method in L1 keywords. Use:
- - get_replies()
- - get_dump()
- This method will be removed soon.
-
- :param err_msg: The message used if the PAPI command(s) execution fails.
- :param process_reply: Indicate whether or not to process PAPI reply.
- :param ignore_errors: If true, the errors in the reply are ignored.
- :param timeout: Timeout in seconds.
- :type err_msg: str
- :type process_reply: bool
- :type ignore_errors: bool
- :type timeout: int
- :returns: Papi response including: papi reply, stdout, stderr and
- return code.
- :rtype: PapiResponse
- :raises AssertionError: If PAPI command(s) execution failed.
- """
- # TODO: Migrate callers to get_replies and delete this method.
- return self.get_replies(
- process_reply=process_reply, ignore_errors=ignore_errors,
- err_msg=err_msg, timeout=timeout)
-
@staticmethod
def _process_api_data(api_d):
"""Process API data for smooth converting to JSON string.
:rtype: list
"""
+ def process_value(val):
+ """Process value.
+
+ :param val: Value to be processed.
+ :type val: object
+ :returns: Processed value.
+ :rtype: dict or str or int
+ """
+ if isinstance(val, dict):
+ for val_k, val_v in val.iteritems():
+ val[str(val_k)] = process_value(val_v)
+ return val
+ elif isinstance(val, list):
+ for idx, val_l in enumerate(val):
+ val[idx] = process_value(val_l)
+ return val
+ else:
+ return binascii.hexlify(val) if isinstance(val, str) else val
+
api_data_processed = list()
for api in api_d:
api_args_processed = dict()
for a_k, a_v in api["api_args"].iteritems():
- value = binascii.hexlify(a_v) if isinstance(a_v, str) else a_v
- api_args_processed[str(a_k)] = value
+ api_args_processed[str(a_k)] = process_value(a_v)
api_data_processed.append(dict(api_name=api["api_name"],
api_args=api_args_processed))
return api_data_processed
- @staticmethod
- def _revert_api_reply(api_r):
- """Process API reply / a part of API reply.
-
- Apply binascii.unhexlify() method for unicode values.
-
- TODO: Implement complex solution to process of replies.
-
- :param api_r: API reply.
- :type api_r: dict
- :returns: Processed API reply / a part of API reply.
- :rtype: dict
- """
- reply_dict = dict()
- reply_value = dict()
- for reply_key, reply_v in api_r.iteritems():
- for a_k, a_v in reply_v.iteritems():
- reply_value[a_k] = binascii.unhexlify(a_v) \
- if isinstance(a_v, unicode) else a_v
- reply_dict[reply_key] = reply_value
- return reply_dict
-
- def _process_reply(self, api_reply):
- """Process API reply.
-
- :param api_reply: API reply.
- :type api_reply: dict or list of dict
- :returns: Processed API reply.
- :rtype: list or dict
- """
- if isinstance(api_reply, list):
- reverted_reply = [self._revert_api_reply(a_r) for a_r in api_reply]
- else:
- reverted_reply = self._revert_api_reply(api_reply)
- return reverted_reply
-
def _execute_papi(self, api_data, method='request', err_msg="",
timeout=120):
"""Execute PAPI command(s) on remote node and store the result.
:type method: str
:type err_msg: str
:type timeout: int
- :returns: Stdout and stderr.
- :rtype: 2-tuple of str
+ :returns: Stdout from remote python utility, to be parsed by caller.
+ :rtype: str
:raises SSHTimeout: If PAPI command(s) execution has timed out.
:raises RuntimeError: If PAPI executor failed due to another reason.
:raises AssertionError: If PAPI command(s) execution has failed.
"""
if not api_data:
- RuntimeError("No API data provided.")
+ raise RuntimeError("No API data provided.")
- json_data = json.dumps(api_data) if method == "stats" \
+ json_data = json.dumps(api_data) \
+ if method in ("stats", "stats_request") \
else json.dumps(self._process_api_data(api_data))
cmd = "{fw_dir}/{papi_provider} --method {method} --data '{json}'".\
- format(fw_dir=Constants.REMOTE_FW_DIR,
- papi_provider=Constants.RESOURCES_PAPI_PROVIDER,
- method=method,
- json=json_data)
+ format(
+ fw_dir=Constants.REMOTE_FW_DIR, method=method, json=json_data,
+ papi_provider=Constants.RESOURCES_PAPI_PROVIDER)
try:
- ret_code, stdout, stderr = self._ssh.exec_command_sudo(
- cmd=cmd, timeout=timeout)
+ ret_code, stdout, _ = self._ssh.exec_command_sudo(
+ cmd=cmd, timeout=timeout, log_stdout_err=False)
+ # TODO: Fail on non-empty stderr?
except SSHTimeout:
logger.error("PAPI command(s) execution timeout on host {host}:"
"\n{apis}".format(host=self._node["host"],
apis=api_data))
raise
- except Exception:
- raise RuntimeError("PAPI command(s) execution on host {host} "
- "failed: {apis}".format(host=self._node["host"],
- apis=api_data))
+ except Exception as exc:
+ raise_from(RuntimeError(
+ "PAPI command(s) execution on host {host} "
+ "failed: {apis}".format(
+ host=self._node["host"], apis=api_data)), exc)
if ret_code != 0:
raise AssertionError(err_msg)
- return stdout, stderr
-
- def _execute(self, method='request', process_reply=True,
- ignore_errors=False, err_msg="", timeout=120):
- """Turn internal command list into proper data and execute; return
- PAPI response.
-
- This method also clears the internal command list.
-
- IMPORTANT!
- Do not use this method in L1 keywords. Use:
- - get_stats()
- - get_replies()
- - get_dump()
-
- :param method: VPP Python API method. Supported methods are: 'request',
- 'dump' and 'stats'.
- :param process_reply: Process PAPI reply if True.
- :param ignore_errors: If true, the errors in the reply are ignored.
- :param err_msg: The message used if the PAPI command(s) execution fails.
- :param timeout: Timeout in seconds.
- :type method: str
- :type process_reply: bool
- :type ignore_errors: bool
- :type err_msg: str
- :type timeout: int
- :returns: Papi response including: papi reply, stdout, stderr and
- return code.
- :rtype: PapiResponse
- :raises KeyError: If the reply is not correct.
- """
-
- local_list = self._api_command_list
-
- # Clear first as execution may fail.
- self._api_command_list = list()
-
- stdout, stderr = self._execute_papi(
- local_list, method=method, err_msg=err_msg, timeout=timeout)
- papi_reply = list()
- if process_reply:
- try:
- json_data = json.loads(stdout)
- except ValueError:
- logger.error("An error occured while processing the PAPI "
- "request:\n{rqst}".format(rqst=local_list))
- raise
- for data in json_data:
- try:
- api_reply_processed = dict(
- api_name=data["api_name"],
- api_reply=self._process_reply(data["api_reply"]))
- except KeyError:
- if ignore_errors:
- continue
- else:
- raise
- papi_reply.append(api_reply_processed)
-
- # Log processed papi reply to be able to check API replies changes
- logger.debug("Processed PAPI reply: {reply}".format(reply=papi_reply))
-
- return PapiResponse(
- papi_reply=papi_reply, stdout=stdout, stderr=stderr,
- requests=[rqst["api_name"] for rqst in local_list])
+ return stdout