1 # Copyright (c) 2024 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Python API executor library.
16 TODO: Document sync and async handling properly.
24 import struct # vpp-papi can raise struct.error
29 from collections import deque, UserDict
31 from pprint import pformat
32 from robot.api import logger
34 from resources.libraries.python.Constants import Constants
35 from resources.libraries.python.LocalExecution import run
36 from resources.libraries.python.FilteredLogger import FilteredLogger
37 from resources.libraries.python.PapiHistory import PapiHistory
38 from resources.libraries.python.ssh import (
44 from resources.libraries.python.topology import Topology, SocketType
45 from resources.libraries.python.VppApiCrc import VppApiCrcChecker
def dictize(obj):
    """A helper method, to make namedtuple-like object accessible as dict.

    If the object is namedtuple-like (it has _asdict), a collections.UserDict
    built from its _asdict() form is returned, so fields can be read
    by string key, e.g. reply["retval"] or reply.get("retval", 0).
    If the object does not have _asdict, it is returned without any change.

    Only the top level is converted. Values stored in the returned mapping
    are returned as they are, so nested namedtuple-like values keep
    their original type (and are read by attribute, not by string key).
    Integer (positional) keys are not supported on the returned mapping.

    A more useful version would be to keep obj mostly as a namedtuple,
    just add getitem for string keys. Unfortunately, namedtuple inherits
    from tuple, including its read-only __getitem__ attribute,
    so we cannot monkey-patch it.

    TODO: Create a proxy for named tuple to allow that.

    :param obj: Arbitrary object to dictize.
    :type obj: object
    :returns: Dictized object.
    :rtype: same as obj type or collections.UserDict
    """
    if not hasattr(obj, "_asdict"):
        return obj
    # Do not try to override __getitem__ on the instance: Python looks up
    # special methods on the type, so such an override is never used
    # by obj[key], and calling it explicitly would fail.
    return UserDict(obj._asdict())
def dictize_and_check_retval(obj, err_msg):
    """Dictize obj (see dictize()) and verify its "retval" field, if present.

    A missing "retval" field counts as success,
    as *_details messages do not contain it.
    A present "retval" field must be zero, otherwise an exception is raised.

    :param obj: Arbitrary object to dictize.
    :param err_msg: The (additional) text for the raised exception.
    :type obj: object
    :type err_msg: str
    :returns: Dictized object.
    :rtype: same as obj type or collections.UserDict
    :raises AssertionError: If retval field is present with nonzero value.
    """
    dictized = dictize(obj)
    # *_details messages do not contain retval, hence the zero default.
    if dictized.get("retval", 0) == 0:
        return dictized
    raise AssertionError(f"{err_msg}\nRetval nonzero in object {dictized!r}")
107 class PapiSocketExecutor:
108 """Methods for executing VPP Python API commands on forwarded socket.
110 The current implementation downloads and parses .api.json files only once
111 and caches client instances for reuse.
112 Cleanup metadata is added as additional attributes
113 directly to the client instances.
115 The current implementation caches the connected client instances.
116 As a downside, clients need to be explicitly told to disconnect
119 The current implementation seems to run into read error occasionally.
120 Not sure if the error is in Python code on Robot side, ssh forwarding,
121 or socket handling at VPP side. Anyway, reconnect after some sleep
122 seems to help, hoping repeated command execution does not lead to surprises.
123 The reconnection is logged at WARN level, so it is prominently shown
124 in log.html, so we can see how frequently it happens.
125 There are similar retries cleanups in other places
126 (so unresponsive VPPs do not break test much more than needed),
127 but it is hard to verify all that works correctly.
128 Especially, if Robot crashes, files and ssh processes may leak.
130 TODO: Decrease current timeout value when creating connections
131 so broken VPP does not prolong job duration too much
132 while good VPP (almost) never fails to connect.
134 TODO: Support handling of retval!=0 without try/except in caller.
136 This class processes two classes of VPP PAPI methods:
137 1. Simple request / reply: method='request'.
138 2. Dump functions: method='dump'.
140 Note that access to VPP stats over socket is not supported yet.
142 The recommended ways of use are (examples):
144 1. Simple request / reply. Example with no arguments:
147 with PapiSocketExecutor(node) as papi_exec:
148 reply = papi_exec.add(cmd).get_reply(err_msg)
152 cmd = "sw_interface_rx_placement_dump"
153 with PapiSocketExecutor(node) as papi_exec:
154 papi_exec.add(cmd, sw_if_index=ifc["vpp_sw_index"])
155 details = papi_exec.get_details(err_msg)
157 3. Multiple requests with one reply each.
158 In this example, there are three requests with arguments,
159 the second and the third ones are the same but with different arguments.
160 This example also showcases method chaining.
162 with PapiSocketExecutor(node, is_async=True) as papi_exec:
163 replies = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
164 add(cmd2, **args3).get_replies(err_msg)
166 The "is_async=True" part in the last example enables "async handling mode",
167 which imposes limitations but gains speed and saves memory.
168 This is different than async mode of VPP PAPI, as the default handling mode
169 also uses async PAPI connections.
171 The implementation contains more hidden details, such as
172 support for old VPP PAPI async mode behavior, API CRC checking
173 conditional usage of control ping, and possible susceptibility to VPP-2033.
174 See docstring of methods for more detailed info.
177 # Class cache for reuse between instances.
179 """We copy .api json files and PAPI code from DUT to robot machine.
180 This class variable holds temporary directory once created.
181 When python exits, the directory is deleted, so no downloaded file leaks.
182 The value will be set to TemporaryDirectory class instance (not string path)
183 to ensure deletion at exit."""
185 """String path to .api.json files, a directory somewhere in api_root_dir."""
186 api_package_path = None
187 """String path to PAPI code, a different directory under api_root_dir."""
189 """Accesses .api.json files at creation, caching speeds up accessing it."""
190 reusable_vpp_client_list = list()
191 """Each connection needs a separate client instance,
192 and each client instance creation needs to parse all .api files,
193 which takes time. If a client instance disconnects, it is put here,
194 so on next connect we can reuse intead of creating new."""
196 """Mapping from node key to connected client instance."""
199 self, node, remote_vpp_socket=Constants.SOCKSVR_PATH, is_async=False
201 """Store the given arguments, declare managed variables.
203 :param node: Node to connect to and forward unix domain socket from.
204 :param remote_vpp_socket: Path to remote socket to tunnel to.
205 :param is_async: Whether to use async handling.
207 :type remote_vpp_socket: str
211 self._remote_vpp_socket = remote_vpp_socket
212 self._is_async = is_async
213 # The list of PAPI commands to be executed on the node.
214 self._api_command_list = list()
216 def ensure_api_dirs(self):
217 """Copy files from DUT to local temporary directory.
219 If the directory is still there, do not copy again.
220 If copying, also initialize CRC checker (this also performs
221 static checks), and remember PAPI package path.
222 Do not add that to PATH yet.
225 if cls.api_package_path:
227 # Pylint suggests to use "with" statement, which we cannot,
228 # do as the dir should stay for multiple ensure_vpp_instance calls.
229 cls.api_root_dir = tempfile.TemporaryDirectory(dir="/tmp")
230 root_path = cls.api_root_dir.name
231 # Pack, copy and unpack Python part of VPP installation from _node.
232 # TODO: Use rsync or recursive version of ssh.scp_node instead?
234 exec_cmd_no_error(node, ["rm", "-rf", "/tmp/papi.txz"])
235 # Papi python version depends on OS (and time).
236 # Python 3.4 or higher, site-packages or dist-packages.
237 installed_papi_glob = "/usr/lib/python3*/*-packages/vpp_papi"
238 # We need to wrap this command in bash, in order to expand globs,
239 # and as ssh does join, the inner command has to be quoted.
240 inner_cmd = " ".join(
247 "/usr/share/vpp/api",
250 exec_cmd_no_error(node, ["bash", "-c", f"'{inner_cmd}'"])
251 scp_node(node, root_path + "/papi.txz", "/tmp/papi.txz", get=True)
252 run(["tar", "xf", root_path + "/papi.txz", "-C", root_path])
253 cls.api_json_path = root_path + "/usr/share/vpp/api"
254 # Perform initial checks before .api.json files are gone,
255 # by creating the checker instance.
256 cls.crc_checker = VppApiCrcChecker(cls.api_json_path)
257 # When present locally, we finally can find the installation path.
258 cls.api_package_path = glob.glob(root_path + installed_papi_glob)[0]
259 # Package path has to be one level above the vpp_papi directory.
260 cls.api_package_path = cls.api_package_path.rsplit("/", 1)[0]
262 def ensure_vpp_instance(self):
263 """Create or reuse a closed client instance, return it.
265 The instance is initialized for unix domain socket access,
266 it has initialized all the bindings, it is removed from the internal
267 list of disconnected instances, but it is not connected
268 (to a local socket) yet.
270 :returns: VPP client instance ready for connect.
271 :rtype: vpp_papi.VPPApiClient
273 self.ensure_api_dirs()
275 if cls.reusable_vpp_client_list:
276 # Reuse in LIFO fashion.
277 *cls.reusable_vpp_client_list, ret = cls.reusable_vpp_client_list
279 # Creating an instance leads to dynamic imports from VPP PAPI code,
280 # so the package directory has to be present until the instance.
281 # But it is simpler to keep the package dir around.
283 sys.path.append(cls.api_package_path)
284 # TODO: Pylint says import-outside-toplevel and import-error.
285 # It is right, we should refactor the code and move initialization
286 # of package outside.
287 from vpp_papi.vpp_papi import VPPApiClient as vpp_class
289 # The old way. Deduplicate when pre-2402 support is not needed.
291 vpp_class.apidir = cls.api_json_path
292 # We need to create instance before removing from sys.path.
293 # Cannot use loglevel parameter, robot.api.logger lacks the support.
294 vpp_instance = vpp_class(
296 server_address="TBD",
298 # Large read timeout was originally there for VPP-1722,
299 # it may still be helping against AVF device creation failures.
301 logger=FilteredLogger(logger, "INFO"),
303 except vpp_class.VPPRuntimeError:
306 # We need to create instance before removing from sys.path.
307 # Cannot use loglevel parameter, robot.api.logger lacks the support.
308 vpp_instance = vpp_class(
309 apidir=cls.api_json_path,
311 server_address="TBD",
313 # Large read timeout was originally there for VPP-1722,
314 # it may still be helping against AVF device creation failures.
316 logger=FilteredLogger(logger, "INFO"),
318 # The following is needed to prevent union (e.g. Ip4) debug logging
319 # of VPP part of PAPI from spamming robot logs.
320 logging.getLogger("vpp_papi.serializer").setLevel(logging.INFO)
322 if sys.path[-1] == cls.api_package_path:
327 def key_for_node_and_socket(cls, node, remote_socket):
328 """Return a hashable object to distinguish nodes.
330 The usual node object (of "dict" type) is not hashable,
331 and can contain mutable information (mostly virtual interfaces).
332 Use this method to get an object suitable for being a key in dict.
334 The fields to include are chosen by what ssh needs.
336 This class method is needed, for disconnect.
338 :param node: The node object to distinguish.
339 :param remote_socket: Path to remote socket.
341 :type remote_socket: str
342 :return: Tuple of values distinguishing this node from similar ones.
349 # TODO: Do we support sockets paths such as "~/vpp/api.socket"?
354 def key_for_self(self):
355 """Return a hashable object to distinguish nodes.
357 Just a wrapper around key_for_node_and_socket
358 which sets up proper arguments.
360 :return: Tuple of values distinguishing this node from similar ones.
363 return self.__class__.key_for_node_and_socket(
365 self._remote_vpp_socket,
368 def set_connected_client(self, client):
369 """Add a connected client instance into cache.
371 This hides details of what the node key is.
373 If there already is a client for the computed key,
374 fail, as it is a sign of resource leakage.
376 :param client: VPP client instance in connected state.
377 :type client: vpp_papi.VPPApiClient
378 :raises RuntimeError: If related key already has a cached client.
380 key = self.key_for_self()
381 cache = self.__class__.conn_cache
383 raise RuntimeError(f"Caching client with existing key: {key}")
386 def get_connected_client(self, check_connected=True):
387 """Return None or cached connected client.
389 If check_connected, RuntimeError is raised when the client is
390 not in cache. None is returned if client is not in cache
391 (and the check is disabled).
392 Successful retrieval from cache is logged only when check_connected.
394 This hides details of what the node key is.
396 :param check_connected: Whether cache miss raises (and success logs).
397 :type check_connected: bool
398 :returns: Connected client instance, or None if uncached and no check.
399 :rtype: Optional[vpp_papi.VPPApiClient]
400 :raises RuntimeError: If cache miss and check enabled.
402 key = self.key_for_self()
403 ret = self.__class__.conn_cache.get(key, None)
406 raise RuntimeError(f"Client not cached for key: {key}")
407 # When reading logs, it is good to see which VPP is accessed.
408 logger.debug(f"Activated cached PAPI client for key: {key}")
412 """Create a tunnel, connect VPP instance.
414 If the connected client is in cache, return it.
415 Only if not, create a new (or reuse a disconnected) client instance.
417 Only at this point a local socket names are created
418 in a temporary directory, as CSIT can connect to multiple VPPs.
420 The following attributes are added to the client instance
421 to simplify caching and cleanup:
423 - Temporary socket files are created here.
425 - This socket controls the local ssh process doing the forwarding.
426 csit_local_vpp_socket
427 - This is the forwarded socket to talk with remote VPP.
429 - Queue for responses.
431 The attribute names do not start with underscore,
432 so pylint does not complain about accessing private attribute.
433 The attribute names start with csit_ to avoid naming conflicts
434 with "real" attributes from VPP Python code.
437 :rtype: PapiSocketExecutor
439 # Do we have the connected instance in the cache?
440 vpp_instance = self.get_connected_client(check_connected=False)
441 if vpp_instance is not None:
443 # No luck, create and connect a new instance.
444 time_enter = time.monotonic()
446 # Parsing takes longer than connecting, prepare instance before tunnel.
447 vpp_instance = self.ensure_vpp_instance()
448 # Store into cache as soon as possible.
449 # If connection fails, it is better to attempt disconnect anyway.
450 self.set_connected_client(vpp_instance)
451 # Set additional attributes.
452 vpp_instance.csit_temp_dir = tempfile.TemporaryDirectory(dir="/tmp")
453 temp_path = vpp_instance.csit_temp_dir.name
454 api_socket = temp_path + "/vpp-api.sock"
455 vpp_instance.csit_local_vpp_socket = api_socket
456 ssh_socket = temp_path + "/ssh.sock"
457 vpp_instance.csit_control_socket = ssh_socket
458 # Cleanup possibilities.
459 ret_code, _ = run(["ls", ssh_socket], check=False)
461 # This branch never seems to be hit in CI,
462 # but may be useful when testing manually.
464 ["ssh", "-S", ssh_socket, "-O", "exit", "0.0.0.0"],
468 # TODO: Is any sleep necessary? How to prove if not?
469 run(["sleep", "0.1"])
470 run(["rm", "-vrf", ssh_socket])
471 # Even if ssh can perhaps reuse this file,
472 # we need to remove it for readiness detection to work correctly.
473 run(["rm", "-rvf", api_socket])
474 # We use sleep command. The ssh command will exit in 30 second,
475 # unless a local socket connection is established,
476 # in which case the ssh command will exit only when
477 # the ssh connection is closed again (via control socket).
478 # The log level is to suppress "Warning: Permanently added" messages.
485 f"{api_socket}:{self._remote_vpp_socket}",
491 "UserKnownHostsFile=/dev/null",
493 "StrictHostKeyChecking=no",
495 "ExitOnForwardFailure=yes",
496 f"{node['username']}@{node['host']}",
500 priv_key = node.get("priv_key")
502 # This is tricky. We need a file to pass the value to ssh command.
503 # And we need ssh command, because paramiko does not support sockets
504 # (neither ssh_socket, nor _remote_vpp_socket).
505 key_file = tempfile.NamedTemporaryFile()
506 key_file.write(priv_key)
507 # Make sure the content is written, but do not close yet.
509 ssh_cmd[1:1] = ["-i", key_file.name]
510 password = node.get("password")
512 # Prepend sshpass command to set password.
513 ssh_cmd[:0] = ["sshpass", "-p", password]
514 time_stop = time.monotonic() + 10.0
515 # subprocess.Popen seems to be the best way to run commands
516 # on background. Other ways (shell=True with "&" and ssh with -f)
517 # seem to be too dependent on shell behavior.
518 # In particular, -f does NOT return values for run().
519 subprocess.Popen(ssh_cmd)
520 # Check socket presence on local side.
521 while time.monotonic() < time_stop:
522 # It can take a moment for ssh to create the socket file.
523 ret_code, _ = run(["ls", "-l", api_socket], check=False)
528 raise RuntimeError("Local side socket has not appeared.")
530 # Socket up means the key has been read. Delete file by closing it.
532 # Everything is ready, set the local socket address and connect.
533 vpp_instance.transport.server_address = api_socket
534 # It seems we can get read error even if every preceding check passed.
535 # Single retry seems to help. TODO: Confirm this is still needed.
538 vpp_instance.connect("csit_socket", do_async=True)
539 except (IOError, struct.error) as err:
540 logger.warn(f"Got initial connect error {err!r}")
541 vpp_instance.disconnect()
545 raise RuntimeError("Failed to connect to VPP over a socket.")
546 # Only after rls2302 all relevant VPP builds should have do_async.
547 if hasattr(vpp_instance.transport, "do_async"):
549 vpp_instance.csit_deque = deq
550 vpp_instance.register_event_callback(lambda x, y: deq.append(y))
552 vpp_instance.csit_deque = None
553 duration_conn = time.monotonic() - time_enter
554 logger.trace(f"Establishing socket connection took {duration_conn}s.")
557 def __exit__(self, exc_type, exc_val, exc_tb):
558 """No-op, the client instance remains in cache in connected state."""
561 def disconnect_by_key(cls, key):
562 """Disconnect a connected client instance, noop it not connected.
564 Also remove the local sockets by deleting the temporary directory.
565 Put disconnected client instances to the reuse list.
566 The added attributes are not cleaned up,
567 as their values will get overwritten on next connect.
569 This method is useful for disconnect_all type of work.
571 :param key: Tuple identifying the node (and socket).
572 :type key: tuple of str
574 client_instance = cls.conn_cache.get(key, None)
575 if client_instance is None:
577 logger.debug(f"Disconnecting by key: {key}")
578 client_instance.disconnect()
583 client_instance.csit_control_socket,
590 # Temp dir has autoclean, but deleting explicitly
591 # as an error can happen.
593 client_instance.csit_temp_dir.cleanup()
594 except FileNotFoundError:
595 # There is a race condition with ssh removing its ssh.sock file.
596 # Single retry should be enough to ensure the complete removal.
597 shutil.rmtree(client_instance.csit_temp_dir.name)
598 # Finally, put disconnected clients to reuse list.
599 cls.reusable_vpp_client_list.append(client_instance)
600 # Invalidate cache last. Repeated errors are better than silent leaks.
601 del cls.conn_cache[key]
604 def disconnect_by_node_and_socket(
605 cls, node, remote_socket=Constants.SOCKSVR_PATH
607 """Disconnect a connected client instance, noop it not connected.
609 Also remove the local sockets by deleting the temporary directory.
610 Put disconnected client instances to the reuse list.
611 The added attributes are not cleaned up,
612 as their values will get overwritten on next connect.
614 Call this method just before killing/restarting remote VPP instance.
616 key = cls.key_for_node_and_socket(node, remote_socket)
617 return cls.disconnect_by_key(key)
620 def disconnect_all_sockets_by_node(cls, node):
621 """Disconnect all socket connected client instance.
623 Noop if not connected.
625 Also remove the local sockets by deleting the temporary directory.
626 Put disconnected client instances to the reuse list.
627 The added attributes are not cleaned up,
628 as their values will get overwritten on next connect.
630 Call this method just before killing/restarting remote VPP instance.
632 sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
634 for socket in sockets.values():
635 # TODO: Remove sockets from topology.
636 PapiSocketExecutor.disconnect_by_node_and_socket(node, socket)
637 # Always attempt to disconnect the default socket.
638 return cls.disconnect_by_node_and_socket(node)
641 def disconnect_all_papi_connections():
642 """Disconnect all connected client instances, tear down the SSH tunnels.
644 Also remove the local sockets by deleting the temporary directory.
645 Put disconnected client instances to the reuse list.
646 The added attributes are not cleaned up,
647 as their values will get overwritten on next connect.
649 This should be a class method,
650 but we prefer to call static methods from Robot.
652 Call this method just before killing/restarting all VPP instances.
654 cls = PapiSocketExecutor
655 # Iterate over copy of entries so deletions do not mess with iterator.
656 keys_copy = list(cls.conn_cache.keys())
657 for key in keys_copy:
658 cls.disconnect_by_key(key)
660 def add(self, csit_papi_command, history=True, **kwargs):
661 """Add next command to internal command list; return self.
663 Unless disabled, new entry to papi history is also added at this point.
664 The kwargs dict is serialized or deep-copied, so it is safe to use
665 the original with partial modifications for subsequent calls.
667 Any pending conflicts from .api.json processing are raised.
668 Then the command name is checked for known CRCs.
669 Unsupported commands raise an exception, as CSIT change
670 should not start using messages without making sure which CRCs
672 Each CRC issue is raised only once, so subsequent tests
673 can raise other issues.
675 With async handling mode, this method also serializes and sends
676 the command, skips CRC check to gain speed, and saves memory
677 by putting a sentinel (instead of deepcopy) to api command list.
679 For scale tests, the call sites are responsible to set history values
680 in a way that hints what is done without overwhelming the papi history.
682 Note to contributors: Do not rename "csit_papi_command"
683 to anything VPP could possibly use as an API field name.
685 :param csit_papi_command: VPP API command.
686 :param history: Enable/disable adding command to PAPI command history.
687 :param kwargs: Optional key-value arguments.
688 :type csit_papi_command: str
691 :returns: self, so that method chaining is possible.
692 :rtype: PapiSocketExecutor
693 :raises RuntimeError: If unverified or conflicting CRC is encountered.
695 self.crc_checker.report_initial_conflicts()
697 # No need for deepcopy yet, serialization isolates from edits.
698 PapiHistory.add_to_papi_history(
699 self._node, csit_papi_command, **kwargs
701 self.crc_checker.check_api_name(csit_papi_command)
703 # Save memory but still count the number of expected replies.
704 self._api_command_list.append(0)
705 api_object = self.get_connected_client(check_connected=False).api
706 func = getattr(api_object, csit_papi_command)
707 # No need for deepcopy yet, serialization isolates from edits.
710 # No serialization, so deepcopy is needed here.
711 self._api_command_list.append(
712 dict(api_name=csit_papi_command, api_args=copy.deepcopy(kwargs))
716 def get_replies(self, err_msg="Failed to get replies."):
717 """Get reply for each command from VPP Python API.
719 This method expects one reply per command,
720 and gains performance by reading replies only after
721 sending all commands.
723 The replies are parsed into dict-like objects,
724 "retval" field (if present) is guaranteed to be zero on success.
726 Do not use this for messages with variable number of replies,
727 use get_details instead.
728 Do not use for commands trigering VPP-2033,
729 use series of get_reply instead.
731 :param err_msg: The message used if the PAPI command(s) execution fails.
733 :returns: Responses, dict objects with fields due to API and "retval".
735 :raises RuntimeError: If retval is nonzero, parsing or ssh error.
737 if not self._is_async:
738 raise RuntimeError("Sync handling does not suport get_replies.")
739 return self._execute(err_msg=err_msg, do_async=True)
741 def get_reply(self, err_msg="Failed to get reply."):
742 """Get reply to single command from VPP Python API.
744 This method waits for a single reply (no control ping),
745 thus avoiding bugs like VPP-2033.
747 The reply is parsed into a dict-like object,
748 "retval" field (if present) is guaranteed to be zero on success.
750 :param err_msg: The message used if the PAPI command(s) execution fails.
752 :returns: Response, dict object with fields due to API and "retval".
754 :raises AssertionError: If retval is nonzero, parsing or ssh error.
757 raise RuntimeError("Async handling does not suport get_reply.")
758 replies = self._execute(err_msg=err_msg, do_async=False)
759 if len(replies) != 1:
760 raise RuntimeError(f"Expected single reply, got {replies!r}")
763 def get_sw_if_index(self, err_msg="Failed to get reply."):
764 """Get sw_if_index from reply from VPP Python API.
766 Frequently, the caller is only interested in sw_if_index field
767 of the reply, this wrapper around get_reply (thus safe against VPP-2033)
768 makes such call sites shorter.
770 :param err_msg: The message used if the PAPI command(s) execution fails.
772 :returns: Response, sw_if_index value of the reply.
774 :raises AssertionError: If retval is nonzero, parsing or ssh error.
777 raise RuntimeError("Async handling does not suport get_sw_if_index")
778 reply = self.get_reply(err_msg=err_msg)
779 return reply["sw_if_index"]
781 def get_details(self, err_msg="Failed to get dump details."):
782 """Get details (for possibly multiple dumps) from VPP Python API.
784 The details are parsed into dict-like objects.
785 The number of details per single dump command can vary,
786 and all association between details and dumps is lost,
787 so if you care about the association (as opposed to
788 logging everything at once for debugging purposes),
789 it is recommended to call get_details for each dump (type) separately.
791 This method uses control ping to detect end of replies,
792 so it is not suitable for commands which trigger VPP-2033
793 (but arguably no dump currently triggers it).
795 :param err_msg: The message used if the PAPI command(s) execution fails.
797 :returns: Details, dict objects with fields due to API without "retval".
801 raise RuntimeError("Async handling does not suport get_details.")
802 return self._execute(err_msg, do_async=False, single_reply=False)
806 node, cli_cmd, log=True, remote_vpp_socket=Constants.SOCKSVR_PATH
808 """Run a CLI command as cli_inband, return the "reply" field of reply.
810 Optionally, log the field value.
811 This is a convenience wrapper around get_reply.
813 :param node: Node to run command on.
814 :param cli_cmd: The CLI command to be run on the node.
815 :param remote_vpp_socket: Path to remote socket to tunnel to.
816 :param log: If True, the response is logged.
818 :type remote_vpp_socket: str
821 :returns: CLI output.
825 args = dict(cmd=cli_cmd)
827 f"Failed to run 'cli_inband {cli_cmd}' PAPI command"
828 f" on host {node['host']}"
831 with PapiSocketExecutor(node, remote_vpp_socket) as papi_exec:
832 reply = papi_exec.add(cmd, **args).get_reply(err_msg)["reply"]
835 f"{cli_cmd} ({node['host']} - {remote_vpp_socket}):\n"
841 def run_cli_cmd_on_all_sockets(node, cli_cmd, log=True):
842 """Run a CLI command as cli_inband, on all sockets in topology file.
844 Just a run_cli_cmd, looping over sockets.
846 :param node: Node to run command on.
847 :param cli_cmd: The CLI command to be run on the node.
848 :param log: If True, the response is logged.
853 sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
855 for socket in sockets.values():
856 PapiSocketExecutor.run_cli_cmd(
857 node, cli_cmd, log=log, remote_vpp_socket=socket
861 def dump_and_log(node, cmds):
862 """Dump and log requested information, return None.
864 Just a get_details (with logging), looping over commands.
866 :param node: DUT node.
867 :param cmds: Dump commands to be executed.
869 :type cmds: list of str
871 with PapiSocketExecutor(node) as papi_exec:
873 dump = papi_exec.add(cmd).get_details()
874 logger.debug(f"{cmd}:\n{pformat(dump)}")
877 def _read_internal(vpp_instance, timeout=None):
878 """Blockingly read within timeout.
880 This covers behaviors both before and after 37758.
881 One read attempt is guaranteed even with zero timeout.
883 TODO: Simplify after 2302 RCA is done.
885 :param vpp_instance: Client instance to read from.
886 :param timeout: How long to wait for reply (or transport default).
887 :type vpp_instance: vpp_papi.VPPApiClient
888 :type timeout: Optional[float]
889 :returns: Message read or None if nothing got read.
890 :rtype: Optional[namedtuple]
892 timeout = vpp_instance.read_timeout if timeout is None else timeout
893 if vpp_instance.csit_deque is None:
894 return vpp_instance.read_blocking(timeout=timeout)
895 time_stop = time.monotonic() + timeout
898 return vpp_instance.csit_deque.popleft()
900 # We could busy-wait but that seems to starve the reader thread.
902 if time.monotonic() > time_stop:
906 def _read(vpp_instance, tries=3):
907 """Blockingly read within timeout, retry on early None.
909 For (sometimes) unknown reasons, VPP client in async mode likes
910 to return None occasionally before time runs out.
911 This function retries in that case.
913 Most of the time, early None means VPP crashed (see VPP-2033),
914 but is is better to give VPP more chances to respond without failure.
916 TODO: Perhaps CSIT now never triggers VPP-2033,
917 so investigate and remove this layer if even more speed is needed.
919 :param vpp_instance: Client instance to read from.
920 :param tries: Maximum number of tries to attempt.
921 :type vpp_instance: vpp_papi.VPPApiClient
923 :returns: Message read or None if nothing got read even with retries.
924 :rtype: Optional[namedtuple]
926 timeout = vpp_instance.read_timeout
927 for _ in range(tries):
928 time_stop = time.monotonic() + 0.9 * timeout
929 reply = PapiSocketExecutor._read_internal(vpp_instance)
930 if reply is None and time.monotonic() < time_stop:
931 logger.trace("Early None. Retry?")
934 logger.trace(f"Got {tries} early Nones, probably a real None.")
@staticmethod
def _drain(vpp_instance, err_msg, timeout=30.0):
    """Read and discard pending messages until nothing is left.

    Unread responses (e.g. after a non-zero retval in the middle of
    get_replies) would otherwise be picked up by later requests,
    causing failures in everything subsequent (until disconnect).

    Every read uses zero timeout, so this does not wait for replies
    that have not arrived yet. That is unlikely to matter,
    as Python is usually slower than VPP.

    :param vpp_instance: Client instance to read from.
    :param err_msg: Error message to use when overstepping timeout.
    :param timeout: How long to keep reading before giving up.
    :type vpp_instance: vpp_papi.VPPApiClient
    :type err_msg: str
    :type timeout: float
    :raises RuntimeError: If reads keep returning messages (not None)
        until the timeout expires.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        leftover = PapiSocketExecutor._read_internal(vpp_instance, 0.0)
        if leftover is None:
            # Nothing more to read, the client is clean.
            return
    raise RuntimeError(f"{err_msg}\nTimed out while draining.")
def _execute(self, err_msg, do_async, single_reply=True):
    """Execute the queued commands and return their replies.

    The internal command list is emptied before execution starts,
    so it is empty even when the execution fails.

    :param err_msg: The message used if the PAPI command(s) execution fails.
    :param do_async: If true, assume one reply per command and do not wait
        for each reply before sending next request.
        Dump commands (and calls causing VPP-2033) need False.
    :param single_reply: For sync emulation mode (cannot be False
        if do_async is True). When false use control ping.
        When true, wait for a single reply.
    :type err_msg: str
    :type do_async: bool
    :type single_reply: bool
    :returns: Papi replies parsed into a dict-like object,
        with fields due to API (possibly including retval).
    :rtype: NoneType or list of dict
    :raises RuntimeError: If the replies are not all correct,
        or if do_async is combined with single_reply=False.
    """
    # Detach the pending commands first, execution may fail.
    local_list, self._api_command_list = self._api_command_list, list()
    if not do_async:
        return self._execute_sync(
            local_list, err_msg=err_msg, single_reply=single_reply
        )
    if not single_reply:
        raise RuntimeError("Async papi needs one reply per request.")
    return self._execute_async(local_list, err_msg=err_msg)
def _execute_sync(self, local_list, err_msg, single_reply):
    """Execute commands waiting for replies one by one; return replies.

    This implementation either expects a single response per request,
    or uses control ping to emulate sync PAPI calls.
    Reliable, but slow. Required for dumps. Needed for calls
    which trigger VPP-2033.

    CRC checking is done for the replies (requests are checked in .add).

    :param local_list: The list of PAPI commands to be executed on the node.
    :param err_msg: The message used if the PAPI command(s) execution fails.
    :param single_reply: When false use control ping.
        When true, wait for a single reply.
    :type local_list: list of dict
    :type err_msg: str
    :type single_reply: bool
    :returns: Papi replies parsed into a dict-like object,
        with fields due to API (possibly including retval).
    :rtype: List[UserDict]
    :raises AttributeError: If VPP does not know the command.
    :raises AssertionError: If sending or reading fails on the transport
        level (AttributeError, IOError or struct.error inside the exchange).
    :raises RuntimeError: If a reply times out or the replies
        are not all correct.
    """
    vpp_instance = self.get_connected_client()
    control_ping_fn = getattr(vpp_instance.api, "control_ping")
    ret_list = list()
    for command in local_list:
        api_name = command["api_name"]
        # Unknown API name raises AttributeError here (outside the try).
        papi_fn = getattr(vpp_instance.api, api_name)
        replies = list()
        try:
            # Send the command maybe followed by control ping.
            main_context = papi_fn(**command["api_args"])
            if single_reply:
                reply = PapiSocketExecutor._read(vpp_instance)
                # Without this check a timed-out read (None) would fail
                # later with a misleading CRC or dictize error.
                if reply is None:
                    raise RuntimeError(f"{err_msg}\nSync PAPI timed out.")
                replies.append(reply)
            else:
                ping_context = control_ping_fn()
                # Receive the replies until the control ping reply arrives.
                while True:
                    reply = PapiSocketExecutor._read(vpp_instance)
                    if reply is None:
                        raise RuntimeError(
                            f"{err_msg}\nSync PAPI timed out."
                        )
                    if reply.context == ping_context:
                        break
                    if reply.context != main_context:
                        raise RuntimeError(
                            f"{err_msg}\nUnexpected context: {reply!r}"
                        )
                    replies.append(reply)
        except (AttributeError, IOError, struct.error) as err:
            # TODO: Add retry if it is still needed.
            raise AssertionError(f"{err_msg}") from err
        finally:
            # Discard any unprocessed replies to avoid secondary failures.
            PapiSocketExecutor._drain(vpp_instance, err_msg)
        # Process replies for this command.
        for reply in replies:
            self.crc_checker.check_api_name(reply.__class__.__name__)
            dictized_reply = dictize_and_check_retval(reply, err_msg)
            ret_list.append(dictized_reply)
    return ret_list
def _execute_async(self, local_list, err_msg):
    """Collect one reply per already-sent command; return them processed.

    In this mode the requests were already sent by .add(), so local_list
    only tells how many replies to read.

    Beware: It is not clear what to do when socket read fails
    in the middle of async processing.

    The implementation assumes each command results in exactly one reply,
    there is no reordering in either commands nor replies,
    and context numbers increase one by one (and are matching for replies).

    To speed processing up, reply CRC values are not checked.

    The number of messages in flight is not limited; the VPP PAPI
    background thread is relied upon to move replies from socket
    to queue fast enough.

    :param local_list: The list of PAPI commands to get replies for.
    :param err_msg: The message used if the PAPI command(s) execution fails.
    :type local_list: list
    :type err_msg: str
    :returns: Papi replies parsed into a dict-like object, with fields
        according to API (possibly including retval).
    :rtype: List[UserDict]
    :raises RuntimeError: If the replies are not all correct.
    """
    vpp_instance = self.get_connected_client()
    collected = list()
    try:
        for position, _command in enumerate(local_list):
            # Each read blocks up to the client read timeout.
            reply = PapiSocketExecutor._read(vpp_instance)
            if reply is not None:
                collected.append(dictize_and_check_retval(reply, err_msg))
                continue
            time_msg = f"PAPI async timeout: idx {position}"
            raise RuntimeError(f"{err_msg}\n{time_msg}")
    finally:
        # Discard any unprocessed replies to avoid secondary failures.
        PapiSocketExecutor._drain(vpp_instance, err_msg)
    return collected
class Disconnect:
    """Class for holding a single keyword.

    Robot calls keywords on instances, and module-level keywords
    are not accessible here, so the keyword lives on this class.
    """

    @staticmethod
    def disconnect_all_papi_connections():
        """Disconnect every cached client and tear down the SSH tunnels.

        Also remove the local sockets by deleting the temporary directory.
        Disconnected client instances go to the reuse list.
        Attributes added on connect are not cleaned up,
        as their values get overwritten on next connect.

        Call this method just before killing/restarting all VPP instances.

        This could be a class method of PapiSocketExecutor.
        But Robot calls methods on instances, and it would be weird
        to give node argument for constructor in import.
        Also, as we have a class of the same name as the module,
        the keywords defined on module level are not accessible.
        """
        executor_cls = PapiSocketExecutor
        # Snapshot the keys first, disconnecting removes cache entries.
        cached_keys = tuple(executor_cls.conn_cache.keys())
        for cache_key in cached_keys:
            executor_cls.disconnect_by_key(cache_key)
class PapiExecutor:
    """Contains methods for executing VPP Python API commands on DUTs.

    TODO: Remove .add step, make get_stats accept paths directly.

    This class processes only one type of VPP PAPI methods: vpp-stats.

    The recommended ways of use are (examples):

    path = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
    with PapiExecutor(node) as papi_exec:
        stats = papi_exec.add('vpp-stats', path=path).get_stats()

    print(f"RX interface core 0, sw_if_index 0: {stats[0]['/if/rx'][0][0]}")

    or

    path_1 = ['^/if']
    path_2 = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
    with PapiExecutor(node) as papi_exec:
        stats = (
            papi_exec.add('vpp-stats', path=path_1)
            .add('vpp-stats', path=path_2)
            .get_stats()
        )

    print(f"RX interface core 0, sw_if_index 0: {stats[1]['/if/rx'][0][0]}")

    Note: In this case, when PapiExecutor method 'add' is used:
    - its parameter 'csit_papi_command' is used only to keep information
      that vpp-stats are requested. It is not further processed but it is
      included in the PAPI history this way:
      vpp-stats(path=['^/if', '/err/ip4-input', '/sys/node/ip4-input'])
      Always use csit_papi_command="vpp-stats" if the VPP PAPI method
      is "stats".
    - the second parameter must be 'path' as it is used by PapiExecutor
      method 'get_stats'.
    - even if the parameter contains multiple paths, there is only one
      reply item (for each .add).
    """
def __init__(self, node):
    """Remember the node; prepare an empty command queue and SSH client.

    :param node: Node to run command(s) on.
    :type node: dict
    """
    # Node to run command(s) on.
    self._node = node
    # Pending PAPI commands, filled by add() and consumed by get_stats().
    self._api_command_list = []
    # SSH client for the node, connected in __enter__.
    self._ssh = SSH()
def __enter__(self):
    """Open the SSH connection to the node; return self.

    :returns: self, so the executor can be used in a with-statement.
    :rtype: PapiExecutor
    :raises RuntimeError: If the SSH connection cannot be opened.
    """
    try:
        self._ssh.connect(self._node)
    except IOError as err:
        host = self._node['host']
        msg = f"PAPI: Cannot open SSH connection to {host}"
        raise RuntimeError(msg) from err
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Close the SSH connection to the node.

    Returns None, so exceptions from the with-block are not suppressed.

    :param exc_type: Exception type raised in the with-block, if any.
    :param exc_val: Exception instance raised in the with-block, if any.
    :param exc_tb: Traceback of that exception, if any.
    """
    ssh_client, node = self._ssh, self._node
    ssh_client.disconnect(node)
def add(self, csit_papi_command="vpp-stats", history=True, **kwargs):
    """Queue one command for later execution; return self for chaining.

    The argument name 'csit_papi_command' must be unique enough as it cannot
    be repeated in kwargs.
    The kwargs dict is deep-copied, so it is safe to reuse the original
    with partial modifications for subsequent commands.

    :param csit_papi_command: VPP API command.
    :param history: Enable/disable adding command to PAPI command history.
    :param kwargs: Optional key-value arguments.
    :type csit_papi_command: str
    :type history: bool
    :type kwargs: dict
    :returns: self, so that method chaining is possible.
    :rtype: PapiExecutor
    """
    if history:
        PapiHistory.add_to_papi_history(
            self._node, csit_papi_command, **kwargs
        )
    # Deep copy, so later changes to the caller's arguments have no effect.
    queued_command = {
        "api_name": csit_papi_command,
        "api_args": copy.deepcopy(kwargs),
    }
    self._api_command_list.append(queued_command)
    return self
def get_stats(
    self,
    err_msg="Failed to get statistics.",
    timeout=120,
    socket=Constants.SOCKSTAT_PATH,
):
    """Fetch the statistics for all queued paths via the remote helper.

    Only the 'path' argument of each queued command is used.
    The command queue is emptied after the paths are collected.

    :param err_msg: The message used if the PAPI command(s) execution fails.
    :param timeout: Timeout in seconds.
    :param socket: Path to Stats socket to tunnel to.
    :type err_msg: str
    :type timeout: int
    :type socket: str
    :returns: Requested VPP statistics.
    :rtype: list of dict
    """
    paths = [
        queued["api_args"]["path"] for queued in self._api_command_list
    ]
    self._api_command_list = list()

    raw_output = self._execute_papi(
        paths,
        method="stats",
        err_msg=err_msg,
        timeout=timeout,
        socket=socket,
    )
    return json.loads(raw_output)
@staticmethod
def _process_api_data(api_d):
    """Process API data for smooth converting to JSON string.

    Every string value is replaced by the hex form of its UTF-8 encoding
    (``str.encode().hex()``), recursing into nested dicts and lists.
    Dict keys are converted to str. The input is not modified;
    new containers are returned.

    :param api_d: List of APIs with their arguments.
    :type api_d: list
    :returns: List of APIs with arguments pre-processed for JSON.
    :rtype: list
    """

    def process_value(val):
        """Process value.

        :param val: Value to be processed.
        :type val: object
        :returns: Processed value.
        :rtype: dict or list or str or int
        """
        if isinstance(val, dict):
            # Build a new dict: writing str keys into the dict being
            # iterated fails for non-str keys (dict changes size).
            return {str(key): process_value(item) for key, item in val.items()}
        if isinstance(val, list):
            return [process_value(item) for item in val]
        return val.encode().hex() if isinstance(val, str) else val

    api_data_processed = list()
    for api in api_d:
        api_args_processed = {
            str(a_k): process_value(a_v) for a_k, a_v in api["api_args"].items()
        }
        api_data_processed.append(
            dict(api_name=api["api_name"], api_args=api_args_processed)
        )
    return api_data_processed
1293 self, api_data, method="request", err_msg="", timeout=120, socket=None
1295 """Execute PAPI command(s) on remote node and store the result.
1297 :param api_data: List of APIs with their arguments.
1298 :param method: VPP Python API method. Supported methods are: 'request',
1300 :param err_msg: The message used if the PAPI command(s) execution fails.
1301 :param timeout: Timeout in seconds.
1302 :type api_data: list
1306 :returns: Stdout from remote python utility, to be parsed by caller.
1308 :raises SSHTimeout: If PAPI command(s) execution has timed out.
1309 :raises RuntimeError: If PAPI executor failed due to another reason.
1310 :raises AssertionError: If PAPI command(s) execution has failed.
1313 raise RuntimeError("No API data provided.")
1316 json.dumps(api_data)
1317 if method in ("stats", "stats_request")
1318 else json.dumps(self._process_api_data(api_data))
1321 sock = f" --socket {socket}" if socket else ""
1323 f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_PAPI_PROVIDER}"
1324 f" --method {method} --data '{json_data}'{sock}"
1327 ret_code, stdout, _ = self._ssh.exec_command_sudo(
1328 cmd=cmd, timeout=timeout, log_stdout_err=False
1330 # TODO: Fail on non-empty stderr?
1333 f"PAPI command(s) execution timeout on host"
1334 f" {self._node['host']}:\n{api_data}"
1337 except Exception as exc:
1339 f"PAPI command(s) execution on host {self._node['host']}"
1340 f" failed: {api_data}"
1343 raise AssertionError(err_msg)