1 # Copyright (c) 2023 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Python API executor library.
16 TODO: Document sync and async handling properly.
24 import struct # vpp-papi can raise struct.error
29 from collections import deque, UserDict
31 from pprint import pformat
32 from robot.api import logger
34 from resources.libraries.python.Constants import Constants
35 from resources.libraries.python.LocalExecution import run
36 from resources.libraries.python.FilteredLogger import FilteredLogger
37 from resources.libraries.python.PapiHistory import PapiHistory
38 from resources.libraries.python.ssh import (
44 from resources.libraries.python.topology import Topology, SocketType
45 from resources.libraries.python.VppApiCrc import VppApiCrcChecker
class _DictizedTuple(UserDict):
    """Dict view of a namedtuple-like object, dictizing items on access.

    String keys access fields of the object's _asdict() form,
    integer keys index the original object as a tuple.
    Every item returned by __getitem__ is passed through dictize(),
    so nested namedtuple-like values are also accessible as dict.
    """

    def __init__(self, obj):
        """Wrap the given namedtuple-like object.

        :param obj: Object providing _asdict() (and tuple-style indexing).
        :type obj: object
        """
        super().__init__(obj._asdict())
        # Kept for tuple-style (integer key) access.
        self._csit_tuple = obj

    def __getitem__(self, key):
        """Return dictized item for a field name or a tuple index.

        :param key: Field name, or integer index into the original tuple.
        :type key: str or int
        :returns: Dictized item.
        :rtype: object
        :raises KeyError: If key is neither a field name nor a valid index.
        """
        if isinstance(key, int) and key not in self.data:
            try:
                item = self._csit_tuple[key]
            except (IndexError, TypeError):
                # Mapping protocol (e.g. .get()) expects KeyError on a miss.
                raise KeyError(key) from None
        else:
            item = super().__getitem__(key)
        return dictize(item)


def dictize(obj):
    """A helper method, to make namedtuple-like object accessible as dict.

    If the object is namedtuple-like, its _asdict() form is returned,
    but in the returned object __getitem__ method is overridden
    to dictize also any items returned.
    If the object does not have _asdict, it will be returned without any change.
    Integer keys still access the object as tuple.

    Python looks up special methods such as __getitem__ on the type,
    not on the instance, so the override lives in a UserDict subclass;
    assigning it as an instance attribute would not affect obj[key].

    A more useful version would be to keep obj mostly as a namedtuple,
    just add getitem for string keys. Unfortunately, namedtuple inherits
    from tuple, including its read-only __getitem__ attribute,
    so we cannot monkey-patch it.

    TODO: Create a proxy for named tuple to allow that.

    :param obj: Arbitrary object to dictize.
    :type obj: object
    :returns: Dictized object.
    :rtype: same as obj type or collections.UserDict
    """
    if not hasattr(obj, "_asdict"):
        return obj
    return _DictizedTuple(obj)
def dictize_and_check_retval(obj, err_msg):
    """Dictize an object and verify its retval field, if there is one.

    The object is converted by dictize(), see there for what that means.
    A "retval" field that is present and non-zero causes a failure;
    objects without the field (e.g. *_details messages) pass unchecked.

    :param obj: Arbitrary object to dictize.
    :param err_msg: The (additional) text for the raised exception.
    :type obj: object
    :type err_msg: str
    :returns: Dictized object.
    :rtype: same as obj type or collections.UserDict
    :raises AssertionError: If retval field is present with nonzero value.
    """
    dictized = dictize(obj)
    # A missing retval (as in *_details messages) counts as success.
    if dictized.get("retval", 0) == 0:
        return dictized
    raise AssertionError(f"{err_msg}\nRetval nonzero in object {dictized!r}")
107 class PapiSocketExecutor:
108 """Methods for executing VPP Python API commands on forwarded socket.
110 The current implementation downloads and parses .api.json files only once
111 and caches client instances for reuse.
112 Cleanup metadata is added as additional attributes
113 directly to the client instances.
115 The current implementation caches the connected client instances.
116 As a downside, clients need to be explicitly told to disconnect
119 The current implementation seems to run into read error occasionally.
120 Not sure if the error is in Python code on Robot side, ssh forwarding,
121 or socket handling at VPP side. Anyway, reconnect after some sleep
122 seems to help, hoping repeated command execution does not lead to surprises.
123 The reconnection is logged at WARN level, so it is prominently shown
124 in log.html, so we can see how frequently it happens.
125 There are similar retries cleanups in other places
126 (so unresponsive VPPs do not break test much more than needed),
127 but it is hard to verify all that works correctly.
128 Especially, if Robot crashes, files and ssh processes may leak.
130 TODO: Decrease current timeout value when creating connections
131 so broken VPP does not prolong job duration too much
132 while good VPP (almost) never fails to connect.
134 TODO: Support handling of retval!=0 without try/except in caller.
136 This class processes two classes of VPP PAPI methods:
137 1. Simple request / reply: method='request'.
138 2. Dump functions: method='dump'.
140 Note that access to VPP stats over socket is not supported yet.
142 The recommended ways of use are (examples):
144 1. Simple request / reply. Example with no arguments:
147 with PapiSocketExecutor(node) as papi_exec:
148 reply = papi_exec.add(cmd).get_reply(err_msg)
152 cmd = "sw_interface_rx_placement_dump"
153 with PapiSocketExecutor(node) as papi_exec:
154 papi_exec.add(cmd, sw_if_index=ifc["vpp_sw_index"])
155 details = papi_exec.get_details(err_msg)
157 3. Multiple requests with one reply each.
158 In this example, there are three requests with arguments,
159 the second and the third ones are the same but with different arguments.
160 This example also showcases method chaining.
162 with PapiSocketExecutor(node, is_async=True) as papi_exec:
163 replies = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
164 add(cmd2, **args3).get_replies(err_msg)
166 The "is_async=True" part in the last example enables "async handling mode",
167 which imposes limitations but gains speed and saves memory.
168 This is different than async mode of VPP PAPI, as the default handling mode
169 also uses async PAPI connections.
171 The implementation contains more hidden details, such as
172 support for old VPP PAPI async mode behavior, API CRC checking
173 conditional usage of control ping, and possible susceptibility to VPP-2033.
174 See docstring of methods for more detailed info.
177 # Class cache for reuse between instances.
179 """We copy .api json files and PAPI code from DUT to robot machine.
180 This class variable holds temporary directory once created.
181 When python exits, the directory is deleted, so no downloaded file leaks.
182 The value will be set to TemporaryDirectory class instance (not string path)
183 to ensure deletion at exit."""
185 """String path to .api.json files, a directory somewhere in api_root_dir."""
186 api_package_path = None
187 """String path to PAPI code, a different directory under api_root_dir."""
189 """Accesses .api.json files at creation, caching speeds up accessing it."""
190 reusable_vpp_client_list = list()
191 """Each connection needs a separate client instance,
192 and each client instance creation needs to parse all .api files,
193 which takes time. If a client instance disconnects, it is put here,
194 so on next connect we can reuse intead of creating new."""
196 """Mapping from node key to connected client instance."""
199 self, node, remote_vpp_socket=Constants.SOCKSVR_PATH, is_async=False
201 """Store the given arguments, declare managed variables.
203 :param node: Node to connect to and forward unix domain socket from.
204 :param remote_vpp_socket: Path to remote socket to tunnel to.
205 :param is_async: Whether to use async handling.
207 :type remote_vpp_socket: str
211 self._remote_vpp_socket = remote_vpp_socket
212 self._is_async = is_async
213 # The list of PAPI commands to be executed on the node.
214 self._api_command_list = list()
216 def ensure_api_dirs(self):
217 """Copy files from DUT to local temporary directory.
219 If the directory is still there, do not copy again.
220 If copying, also initialize CRC checker (this also performs
221 static checks), and remember PAPI package path.
222 Do not add that to PATH yet.
225 if cls.api_package_path:
227 # Pylint suggests to use "with" statement, which we cannot,
228 # do as the dir should stay for multiple ensure_vpp_instance calls.
229 cls.api_root_dir = tempfile.TemporaryDirectory(dir="/tmp")
230 root_path = cls.api_root_dir.name
231 # Pack, copy and unpack Python part of VPP installation from _node.
232 # TODO: Use rsync or recursive version of ssh.scp_node instead?
234 exec_cmd_no_error(node, ["rm", "-rf", "/tmp/papi.txz"])
235 # Papi python version depends on OS (and time).
236 # Python 3.4 or higher, site-packages or dist-packages.
237 installed_papi_glob = "/usr/lib/python3*/*-packages/vpp_papi"
238 # We need to wrap this command in bash, in order to expand globs,
239 # and as ssh does join, the inner command has to be quoted.
240 inner_cmd = " ".join(
247 "/usr/share/vpp/api",
250 exec_cmd_no_error(node, ["bash", "-c", f"'{inner_cmd}'"])
251 scp_node(node, root_path + "/papi.txz", "/tmp/papi.txz", get=True)
252 run(["tar", "xf", root_path + "/papi.txz", "-C", root_path])
253 cls.api_json_path = root_path + "/usr/share/vpp/api"
254 # Perform initial checks before .api.json files are gone,
255 # by creating the checker instance.
256 cls.crc_checker = VppApiCrcChecker(cls.api_json_path)
257 # When present locally, we finally can find the installation path.
258 cls.api_package_path = glob.glob(root_path + installed_papi_glob)[0]
259 # Package path has to be one level above the vpp_papi directory.
260 cls.api_package_path = cls.api_package_path.rsplit("/", 1)[0]
262 def ensure_vpp_instance(self):
263 """Create or reuse a closed client instance, return it.
265 The instance is initialized for unix domain socket access,
266 it has initialized all the bindings, it is removed from the internal
267 list of disconnected instances, but it is not connected
268 (to a local socket) yet.
270 :returns: VPP client instance ready for connect.
271 :rtype: vpp_papi.VPPApiClient
273 self.ensure_api_dirs()
275 if cls.reusable_vpp_client_list:
276 # Reuse in LIFO fashion.
277 *cls.reusable_vpp_client_list, ret = cls.reusable_vpp_client_list
279 # Creating an instance leads to dynamic imports from VPP PAPI code,
280 # so the package directory has to be present until the instance.
281 # But it is simpler to keep the package dir around.
283 sys.path.append(cls.api_package_path)
284 # TODO: Pylint says import-outside-toplevel and import-error.
285 # It is right, we should refactor the code and move initialization
286 # of package outside.
287 from vpp_papi.vpp_papi import VPPApiClient as vpp_class
289 vpp_class.apidir = cls.api_json_path
290 # We need to create instance before removing from sys.path.
291 # Cannot use loglevel parameter, robot.api.logger lacks the support.
292 vpp_instance = vpp_class(
294 server_address="TBD",
296 # Large read timeout was originally there for VPP-1722,
297 # it may still be helping against AVF device creation failures.
299 logger=FilteredLogger(logger, "INFO"),
301 # The following is needed to prevent union (e.g. Ip4) debug logging
302 # of VPP part of PAPI from spamming robot logs.
303 logging.getLogger("vpp_papi.serializer").setLevel(logging.INFO)
305 if sys.path[-1] == cls.api_package_path:
310 def key_for_node_and_socket(cls, node, remote_socket):
311 """Return a hashable object to distinguish nodes.
313 The usual node object (of "dict" type) is not hashable,
314 and can contain mutable information (mostly virtual interfaces).
315 Use this method to get an object suitable for being a key in dict.
317 The fields to include are chosen by what ssh needs.
319 This class method is needed, for disconnect.
321 :param node: The node object to distinguish.
322 :param remote_socket: Path to remote socket.
324 :type remote_socket: str
325 :return: Tuple of values distinguishing this node from similar ones.
332 # TODO: Do we support sockets paths such as "~/vpp/api.socket"?
337 def key_for_self(self):
338 """Return a hashable object to distinguish nodes.
340 Just a wrapper around key_for_node_and_socket
341 which sets up proper arguments.
343 :return: Tuple of values distinguishing this node from similar ones.
346 return self.__class__.key_for_node_and_socket(
348 self._remote_vpp_socket,
351 def set_connected_client(self, client):
352 """Add a connected client instance into cache.
354 This hides details of what the node key is.
356 If there already is a client for the computed key,
357 fail, as it is a sign of resource leakage.
359 :param client: VPP client instance in connected state.
360 :type client: vpp_papi.VPPApiClient
361 :raises RuntimeError: If related key already has a cached client.
363 key = self.key_for_self()
364 cache = self.__class__.conn_cache
366 raise RuntimeError(f"Caching client with existing key: {key}")
369 def get_connected_client(self, check_connected=True):
370 """Return None or cached connected client.
372 If check_connected, RuntimeError is raised when the client is
373 not in cache. None is returned if client is not in cache
374 (and the check is disabled).
375 Successful retrieval from cache is logged only when check_connected.
377 This hides details of what the node key is.
379 :param check_connected: Whether cache miss raises (and success logs).
380 :type check_connected: bool
381 :returns: Connected client instance, or None if uncached and no check.
382 :rtype: Optional[vpp_papi.VPPApiClient]
383 :raises RuntimeError: If cache miss and check enabled.
385 key = self.key_for_self()
386 ret = self.__class__.conn_cache.get(key, None)
389 raise RuntimeError(f"Client not cached for key: {key}")
390 # When reading logs, it is good to see which VPP is accessed.
391 logger.debug(f"Activated cached PAPI client for key: {key}")
395 """Create a tunnel, connect VPP instance.
397 If the connected client is in cache, return it.
398 Only if not, create a new (or reuse a disconnected) client instance.
400 Only at this point a local socket names are created
401 in a temporary directory, as CSIT can connect to multiple VPPs.
403 The following attributes are added to the client instance
404 to simplify caching and cleanup:
406 - Temporary socket files are created here.
408 - This socket controls the local ssh process doing the forwarding.
409 csit_local_vpp_socket
410 - This is the forwarded socket to talk with remote VPP.
412 - Queue for responses.
414 The attribute names do not start with underscore,
415 so pylint does not complain about accessing private attribute.
416 The attribute names start with csit_ to avoid naming conflicts
417 with "real" attributes from VPP Python code.
420 :rtype: PapiSocketExecutor
422 # Do we have the connected instance in the cache?
423 vpp_instance = self.get_connected_client(check_connected=False)
424 if vpp_instance is not None:
426 # No luck, create and connect a new instance.
427 time_enter = time.monotonic()
429 # Parsing takes longer than connecting, prepare instance before tunnel.
430 vpp_instance = self.ensure_vpp_instance()
431 # Store into cache as soon as possible.
432 # If connection fails, it is better to attempt disconnect anyway.
433 self.set_connected_client(vpp_instance)
434 # Set additional attributes.
435 vpp_instance.csit_temp_dir = tempfile.TemporaryDirectory(dir="/tmp")
436 temp_path = vpp_instance.csit_temp_dir.name
437 api_socket = temp_path + "/vpp-api.sock"
438 vpp_instance.csit_local_vpp_socket = api_socket
439 ssh_socket = temp_path + "/ssh.sock"
440 vpp_instance.csit_control_socket = ssh_socket
441 # Cleanup possibilities.
442 ret_code, _ = run(["ls", ssh_socket], check=False)
444 # This branch never seems to be hit in CI,
445 # but may be useful when testing manually.
447 ["ssh", "-S", ssh_socket, "-O", "exit", "0.0.0.0"],
451 # TODO: Is any sleep necessary? How to prove if not?
452 run(["sleep", "0.1"])
453 run(["rm", "-vrf", ssh_socket])
454 # Even if ssh can perhaps reuse this file,
455 # we need to remove it for readiness detection to work correctly.
456 run(["rm", "-rvf", api_socket])
457 # We use sleep command. The ssh command will exit in 30 second,
458 # unless a local socket connection is established,
459 # in which case the ssh command will exit only when
460 # the ssh connection is closed again (via control socket).
461 # The log level is to suppress "Warning: Permanently added" messages.
468 f"{api_socket}:{self._remote_vpp_socket}",
474 "UserKnownHostsFile=/dev/null",
476 "StrictHostKeyChecking=no",
478 "ExitOnForwardFailure=yes",
479 f"{node['username']}@{node['host']}",
483 priv_key = node.get("priv_key")
485 # This is tricky. We need a file to pass the value to ssh command.
486 # And we need ssh command, because paramiko does not support sockets
487 # (neither ssh_socket, nor _remote_vpp_socket).
488 key_file = tempfile.NamedTemporaryFile()
489 key_file.write(priv_key)
490 # Make sure the content is written, but do not close yet.
492 ssh_cmd[1:1] = ["-i", key_file.name]
493 password = node.get("password")
495 # Prepend sshpass command to set password.
496 ssh_cmd[:0] = ["sshpass", "-p", password]
497 time_stop = time.monotonic() + 10.0
498 # subprocess.Popen seems to be the best way to run commands
499 # on background. Other ways (shell=True with "&" and ssh with -f)
500 # seem to be too dependent on shell behavior.
501 # In particular, -f does NOT return values for run().
502 subprocess.Popen(ssh_cmd)
503 # Check socket presence on local side.
504 while time.monotonic() < time_stop:
505 # It can take a moment for ssh to create the socket file.
506 ret_code, _ = run(["ls", "-l", api_socket], check=False)
511 raise RuntimeError("Local side socket has not appeared.")
513 # Socket up means the key has been read. Delete file by closing it.
515 # Everything is ready, set the local socket address and connect.
516 vpp_instance.transport.server_address = api_socket
517 # It seems we can get read error even if every preceding check passed.
518 # Single retry seems to help. TODO: Confirm this is still needed.
521 vpp_instance.connect("csit_socket", do_async=True)
522 except (IOError, struct.error) as err:
523 logger.warn(f"Got initial connect error {err!r}")
524 vpp_instance.disconnect()
528 raise RuntimeError("Failed to connect to VPP over a socket.")
529 # Only after rls2302 all relevant VPP builds should have do_async.
530 if hasattr(vpp_instance.transport, "do_async"):
532 vpp_instance.csit_deque = deq
533 vpp_instance.register_event_callback(lambda x, y: deq.append(y))
535 vpp_instance.csit_deque = None
536 duration_conn = time.monotonic() - time_enter
537 logger.trace(f"Establishing socket connection took {duration_conn}s.")
540 def __exit__(self, exc_type, exc_val, exc_tb):
541 """No-op, the client instance remains in cache in connected state."""
544 def disconnect_by_key(cls, key):
545 """Disconnect a connected client instance, noop it not connected.
547 Also remove the local sockets by deleting the temporary directory.
548 Put disconnected client instances to the reuse list.
549 The added attributes are not cleaned up,
550 as their values will get overwritten on next connect.
552 This method is useful for disconnect_all type of work.
554 :param key: Tuple identifying the node (and socket).
555 :type key: tuple of str
557 client_instance = cls.conn_cache.get(key, None)
558 if client_instance is None:
560 logger.debug(f"Disconnecting by key: {key}")
561 client_instance.disconnect()
566 client_instance.csit_control_socket,
573 # Temp dir has autoclean, but deleting explicitly
574 # as an error can happen.
576 client_instance.csit_temp_dir.cleanup()
577 except FileNotFoundError:
578 # There is a race condition with ssh removing its ssh.sock file.
579 # Single retry should be enough to ensure the complete removal.
580 shutil.rmtree(client_instance.csit_temp_dir.name)
581 # Finally, put disconnected clients to reuse list.
582 cls.reusable_vpp_client_list.append(client_instance)
583 # Invalidate cache last. Repeated errors are better than silent leaks.
584 del cls.conn_cache[key]
587 def disconnect_by_node_and_socket(
588 cls, node, remote_socket=Constants.SOCKSVR_PATH
590 """Disconnect a connected client instance, noop it not connected.
592 Also remove the local sockets by deleting the temporary directory.
593 Put disconnected client instances to the reuse list.
594 The added attributes are not cleaned up,
595 as their values will get overwritten on next connect.
597 Call this method just before killing/restarting remote VPP instance.
599 key = cls.key_for_node_and_socket(node, remote_socket)
600 return cls.disconnect_by_key(key)
603 def disconnect_all_sockets_by_node(cls, node):
604 """Disconnect all socket connected client instance.
606 Noop if not connected.
608 Also remove the local sockets by deleting the temporary directory.
609 Put disconnected client instances to the reuse list.
610 The added attributes are not cleaned up,
611 as their values will get overwritten on next connect.
613 Call this method just before killing/restarting remote VPP instance.
615 sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
617 for socket in sockets.values():
618 # TODO: Remove sockets from topology.
619 PapiSocketExecutor.disconnect_by_node_and_socket(node, socket)
620 # Always attempt to disconnect the default socket.
621 return cls.disconnect_by_node_and_socket(node)
624 def disconnect_all_papi_connections():
625 """Disconnect all connected client instances, tear down the SSH tunnels.
627 Also remove the local sockets by deleting the temporary directory.
628 Put disconnected client instances to the reuse list.
629 The added attributes are not cleaned up,
630 as their values will get overwritten on next connect.
632 This should be a class method,
633 but we prefer to call static methods from Robot.
635 Call this method just before killing/restarting all VPP instances.
637 cls = PapiSocketExecutor
638 # Iterate over copy of entries so deletions do not mess with iterator.
639 keys_copy = list(cls.conn_cache.keys())
640 for key in keys_copy:
641 cls.disconnect_by_key(key)
643 def add(self, csit_papi_command, history=True, **kwargs):
644 """Add next command to internal command list; return self.
646 Unless disabled, new entry to papi history is also added at this point.
647 The kwargs dict is serialized or deep-copied, so it is safe to use
648 the original with partial modifications for subsequent calls.
650 Any pending conflicts from .api.json processing are raised.
651 Then the command name is checked for known CRCs.
652 Unsupported commands raise an exception, as CSIT change
653 should not start using messages without making sure which CRCs
655 Each CRC issue is raised only once, so subsequent tests
656 can raise other issues.
658 With async handling mode, this method also serializes and sends
659 the command, skips CRC check to gain speed, and saves memory
660 by putting a sentinel (instead of deepcopy) to api command list.
662 For scale tests, the call sites are responsible to set history values
663 in a way that hints what is done without overwhelming the papi history.
665 Note to contributors: Do not rename "csit_papi_command"
666 to anything VPP could possibly use as an API field name.
668 :param csit_papi_command: VPP API command.
669 :param history: Enable/disable adding command to PAPI command history.
670 :param kwargs: Optional key-value arguments.
671 :type csit_papi_command: str
674 :returns: self, so that method chaining is possible.
675 :rtype: PapiSocketExecutor
676 :raises RuntimeError: If unverified or conflicting CRC is encountered.
678 self.crc_checker.report_initial_conflicts()
680 # No need for deepcopy yet, serialization isolates from edits.
681 PapiHistory.add_to_papi_history(
682 self._node, csit_papi_command, **kwargs
684 self.crc_checker.check_api_name(csit_papi_command)
686 # Save memory but still count the number of expected replies.
687 self._api_command_list.append(0)
688 api_object = self.get_connected_client(check_connected=False).api
689 func = getattr(api_object, csit_papi_command)
690 # No need for deepcopy yet, serialization isolates from edits.
693 # No serialization, so deepcopy is needed here.
694 self._api_command_list.append(
695 dict(api_name=csit_papi_command, api_args=copy.deepcopy(kwargs))
699 def get_replies(self, err_msg="Failed to get replies."):
700 """Get reply for each command from VPP Python API.
702 This method expects one reply per command,
703 and gains performance by reading replies only after
704 sending all commands.
706 The replies are parsed into dict-like objects,
707 "retval" field (if present) is guaranteed to be zero on success.
709 Do not use this for messages with variable number of replies,
710 use get_details instead.
711 Do not use for commands trigering VPP-2033,
712 use series of get_reply instead.
714 :param err_msg: The message used if the PAPI command(s) execution fails.
716 :returns: Responses, dict objects with fields due to API and "retval".
718 :raises RuntimeError: If retval is nonzero, parsing or ssh error.
720 if not self._is_async:
721 raise RuntimeError("Sync handling does not suport get_replies.")
722 return self._execute(err_msg=err_msg, do_async=True)
724 def get_reply(self, err_msg="Failed to get reply."):
725 """Get reply to single command from VPP Python API.
727 This method waits for a single reply (no control ping),
728 thus avoiding bugs like VPP-2033.
730 The reply is parsed into a dict-like object,
731 "retval" field (if present) is guaranteed to be zero on success.
733 :param err_msg: The message used if the PAPI command(s) execution fails.
735 :returns: Response, dict object with fields due to API and "retval".
737 :raises AssertionError: If retval is nonzero, parsing or ssh error.
740 raise RuntimeError("Async handling does not suport get_reply.")
741 replies = self._execute(err_msg=err_msg, do_async=False)
742 if len(replies) != 1:
743 raise RuntimeError(f"Expected single reply, got {replies!r}")
746 def get_sw_if_index(self, err_msg="Failed to get reply."):
747 """Get sw_if_index from reply from VPP Python API.
749 Frequently, the caller is only interested in sw_if_index field
750 of the reply, this wrapper around get_reply (thus safe against VPP-2033)
751 makes such call sites shorter.
753 :param err_msg: The message used if the PAPI command(s) execution fails.
755 :returns: Response, sw_if_index value of the reply.
757 :raises AssertionError: If retval is nonzero, parsing or ssh error.
760 raise RuntimeError("Async handling does not suport get_sw_if_index")
761 reply = self.get_reply(err_msg=err_msg)
762 return reply["sw_if_index"]
764 def get_details(self, err_msg="Failed to get dump details."):
765 """Get details (for possibly multiple dumps) from VPP Python API.
767 The details are parsed into dict-like objects.
768 The number of details per single dump command can vary,
769 and all association between details and dumps is lost,
770 so if you care about the association (as opposed to
771 logging everything at once for debugging purposes),
772 it is recommended to call get_details for each dump (type) separately.
774 This method uses control ping to detect end of replies,
775 so it is not suitable for commands which trigger VPP-2033
776 (but arguably no dump currently triggers it).
778 :param err_msg: The message used if the PAPI command(s) execution fails.
780 :returns: Details, dict objects with fields due to API without "retval".
784 raise RuntimeError("Async handling does not suport get_details.")
785 return self._execute(err_msg, do_async=False, single_reply=False)
789 node, cli_cmd, log=True, remote_vpp_socket=Constants.SOCKSVR_PATH
791 """Run a CLI command as cli_inband, return the "reply" field of reply.
793 Optionally, log the field value.
794 This is a convenience wrapper around get_reply.
796 :param node: Node to run command on.
797 :param cli_cmd: The CLI command to be run on the node.
798 :param remote_vpp_socket: Path to remote socket to tunnel to.
799 :param log: If True, the response is logged.
801 :type remote_vpp_socket: str
804 :returns: CLI output.
808 args = dict(cmd=cli_cmd)
810 f"Failed to run 'cli_inband {cli_cmd}' PAPI command"
811 f" on host {node['host']}"
814 with PapiSocketExecutor(node, remote_vpp_socket) as papi_exec:
815 reply = papi_exec.add(cmd, **args).get_reply(err_msg)["reply"]
818 f"{cli_cmd} ({node['host']} - {remote_vpp_socket}):\n"
824 def run_cli_cmd_on_all_sockets(node, cli_cmd, log=True):
825 """Run a CLI command as cli_inband, on all sockets in topology file.
827 Just a run_cli_cmd, looping over sockets.
829 :param node: Node to run command on.
830 :param cli_cmd: The CLI command to be run on the node.
831 :param log: If True, the response is logged.
836 sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
838 for socket in sockets.values():
839 PapiSocketExecutor.run_cli_cmd(
840 node, cli_cmd, log=log, remote_vpp_socket=socket
844 def dump_and_log(node, cmds):
845 """Dump and log requested information, return None.
847 Just a get_details (with logging), looping over commands.
849 :param node: DUT node.
850 :param cmds: Dump commands to be executed.
852 :type cmds: list of str
854 with PapiSocketExecutor(node) as papi_exec:
856 dump = papi_exec.add(cmd).get_details()
857 logger.debug(f"{cmd}:\n{pformat(dump)}")
860 def _read_internal(vpp_instance, timeout=None):
861 """Blockingly read within timeout.
863 This covers behaviors both before and after 37758.
864 One read attempt is guaranteed even with zero timeout.
866 TODO: Simplify after 2302 RCA is done.
868 :param vpp_instance: Client instance to read from.
869 :param timeout: How long to wait for reply (or transport default).
870 :type vpp_instance: vpp_papi.VPPApiClient
871 :type timeout: Optional[float]
872 :returns: Message read or None if nothing got read.
873 :rtype: Optional[namedtuple]
875 timeout = vpp_instance.read_timeout if timeout is None else timeout
876 if vpp_instance.csit_deque is None:
877 return vpp_instance.read_blocking(timeout=timeout)
878 time_stop = time.monotonic() + timeout
881 return vpp_instance.csit_deque.popleft()
883 # We could busy-wait but that seems to starve the reader thread.
885 if time.monotonic() > time_stop:
889 def _read(vpp_instance, tries=3):
890 """Blockingly read within timeout, retry on early None.
892 For (sometimes) unknown reasons, VPP client in async mode likes
893 to return None occasionally before time runs out.
894 This function retries in that case.
896 Most of the time, early None means VPP crashed (see VPP-2033),
897 but is is better to give VPP more chances to respond without failure.
899 TODO: Perhaps CSIT now never triggers VPP-2033,
900 so investigate and remove this layer if even more speed is needed.
902 :param vpp_instance: Client instance to read from.
903 :param tries: Maximum number of tries to attempt.
904 :type vpp_instance: vpp_papi.VPPApiClient
906 :returns: Message read or None if nothing got read even with retries.
907 :rtype: Optional[namedtuple]
909 timeout = vpp_instance.read_timeout
910 for _ in range(tries):
911 time_stop = time.monotonic() + 0.9 * timeout
912 reply = PapiSocketExecutor._read_internal(vpp_instance)
913 if reply is None and time.monotonic() < time_stop:
914 logger.trace("Early None. Retry?")
917 logger.trace(f"Got {tries} early Nones, probably a real None.")
921 def _drain(vpp_instance, err_msg, timeout=30.0):
922 """Keep reading with until None or timeout.
924 This is needed to mitigate the risk of a state with unread responses
925 (e.g. after non-zero retval in the middle of get_replies)
926 causing failures in everything subsequent (until disconnect).
928 The reads are done without any waiting.
930 It is possible some responses have not arrived yet,
931 but that is unlikely as Python is usually slower than VPP.
933 :param vpp_instance: Client instance to read from.
934 :param err_msg: Error message to use when overstepping timeout.
935 :param timeout: How long to try before giving up.
936 :type vpp_instance: vpp_papi.VPPApiClient
939 :raises RuntimeError: If read keeps returning nonzero after timeout.
941 time_stop = time.monotonic() + timeout
942 while time.monotonic() < time_stop:
943 if PapiSocketExecutor._read_internal(vpp_instance, 0.0) is None:
945 raise RuntimeError(f"{err_msg}\nTimed out while draining.")
def _execute(self, err_msg, do_async, single_reply=True):
    """Execute the queued commands and return their replies.

    The internal command list is detached and replaced by an empty one
    before anything runs, so it ends up empty even if execution fails.

    :param err_msg: The message used if the PAPI command(s) execution fails.
    :param do_async: If true, assume one reply per command and do not wait
        for each reply before sending next request.
        Dump commands (and calls causing VPP-2033) need False.
    :param single_reply: For sync emulation mode (cannot be False
        if do_async is True). When false use control ping.
        When true, wait for a single reply.
    :type err_msg: str
    :type do_async: bool
    :type single_reply: bool
    :returns: Papi replies parsed into a dict-like object,
        with fields due to API (possibly including retval).
    :rtype: List[UserDict]
    :raises RuntimeError: If do_async is true while single_reply is false,
        or if the replies are not all correct.
    """
    # Swap in an empty list first, as execution may fail.
    commands, self._api_command_list = self._api_command_list, list()
    if not do_async:
        return self._execute_sync(
            commands, err_msg=err_msg, single_reply=single_reply
        )
    if not single_reply:
        raise RuntimeError("Async papi needs one reply per request.")
    return self._execute_async(commands, err_msg=err_msg)
def _execute_sync(self, local_list, err_msg, single_reply):
    """Execute commands waiting for replies one by one; return replies.

    This implementation either expects a single response per request,
    or uses control ping to emulate sync PAPI calls.
    Reliable, but slow. Required for dumps. Needed for calls
    which trigger VPP-2033.

    CRC checking is done for the replies (requests are checked in .add).

    A read returning None (nothing arrived even after retries) is treated
    as a timeout in both modes, so None never reaches CRC checking
    or dictization.

    After each command (successful or not), leftover replies are drained,
    so they cannot cause secondary failures.

    :param local_list: The list of PAPI commands to be executed on the node.
    :param err_msg: The message used if the PAPI command(s) execution fails.
    :param single_reply: When false use control ping.
        When true, wait for a single reply.
    :type local_list: list of dict
    :type err_msg: str
    :type single_reply: bool
    :returns: Papi replies parsed into a dict-like object,
        with fields due to API (possibly including retval).
    :rtype: List[UserDict]
    :raises RuntimeError: If the replies are not all correct,
        or if a reply does not arrive in time.
    :raises AssertionError: If sending or reading fails with
        AttributeError, IOError or struct.error.
    """
    vpp_instance = self.get_connected_client()
    control_ping_fn = getattr(vpp_instance.api, "control_ping")
    ret_list = list()
    for command in local_list:
        api_name = command["api_name"]
        papi_fn = getattr(vpp_instance.api, api_name)
        replies = list()
        try:
            # Send the command maybe followed by control ping.
            main_context = papi_fn(**command["api_args"])
            if single_reply:
                reply = PapiSocketExecutor._read(vpp_instance)
                if reply is None:
                    # Same handling as in the control ping branch below,
                    # otherwise None would be fed to CRC check and dictize.
                    raise RuntimeError(f"{err_msg}\nSync PAPI timed out.")
                replies.append(reply)
            else:
                ping_context = control_ping_fn()
                # Receive the replies.
                while True:
                    reply = PapiSocketExecutor._read(vpp_instance)
                    if reply is None:
                        raise RuntimeError(
                            f"{err_msg}\nSync PAPI timed out."
                        )
                    if reply.context == ping_context:
                        # Ping reply marks the end of this command's replies.
                        break
                    if reply.context != main_context:
                        raise RuntimeError(
                            f"{err_msg}\nUnexpected context: {reply!r}"
                        )
                    replies.append(reply)
        except (AttributeError, IOError, struct.error) as err:
            # TODO: Add retry if it is still needed.
            raise AssertionError(f"{err_msg}") from err
        finally:
            # Discard any unprocessed replies to avoid secondary failures.
            PapiSocketExecutor._drain(vpp_instance, err_msg)
        # Process replies for this command.
        for reply in replies:
            self.crc_checker.check_api_name(reply.__class__.__name__)
            dictized_reply = dictize_and_check_retval(reply, err_msg)
            ret_list.append(dictized_reply)
    return ret_list
def _execute_async(self, local_list, err_msg):
    """Collect, check and return replies for requests already sent.

    In async mode the requests were already sent by .add(),
    so local_list only tells how many replies to read.

    Beware: It is not clear what to do when socket read fails
    in the middle of async processing.

    The implementation assumes each command results in exactly one reply,
    there is no reordering in either commands nor replies,
    and context numbers increase one by one (and are matching for replies).

    To speed processing up, reply CRC values are not checked.

    The current implementation does not limit the number of messages
    in-flight, we rely on VPP PAPI background thread to move replies
    from socket to queue fast enough.

    Whether reading succeeds or fails, leftover replies are drained
    afterwards, so they cannot confuse subsequent calls.

    :param local_list: The list of PAPI commands to get replies for.
    :param err_msg: The message used if the PAPI command(s) execution fails.
    :type local_list: list
    :type err_msg: str
    :returns: Papi replies parsed into a dict-like object, with fields
        according to API (possibly including retval).
    :rtype: List[UserDict]
    :raises RuntimeError: If the replies are not all correct.
    """
    vpp_instance = self.get_connected_client()
    replies = list()
    try:
        for position, _ in enumerate(local_list):
            # Blocks up to timeout.
            reply = PapiSocketExecutor._read(vpp_instance)
            if reply is not None:
                replies.append(dictize_and_check_retval(reply, err_msg))
                continue
            time_msg = f"PAPI async timeout: idx {position}"
            raise RuntimeError(f"{err_msg}\n{time_msg}")
    finally:
        # Discard any unprocessed replies to avoid secondary failures.
        PapiSocketExecutor._drain(vpp_instance, err_msg)
    return replies
1086 """Class for holding a single keyword."""
def disconnect_all_papi_connections():
    """Disconnect every cached client instance and tear down SSH tunnels.

    Local sockets are also removed by deleting the temporary directory,
    and disconnected client instances are put to the reuse list.
    Added attributes are left in place, because their values
    get overwritten on the next connect.

    Call this just before killing/restarting all VPP instances.

    This could be a class method of PapiSocketExecutor.
    But Robot calls methods on instances, and it would be weird
    to give node argument for constructor in import.
    Also, as we have a class of the same name as the module,
    the keywords defined on module level are not accessible.
    """
    executor = PapiSocketExecutor
    # Snapshot the keys, so deletions do not disturb the iteration.
    for cache_key in tuple(executor.conn_cache.keys()):
        executor.disconnect_by_key(cache_key)
1112 """Contains methods for executing VPP Python API commands on DUTs.
1114 TODO: Remove .add step, make get_stats accept paths directly.
1116 This class processes only one type of VPP PAPI methods: vpp-stats.
1118 The recommended ways of use are (examples):
1120 path = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
1121 with PapiExecutor(node) as papi_exec:
1122 stats = papi_exec.add(api_name='vpp-stats', path=path).get_stats()
1124 print('RX interface core 0, sw_if_index 0:\n{0}'.\
1125 format(stats[0]['/if/rx'][0][0]))
1130 path_2 = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
1131 with PapiExecutor(node) as papi_exec:
1132 stats = papi_exec.add('vpp-stats', path=path_1).\
1133 add('vpp-stats', path=path_2).get_stats()
1135 print('RX interface core 0, sw_if_index 0:\n{0}'.\
1136 format(stats[1]['/if/rx'][0][0]))
1138 Note: In this case, when PapiExecutor method 'add' is used:
1139 - its parameter 'csit_papi_command' is used only to keep information
1140 that vpp-stats are requested. It is not further processed but it is
1141 included in the PAPI history this way:
1142 vpp-stats(path=['^/if', '/err/ip4-input', '/sys/node/ip4-input'])
1143 Always use csit_papi_command="vpp-stats" if the VPP PAPI method
1145 - the second parameter must be 'path' as it is used by PapiExecutor
1147 - even if the parameter contains multiple paths, there is only one
1148 reply item (for each .add).
def __init__(self, node):
    """Remember the node, start with no commands, prepare an SSH client.

    :param node: Node to run command(s) on.
    :type node: dict
    """
    # The list of PAPI commands to be executed on the node.
    self._api_command_list = list()
    # Node to run command(s) on.
    self._node = node
    # SSH client; connected in __enter__, disconnected in __exit__.
    self._ssh = SSH()
def __enter__(self):
    """Open the SSH connection to the node; return self.

    :returns: self, so it can be bound by a with statement.
    :rtype: PapiExecutor
    :raises RuntimeError: If the SSH connection cannot be opened.
    """
    try:
        self._ssh.connect(self._node)
    except IOError as err:
        raise RuntimeError(
            f"PAPI: Cannot open SSH connection to {self._node['host']}"
        ) from err
    else:
        return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Close the SSH connection to the node.

    Nothing is returned, so exceptions from the with-block propagate.

    :param exc_type: Exception type, if the with-block raised.
    :param exc_val: Exception instance, if the with-block raised.
    :param exc_tb: Traceback, if the with-block raised.
    """
    ssh_client = self._ssh
    ssh_client.disconnect(self._node)
def add(self, csit_papi_command="vpp-stats", history=True, **kwargs):
    """Queue one more command; return self so calls can be chained.

    The argument name 'csit_papi_command' must be unique enough as it cannot
    be repeated in kwargs.
    The kwargs dict is deep-copied, so it is safe to use the original
    with partial modifications for subsequent commands.

    :param csit_papi_command: VPP API command.
    :param history: Enable/disable adding command to PAPI command history.
    :param kwargs: Optional key-value arguments.
    :type csit_papi_command: str
    :type history: bool
    :type kwargs: dict
    :returns: self, so that method chaining is possible.
    :rtype: PapiExecutor
    """
    if history:
        PapiHistory.add_to_papi_history(
            self._node, csit_papi_command, **kwargs
        )
    queued = {
        "api_name": csit_papi_command,
        "api_args": copy.deepcopy(kwargs),
    }
    self._api_command_list.append(queued)
    return self
1204 err_msg="Failed to get statistics.",
1206 socket=Constants.SOCKSTAT_PATH,
1208 """Get VPP Stats from VPP Python API.
1210 :param err_msg: The message used if the PAPI command(s) execution fails.
1211 :param timeout: Timeout in seconds.
1212 :param socket: Path to Stats socket to tunnel to.
1216 :returns: Requested VPP statistics.
1217 :rtype: list of dict
1219 paths = [cmd["api_args"]["path"] for cmd in self._api_command_list]
1220 self._api_command_list = list()
1222 stdout = self._execute_papi(
1230 return json.loads(stdout)
def _process_api_data(api_d):
    """Process API data for smooth converting to JSON string.

    String values (also when nested in dicts and lists) are replaced by
    the hex representation of their default (UTF-8) encoding, dict keys
    are converted to strings, and other values are kept as they are.

    New containers are built, so the input data is never modified.
    Each nested value is therefore encoded exactly once, even when
    the same list or dict object is referenced more than once.

    :param api_d: List of APIs with their arguments.
    :type api_d: list
    :returns: List of APIs with arguments pre-processed for JSON.
    :rtype: list
    """

    def process_value(val):
        """Return a JSON-ready copy of val with strings hex-encoded.

        :param val: Value to be processed.
        :type val: object
        :returns: Processed value.
        :rtype: dict or list or str or object
        """
        if isinstance(val, dict):
            # Build a new dict: inserting str keys into the dict being
            # iterated would raise RuntimeError for non-str keys.
            return {str(key): process_value(item) for key, item in val.items()}
        if isinstance(val, list):
            return [process_value(item) for item in val]
        return val.encode().hex() if isinstance(val, str) else val

    return [
        dict(
            api_name=api["api_name"],
            api_args={
                str(a_k): process_value(a_v)
                for a_k, a_v in api["api_args"].items()
            },
        )
        for api in api_d
    ]
1275 self, api_data, method="request", err_msg="", timeout=120, socket=None
1277 """Execute PAPI command(s) on remote node and store the result.
1279 :param api_data: List of APIs with their arguments.
1280 :param method: VPP Python API method. Supported methods are: 'request',
1282 :param err_msg: The message used if the PAPI command(s) execution fails.
1283 :param timeout: Timeout in seconds.
1284 :type api_data: list
1288 :returns: Stdout from remote python utility, to be parsed by caller.
1290 :raises SSHTimeout: If PAPI command(s) execution has timed out.
1291 :raises RuntimeError: If PAPI executor failed due to another reason.
1292 :raises AssertionError: If PAPI command(s) execution has failed.
1295 raise RuntimeError("No API data provided.")
1298 json.dumps(api_data)
1299 if method in ("stats", "stats_request")
1300 else json.dumps(self._process_api_data(api_data))
1303 sock = f" --socket {socket}" if socket else ""
1305 f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_PAPI_PROVIDER}"
1306 f" --method {method} --data '{json_data}'{sock}"
1309 ret_code, stdout, _ = self._ssh.exec_command_sudo(
1310 cmd=cmd, timeout=timeout, log_stdout_err=False
1312 # TODO: Fail on non-empty stderr?
1315 f"PAPI command(s) execution timeout on host"
1316 f" {self._node['host']}:\n{api_data}"
1319 except Exception as exc:
1321 f"PAPI command(s) execution on host {self._node['host']}"
1322 f" failed: {api_data}"
1325 raise AssertionError(err_msg)