style(papi): reformat code before real changes
[csit.git] / resources / libraries / python / PapiExecutor.py
1 # Copyright (c) 2023 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Python API executor library."""
15
16 import copy
17 import glob
18 import json
19 import shutil
20 import struct  # vpp-papi can raise struct.error
21 import subprocess
22 import sys
23 import tempfile
24 import time
25 from collections import UserDict
26
27
28 from pprint import pformat
29 from robot.api import logger
30
31 from resources.libraries.python.Constants import Constants
32 from resources.libraries.python.LocalExecution import run
33 from resources.libraries.python.FilteredLogger import FilteredLogger
34 from resources.libraries.python.PapiHistory import PapiHistory
35 from resources.libraries.python.ssh import (
36     SSH,
37     SSHTimeout,
38     exec_cmd_no_error,
39     scp_node,
40 )
41 from resources.libraries.python.topology import Topology, SocketType
42 from resources.libraries.python.VppApiCrc import VppApiCrcChecker
43
44
45 __all__ = [
46     "PapiExecutor",
47     "PapiSocketExecutor",
48     "Disconnector",
49 ]
50
51
52 def dictize(obj):
53     """A helper method, to make namedtuple-like object accessible as dict.
54
55     If the object is namedtuple-like, its _asdict() form is returned,
56     but in the returned object __getitem__ method is wrapped
57     to dictize also any items returned.
58     If the object does not have _asdict, it will be returned without any change.
59     Integer keys still access the object as tuple.
60
61     A more useful version would be to keep obj mostly as a namedtuple,
62     just add getitem for string keys. Unfortunately, namedtuple inherits
63     from tuple, including its read-only __getitem__ attribute,
64     so we cannot monkey-patch it.
65
66     TODO: Create a proxy for named tuple to allow that.
67
68     :param obj: Arbitrary object to dictize.
69     :type obj: object
70     :returns: Dictized object.
71     :rtype: same as obj type or collections.OrderedDict
72     """
73     if not hasattr(obj, "_asdict"):
74         return obj
75     overriden = UserDict(obj._asdict())
76     old_get = overriden.__getitem__
77     new_get = lambda self, key: dictize(old_get(self, key))
78     overriden.__getitem__ = new_get
79     return overriden
80
81
82 class PapiSocketExecutor:
83     """Methods for executing VPP Python API commands on forwarded socket.
84
85     Previously, we used an implementation with single client instance
86     and connection being handled by a resource manager.
87     On "with" statement, the instance connected, and disconnected
88     on exit from the "with" block.
89     This was limiting (no nested with blocks) and mainly it was slow:
90     0.7 seconds per disconnect cycle on Skylake, more than 3 second on Taishan.
91
92     The currently used implementation caches the connected client instances,
93     providing speedup and making "with" blocks unnecessary.
94     But with many call sites, "with" blocks are still the main usage pattern.
95     Documentation still lists that as the intended pattern.
96
97     As a downside, clients need to be explicitly told to disconnect
98     before VPP restart.
99     There is some amount of retries and disconnects on disconnect
100     (so unresponsive VPPs do not breach test much more than needed),
101     but it is hard to verify all that works correctly.
102     Especially, if Robot crashes, files and ssh processes may leak.
103
104     Delay for accepting socket connection is 10s.
105     TODO: Decrease 10s to value that is long enough for creating connection
106     and short enough to not affect performance.
107
108     The current implementation downloads and parses .api.json files only once
109     and caches client instances for reuse.
110     Cleanup metadata is added as additional attributes
111     directly to client instances.
112
113     The current implementation seems to run into read error occasionally.
114     Not sure if the error is in Python code on Robot side, ssh forwarding,
115     or socket handling at VPP side. Anyway, reconnect after some sleep
116     seems to help, hoping repeated command execution does not lead to surprises.
117     The reconnection is logged at WARN level, so it is prominently shown
118     in log.html, so we can see how frequently it happens.
119
120     TODO: Support handling of retval!=0 without try/except in caller.
121
122     Note: Use only with "with" statement, e.g.:
123
124         cmd = 'show_version'
125         with PapiSocketExecutor(node) as papi_exec:
126             reply = papi_exec.add(cmd).get_reply(err_msg)
127
128     This class processes two classes of VPP PAPI methods:
129     1. Simple request / reply: method='request'.
130     2. Dump functions: method='dump'.
131
132     Note that access to VPP stats over socket is not supported yet.
133
134     The recommended ways of use are (examples):
135
136     1. Simple request / reply
137
138     a. One request with no arguments:
139
140         cmd = 'show_version'
141         with PapiSocketExecutor(node) as papi_exec:
142             reply = papi_exec.add(cmd).get_reply(err_msg)
143
144     b. Three requests with arguments, the second and the third ones are the same
145        but with different arguments.
146
147         with PapiSocketExecutor(node) as papi_exec:
148             replies = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
149                 add(cmd2, **args3).get_replies(err_msg)
150
151     2. Dump functions
152
153         cmd = 'sw_interface_rx_placement_dump'
154         with PapiSocketExecutor(node) as papi_exec:
155             details = papi_exec.add(cmd, sw_if_index=ifc['vpp_sw_index']).\
156                 get_details(err_msg)
157     """
158
159     # Class cache for reuse between instances.
160     api_root_dir = None
161     """We copy .api json files and PAPI code from DUT to robot machine.
162     This class variable holds temporary directory once created.
163     When python exits, the directory is deleted, so no downloaded file leaks.
164     The value will be set to TemporaryDirectory class instance (not string path)
165     to ensure deletion at exit."""
166     api_json_path = None
167     """String path to .api.json files, a directory somewhere in api_root_dir."""
168     api_package_path = None
169     """String path to PAPI code, a different directory under api_root_dir."""
170     crc_checker = None
171     """Accesses .api.json files at creation, caching speeds up accessing it."""
172     reusable_vpp_client_list = list()
173     """Each connection needs a separate client instance,
174     and each client instance creation needs to parse all .api files,
175     which takes time. If a client instance disconnects, it is put here,
176     so on next connect we can reuse intead of creating new."""
177     conn_cache = dict()
178     """Mapping from node key to connected client instance."""
179
180     def __init__(self, node, remote_vpp_socket=Constants.SOCKSVR_PATH):
181         """Store the given arguments, declare managed variables.
182
183         :param node: Node to connect to and forward unix domain socket from.
184         :param remote_vpp_socket: Path to remote socket to tunnel to.
185         :type node: dict
186         :type remote_vpp_socket: str
187         """
188         self._node = node
189         self._remote_vpp_socket = remote_vpp_socket
190         # The list of PAPI commands to be executed on the node.
191         self._api_command_list = list()
192
193     def ensure_api_dirs(self):
194         """Copy files from DUT to local temporary directory.
195
196         If the directory is still there, do not copy again.
197         If copying, also initialize CRC checker (this also performs
198         static checks), and remember PAPI package path.
199         Do not add that to PATH yet.
200         """
201         cls = self.__class__
202         if cls.api_package_path:
203             return
204         cls.api_root_dir = tempfile.TemporaryDirectory(dir="/tmp")
205         root_path = cls.api_root_dir.name
206         # Pack, copy and unpack Python part of VPP installation from _node.
207         # TODO: Use rsync or recursive version of ssh.scp_node instead?
208         node = self._node
209         exec_cmd_no_error(node, ["rm", "-rf", "/tmp/papi.txz"])
210         # Papi python version depends on OS (and time).
211         # Python 2.7 or 3.4, site-packages or dist-packages.
212         installed_papi_glob = "/usr/lib/python3*/*-packages/vpp_papi"
213         # We need to wrap this command in bash, in order to expand globs,
214         # and as ssh does join, the inner command has to be quoted.
215         inner_cmd = " ".join(
216             [
217                 "tar",
218                 "cJf",
219                 "/tmp/papi.txz",
220                 "--exclude=*.pyc",
221                 installed_papi_glob,
222                 "/usr/share/vpp/api",
223             ]
224         )
225         exec_cmd_no_error(node, ["bash", "-c", "'" + inner_cmd + "'"])
226         scp_node(node, root_path + "/papi.txz", "/tmp/papi.txz", get=True)
227         run(["tar", "xf", root_path + "/papi.txz", "-C", root_path])
228         cls.api_json_path = root_path + "/usr/share/vpp/api"
229         # Perform initial checks before .api.json files are gone,
230         # by creating the checker instance.
231         cls.crc_checker = VppApiCrcChecker(cls.api_json_path)
232         # When present locally, we finally can find the installation path.
233         cls.api_package_path = glob.glob(root_path + installed_papi_glob)[0]
234         # Package path has to be one level above the vpp_papi directory.
235         cls.api_package_path = cls.api_package_path.rsplit("/", 1)[0]
236
237     def ensure_vpp_instance(self):
238         """Create or reuse a closed client instance, return it.
239
240         The instance is initialized for unix domain socket access,
241         it has initialized all the bindings, it is removed from the internal
242         list of disconnected instances, but it is not connected
243         (to a local socket) yet.
244
245         :returns: VPP client instance ready for connect.
246         :rtype: vpp_papi.VPPApiClient
247         """
248         self.ensure_api_dirs()
249         cls = self.__class__
250         if cls.reusable_vpp_client_list:
251             # Reuse in LIFO fashion.
252             *cls.reusable_vpp_client_list, ret = cls.reusable_vpp_client_list
253             return ret
254         # Creating an instance leads to dynamic imports from VPP PAPI code,
255         # so the package directory has to be present until the instance.
256         # But it is simpler to keep the package dir around.
257         try:
258             sys.path.append(cls.api_package_path)
259             # TODO: Pylint says import-outside-toplevel and import-error.
260             # It is right, we should refactor the code and move initialization
261             # of package outside.
262             from vpp_papi.vpp_papi import VPPApiClient as vpp_class
263
264             vpp_class.apidir = cls.api_json_path
265             # We need to create instance before removing from sys.path.
266             vpp_instance = vpp_class(
267                 use_socket=True,
268                 server_address="TBD",
269                 async_thread=False,
270                 read_timeout=14,
271                 logger=FilteredLogger(logger, "INFO"),
272             )
273             # Cannot use loglevel parameter, robot.api.logger lacks support.
274             # TODO: Stop overriding read_timeout when VPP-1722 is fixed.
275         finally:
276             if sys.path[-1] == cls.api_package_path:
277                 sys.path.pop()
278         return vpp_instance
279
280     @classmethod
281     def key_for_node_and_socket(cls, node, remote_socket):
282         """Return a hashable object to distinguish nodes.
283
284         The usual node object (of "dict" type) is not hashable,
285         and can contain mutable information (mostly virtual interfaces).
286         Use this method to get an object suitable for being a key in dict.
287
288         The fields to include are chosen by what ssh needs.
289
290         This class method is needed, for disconnect.
291
292         :param node: The node object to distinguish.
293         :param remote_socket: Path to remote socket.
294         :type node: dict
295         :type remote_socket: str
296         :return: Tuple of values distinguishing this node from similar ones.
297         :rtype: tuple of str
298         """
299         return (
300             node["host"],
301             node["port"],
302             remote_socket,
303             # TODO: Do we support sockets paths such as "~/vpp/api.socket"?
304             # If yes, add also:
305             # node[u"username"],
306         )
307
308     def key_for_self(self):
309         """Return a hashable object to distinguish nodes.
310
311         Just a wrapper around key_for_node_and_socket
312         which sets up proper arguments.
313
314         :return: Tuple of values distinguishing this node from similar ones.
315         :rtype: tuple of str
316         """
317         return self.__class__.key_for_node_and_socket(
318             self._node,
319             self._remote_vpp_socket,
320         )
321
322     def set_connected_client(self, client):
323         """Add a connected client instance into cache.
324
325         This hides details of what the node key is.
326
327         If there already is a client for the computed key,
328         fail, as it is a sign of resource leakage.
329
330         :param client: VPP client instance in connected state.
331         :type client: vpp_papi.VPPApiClient
332         :raises RuntimeError: If related key already has a cached client.
333         """
334         key = self.key_for_self()
335         cache = self.__class__.conn_cache
336         if key in cache:
337             raise RuntimeError(f"Caching client with existing key: {key}")
338         cache[key] = client
339
340     def get_connected_client(self, check_connected=True):
341         """Return None or cached connected client.
342
343         If check_connected, RuntimeError is raised when the client is
344         not in cache. None is returned if client is not in cache
345         (and the check is disabled).
346
347         This hides details of what the node key is.
348
349         :param check_connected: Whether cache miss raises.
350         :type check_connected: bool
351         :returns: Connected client instance, or None if uncached and no check.
352         :rtype: Optional[vpp_papi.VPPApiClient]
353         :raises RuntimeError: If cache miss and check enabled.
354         """
355         key = self.key_for_self()
356         ret = self.__class__.conn_cache.get(key, None)
357
358         if ret is None:
359             if check_connected:
360                 raise RuntimeError(f"Client not cached for key: {key}")
361         else:
362             # When reading logs, it is good to see which VPP is accessed.
363             logger.debug(f"Activated cached PAPI client for key: {key}")
364         return ret
365
366     def __enter__(self):
367         """Create a tunnel, connect VPP instance.
368
369         If the connected client is in cache, return it.
370         Only if not, create a new (or reuse a disconnected) client instance.
371
372         Only at this point a local socket names are created
373         in a temporary directory, as CSIT can connect to multiple VPPs.
374
375         The following attributes are added to the client instance
376         to simplify caching and cleanup:
377         csit_temp_dir
378             - Temporary socket files are created here.
379         csit_control_socket
380             - This socket controls the local ssh process doing the forwarding.
381         csit_local_vpp_socket
382             - This is the forwarded socket to talk with remote VPP.
383
384         The attribute names do not start with underscore,
385         so pylint does not complain about accessing private attribute.
386         The attribute names start with csit_ to avoid naming conflicts
387         with "real" attributes from VPP Python code.
388
389         :returns: self
390         :rtype: PapiSocketExecutor
391         """
392         # Do we have the connected instance in the cache?
393         vpp_instance = self.get_connected_client(check_connected=False)
394         if vpp_instance is not None:
395             return self
396         # No luck, create and connect a new instance.
397         time_enter = time.time()
398         node = self._node
399         # Parsing takes longer than connecting, prepare instance before tunnel.
400         vpp_instance = self.ensure_vpp_instance()
401         # Store into cache as soon as possible.
402         # If connection fails, it is better to attempt disconnect anyway.
403         self.set_connected_client(vpp_instance)
404         # Set additional attributes.
405         vpp_instance.csit_temp_dir = tempfile.TemporaryDirectory(dir="/tmp")
406         temp_path = vpp_instance.csit_temp_dir.name
407         api_socket = temp_path + "/vpp-api.sock"
408         vpp_instance.csit_local_vpp_socket = api_socket
409         ssh_socket = temp_path + "/ssh.sock"
410         vpp_instance.csit_control_socket = ssh_socket
411         # Cleanup possibilities.
412         ret_code, _ = run(["ls", ssh_socket], check=False)
413         if ret_code != 2:
414             # This branch never seems to be hit in CI,
415             # but may be useful when testing manually.
416             run(
417                 ["ssh", "-S", ssh_socket, "-O", "exit", "0.0.0.0"],
418                 check=False,
419                 log=True,
420             )
421             # TODO: Is any sleep necessary? How to prove if not?
422             run(["sleep", "0.1"])
423             run(["rm", "-vrf", ssh_socket])
424         # Even if ssh can perhaps reuse this file,
425         # we need to remove it for readiness detection to work correctly.
426         run(["rm", "-rvf", api_socket])
427         # We use sleep command. The ssh command will exit in 30 second,
428         # unless a local socket connection is established,
429         # in which case the ssh command will exit only when
430         # the ssh connection is closed again (via control socket).
431         # The log level is to suppress "Warning: Permanently added" messages.
432         ssh_cmd = [
433             "ssh",
434             "-S",
435             ssh_socket,
436             "-M",
437             "-L",
438             api_socket + ":" + self._remote_vpp_socket,
439             "-p",
440             str(node["port"]),
441             "-o",
442             "LogLevel=ERROR",
443             "-o",
444             "UserKnownHostsFile=/dev/null",
445             "-o",
446             "StrictHostKeyChecking=no",
447             "-o",
448             "ExitOnForwardFailure=yes",
449             node["username"] + "@" + node["host"],
450             "sleep",
451             "30",
452         ]
453         priv_key = node.get("priv_key")
454         if priv_key:
455             # This is tricky. We need a file to pass the value to ssh command.
456             # And we need ssh command, because paramiko does not support sockets
457             # (neither ssh_socket, nor _remote_vpp_socket).
458             key_file = tempfile.NamedTemporaryFile()
459             key_file.write(priv_key)
460             # Make sure the content is written, but do not close yet.
461             key_file.flush()
462             ssh_cmd[1:1] = ["-i", key_file.name]
463         password = node.get("password")
464         if password:
465             # Prepend sshpass command to set password.
466             ssh_cmd[:0] = ["sshpass", "-p", password]
467         time_stop = time.time() + 10.0
468         # subprocess.Popen seems to be the best way to run commands
469         # on background. Other ways (shell=True with "&" and ssh with -f)
470         # seem to be too dependent on shell behavior.
471         # In particular, -f does NOT return values for run().
472         subprocess.Popen(ssh_cmd)
473         # Check socket presence on local side.
474         while time.time() < time_stop:
475             # It can take a moment for ssh to create the socket file.
476             ret_code, _ = run(["ls", "-l", api_socket], check=False)
477             if not ret_code:
478                 break
479             time.sleep(0.1)
480         else:
481             raise RuntimeError("Local side socket has not appeared.")
482         if priv_key:
483             # Socket up means the key has been read. Delete file by closing it.
484             key_file.close()
485         # Everything is ready, set the local socket address and connect.
486         vpp_instance.transport.server_address = api_socket
487         # It seems we can get read error even if every preceding check passed.
488         # Single retry seems to help.
489         for _ in range(2):
490             try:
491                 vpp_instance.connect_sync("csit_socket")
492             except (IOError, struct.error) as err:
493                 logger.warn(f"Got initial connect error {err!r}")
494                 vpp_instance.disconnect()
495             else:
496                 break
497         else:
498             raise RuntimeError("Failed to connect to VPP over a socket.")
499         logger.trace(
500             f"Establishing socket connection took {time.time()-time_enter}s"
501         )
502         return self
503
504     def __exit__(self, exc_type, exc_val, exc_tb):
505         """No-op, the client instance remains in cache in connected state."""
506
507     @classmethod
508     def disconnect_by_key(cls, key):
509         """Disconnect a connected client instance, noop it not connected.
510
511         Also remove the local sockets by deleting the temporary directory.
512         Put disconnected client instances to the reuse list.
513         The added attributes are not cleaned up,
514         as their values will get overwritten on next connect.
515
516         This method is useful for disconnect_all type of work.
517
518         :param key: Tuple identifying the node (and socket).
519         :type key: tuple of str
520         """
521         client_instance = cls.conn_cache.get(key, None)
522         if client_instance is None:
523             return
524         logger.debug(f"Disconnecting by key: {key}")
525         client_instance.disconnect()
526         run(
527             [
528                 "ssh",
529                 "-S",
530                 client_instance.csit_control_socket,
531                 "-O",
532                 "exit",
533                 "0.0.0.0",
534             ],
535             check=False,
536         )
537         # Temp dir has autoclean, but deleting explicitly
538         # as an error can happen.
539         try:
540             client_instance.csit_temp_dir.cleanup()
541         except FileNotFoundError:
542             # There is a race condition with ssh removing its ssh.sock file.
543             # Single retry should be enough to ensure the complete removal.
544             shutil.rmtree(client_instance.csit_temp_dir.name)
545         # Finally, put disconnected clients to reuse list.
546         cls.reusable_vpp_client_list.append(client_instance)
547         # Invalidate cache last. Repeated errors are better than silent leaks.
548         del cls.conn_cache[key]
549
550     @classmethod
551     def disconnect_by_node_and_socket(
552         cls, node, remote_socket=Constants.SOCKSVR_PATH
553     ):
554         """Disconnect a connected client instance, noop it not connected.
555
556         Also remove the local sockets by deleting the temporary directory.
557         Put disconnected client instances to the reuse list.
558         The added attributes are not cleaned up,
559         as their values will get overwritten on next connect.
560
561         Call this method just before killing/restarting remote VPP instance.
562         """
563         key = cls.key_for_node_and_socket(node, remote_socket)
564         return cls.disconnect_by_key(key)
565
566     @classmethod
567     def disconnect_all_sockets_by_node(cls, node):
568         """Disconnect all socket connected client instance.
569
570         Noop if not connected.
571
572         Also remove the local sockets by deleting the temporary directory.
573         Put disconnected client instances to the reuse list.
574         The added attributes are not cleaned up,
575         as their values will get overwritten on next connect.
576
577         Call this method just before killing/restarting remote VPP instance.
578         """
579         sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
580         if sockets:
581             for socket in sockets.values():
582                 # TODO: Remove sockets from topology.
583                 PapiSocketExecutor.disconnect_by_node_and_socket(node, socket)
584         # Always attempt to disconnect the default socket.
585         return cls.disconnect_by_node_and_socket(node)
586
587     @staticmethod
588     def disconnect_all_papi_connections():
589         """Disconnect all connected client instances, tear down the SSH tunnels.
590
591         Also remove the local sockets by deleting the temporary directory.
592         Put disconnected client instances to the reuse list.
593         The added attributes are not cleaned up,
594         as their values will get overwritten on next connect.
595
596         This should be a class method,
597         but we prefer to call static methods from Robot.
598
599         Call this method just before killing/restarting all VPP instances.
600         """
601         cls = PapiSocketExecutor
602         # Iterate over copy of entries so deletions do not mess with iterator.
603         keys_copy = list(cls.conn_cache.keys())
604         for key in keys_copy:
605             cls.disconnect_by_key(key)
606
607     def add(self, csit_papi_command, history=True, **kwargs):
608         """Add next command to internal command list; return self.
609
610         Unless disabled, new entry to papi history is also added at this point.
611         The argument name 'csit_papi_command' must be unique enough as it cannot
612         be repeated in kwargs.
613         The kwargs dict is deep-copied, so it is safe to use the original
614         with partial modifications for subsequent commands.
615
616         Any pending conflicts from .api.json processing are raised.
617         Then the command name is checked for known CRCs.
618         Unsupported commands raise an exception, as CSIT change
619         should not start using messages without making sure which CRCs
620         are supported.
621         Each CRC issue is raised only once, so subsequent tests
622         can raise other issues.
623
624         :param csit_papi_command: VPP API command.
625         :param history: Enable/disable adding command to PAPI command history.
626         :param kwargs: Optional key-value arguments.
627         :type csit_papi_command: str
628         :type history: bool
629         :type kwargs: dict
630         :returns: self, so that method chaining is possible.
631         :rtype: PapiSocketExecutor
632         :raises RuntimeError: If unverified or conflicting CRC is encountered.
633         """
634         self.crc_checker.report_initial_conflicts()
635         if history:
636             PapiHistory.add_to_papi_history(
637                 self._node, csit_papi_command, **kwargs
638             )
639         self.crc_checker.check_api_name(csit_papi_command)
640         self._api_command_list.append(
641             dict(api_name=csit_papi_command, api_args=copy.deepcopy(kwargs))
642         )
643         return self
644
645     def get_replies(self, err_msg="Failed to get replies."):
646         """Get replies from VPP Python API.
647
648         The replies are parsed into dict-like objects,
649         "retval" field is guaranteed to be zero on success.
650
651         :param err_msg: The message used if the PAPI command(s) execution fails.
652         :type err_msg: str
653         :returns: Responses, dict objects with fields due to API and "retval".
654         :rtype: list of dict
655         :raises RuntimeError: If retval is nonzero, parsing or ssh error.
656         """
657         return self._execute(err_msg=err_msg)
658
659     def get_reply(self, err_msg="Failed to get reply."):
660         """Get reply from VPP Python API.
661
662         The reply is parsed into dict-like object,
663         "retval" field is guaranteed to be zero on success.
664
665         TODO: Discuss exception types to raise, unify with inner methods.
666
667         :param err_msg: The message used if the PAPI command(s) execution fails.
668         :type err_msg: str
669         :returns: Response, dict object with fields due to API and "retval".
670         :rtype: dict
671         :raises AssertionError: If retval is nonzero, parsing or ssh error.
672         """
673         replies = self.get_replies(err_msg=err_msg)
674         if len(replies) != 1:
675             raise RuntimeError(f"Expected single reply, got {replies!r}")
676         return replies[0]
677
678     def get_sw_if_index(self, err_msg="Failed to get reply."):
679         """Get sw_if_index from reply from VPP Python API.
680
681         Frequently, the caller is only interested in sw_if_index field
682         of the reply, this wrapper makes such call sites shorter.
683
684         TODO: Discuss exception types to raise, unify with inner methods.
685
686         :param err_msg: The message used if the PAPI command(s) execution fails.
687         :type err_msg: str
688         :returns: Response, sw_if_index value of the reply.
689         :rtype: int
690         :raises AssertionError: If retval is nonzero, parsing or ssh error.
691         """
692         reply = self.get_reply(err_msg=err_msg)
693         logger.trace(f"Getting index from {reply!r}")
694         return reply["sw_if_index"]
695
696     def get_details(self, err_msg="Failed to get dump details."):
697         """Get dump details from VPP Python API.
698
699         The details are parsed into dict-like objects.
700         The number of details per single dump command can vary,
701         and all association between details and dumps is lost,
702         so if you care about the association (as opposed to
703         logging everything at once for debugging purposes),
704         it is recommended to call get_details for each dump (type) separately.
705
706         :param err_msg: The message used if the PAPI command(s) execution fails.
707         :type err_msg: str
708         :returns: Details, dict objects with fields due to API without "retval".
709         :rtype: list of dict
710         """
711         return self._execute(err_msg)
712
713     @staticmethod
714     def run_cli_cmd(
715         node, cli_cmd, log=True, remote_vpp_socket=Constants.SOCKSVR_PATH
716     ):
717         """Run a CLI command as cli_inband, return the "reply" field of reply.
718
719         Optionally, log the field value.
720
721         :param node: Node to run command on.
722         :param cli_cmd: The CLI command to be run on the node.
723         :param remote_vpp_socket: Path to remote socket to tunnel to.
724         :param log: If True, the response is logged.
725         :type node: dict
726         :type remote_vpp_socket: str
727         :type cli_cmd: str
728         :type log: bool
729         :returns: CLI output.
730         :rtype: str
731         """
732         cmd = "cli_inband"
733         args = dict(cmd=cli_cmd)
734         err_msg = (
735             f"Failed to run 'cli_inband {cli_cmd}' PAPI command"
736             f" on host {node['host']}"
737         )
738
739         with PapiSocketExecutor(node, remote_vpp_socket) as papi_exec:
740             reply = papi_exec.add(cmd, **args).get_reply(err_msg)["reply"]
741         if log:
742             logger.info(
743                 f"{cli_cmd} ({node['host']} - {remote_vpp_socket}):\n"
744                 f"{reply.strip()}"
745             )
746         return reply
747
748     @staticmethod
749     def run_cli_cmd_on_all_sockets(node, cli_cmd, log=True):
750         """Run a CLI command as cli_inband, on all sockets in topology file.
751
752         :param node: Node to run command on.
753         :param cli_cmd: The CLI command to be run on the node.
754         :param log: If True, the response is logged.
755         :type node: dict
756         :type cli_cmd: str
757         :type log: bool
758         """
759         sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
760         if sockets:
761             for socket in sockets.values():
762                 PapiSocketExecutor.run_cli_cmd(
763                     node, cli_cmd, log=log, remote_vpp_socket=socket
764                 )
765
766     @staticmethod
767     def dump_and_log(node, cmds):
768         """Dump and log requested information, return None.
769
770         :param node: DUT node.
771         :param cmds: Dump commands to be executed.
772         :type node: dict
773         :type cmds: list of str
774         """
775         with PapiSocketExecutor(node) as papi_exec:
776             for cmd in cmds:
777                 dump = papi_exec.add(cmd).get_details()
778                 logger.debug(f"{cmd}:\n{pformat(dump)}")
779
780     def _execute(self, err_msg="Undefined error message", exp_rv=0):
781         """Turn internal command list into data and execute; return replies.
782
783         This method also clears the internal command list.
784
785         IMPORTANT!
786         Do not use this method in L1 keywords. Use:
787         - get_replies()
788         - get_reply()
789         - get_sw_if_index()
790         - get_details()
791
792         :param err_msg: The message used if the PAPI command(s) execution fails.
793         :type err_msg: str
794         :returns: Papi responses parsed into a dict-like object,
795             with fields due to API (possibly including retval).
796         :rtype: list of dict
797         :raises RuntimeError: If the replies are not all correct.
798         """
799         vpp_instance = self.get_connected_client()
800         local_list = self._api_command_list
801         # Clear first as execution may fail.
802         self._api_command_list = list()
803         replies = list()
804         for command in local_list:
805             api_name = command["api_name"]
806             papi_fn = getattr(vpp_instance.api, api_name)
807             try:
808                 try:
809                     reply = papi_fn(**command["api_args"])
810                 except (IOError, struct.error) as err:
811                     # Occasionally an error happens, try reconnect.
812                     logger.warn(f"Reconnect after error: {err!r}")
813                     vpp_instance.disconnect()
814                     # Testing shows immediate reconnect fails.
815                     time.sleep(1)
816                     vpp_instance.connect_sync("csit_socket")
817                     logger.trace("Reconnected.")
818                     reply = papi_fn(**command["api_args"])
819             except (AttributeError, IOError, struct.error) as err:
820                 raise AssertionError(err_msg) from err
821             # *_dump commands return list of objects, convert, ordinary reply.
822             if not isinstance(reply, list):
823                 reply = [reply]
824             for item in reply:
825                 message_name = item.__class__.__name__
826                 self.crc_checker.check_api_name(message_name)
827                 dict_item = dictize(item)
828                 if "retval" in dict_item.keys():
829                     # *_details messages do not contain retval.
830                     retval = dict_item["retval"]
831                     if retval != exp_rv:
832                         raise AssertionError(
833                             f"Retval {retval!r} does not match expected"
834                             f" retval {exp_rv!r} in message {message_name}"
835                             f" for command {command}."
836                         )
837                 replies.append(dict_item)
838         return replies
839
840
841 class Disconnector:
842     """Class for holding a single keyword."""
843
844     @staticmethod
845     def disconnect_all_papi_connections():
846         """Disconnect all connected client instances, tear down the SSH tunnels.
847
848         Also remove the local sockets by deleting the temporary directory.
849         Put disconnected client instances to the reuse list.
850         The added attributes are not cleaned up,
851         as their values will get overwritten on next connect.
852
853         Call this method just before killing/restarting all VPP instances.
854
855         This could be a class method of PapiSocketExecutor.
856         But Robot calls methods on instances, and it would be weird
857         to give node argument for constructor in import.
858         Also, as we have a class of the same name as the module,
859         the keywords defined on module level are not accessible.
860         """
861         cls = PapiSocketExecutor
862         # Iterate over copy of entries so deletions do not mess with iterator.
863         keys_copy = list(cls.conn_cache.keys())
864         for key in keys_copy:
865             cls.disconnect_by_key(key)
866
867
868 class PapiExecutor:
869     """Contains methods for executing VPP Python API commands on DUTs.
870
871     TODO: Remove .add step, make get_stats accept paths directly.
872
873     This class processes only one type of VPP PAPI methods: vpp-stats.
874
875     The recommended ways of use are (examples):
876
877     path = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
878     with PapiExecutor(node) as papi_exec:
879         stats = papi_exec.add(api_name='vpp-stats', path=path).get_stats()
880
881     print('RX interface core 0, sw_if_index 0:\n{0}'.\
882         format(stats[0]['/if/rx'][0][0]))
883
884     or
885
886     path_1 = ['^/if', ]
887     path_2 = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
888     with PapiExecutor(node) as papi_exec:
889         stats = papi_exec.add('vpp-stats', path=path_1).\
890             add('vpp-stats', path=path_2).get_stats()
891
892     print('RX interface core 0, sw_if_index 0:\n{0}'.\
893         format(stats[1]['/if/rx'][0][0]))
894
895     Note: In this case, when PapiExecutor method 'add' is used:
896     - its parameter 'csit_papi_command' is used only to keep information
897       that vpp-stats are requested. It is not further processed but it is
898       included in the PAPI history this way:
899       vpp-stats(path=['^/if', '/err/ip4-input', '/sys/node/ip4-input'])
900       Always use csit_papi_command="vpp-stats" if the VPP PAPI method
901       is "stats".
902     - the second parameter must be 'path' as it is used by PapiExecutor
903       method 'add'.
904     - even if the parameter contains multiple paths, there is only one
905       reply item (for each .add).
906     """
907
908     def __init__(self, node):
909         """Initialization.
910
911         :param node: Node to run command(s) on.
912         :type node: dict
913         """
914         # Node to run command(s) on.
915         self._node = node
916
917         # The list of PAPI commands to be executed on the node.
918         self._api_command_list = list()
919
920         self._ssh = SSH()
921
922     def __enter__(self):
923         try:
924             self._ssh.connect(self._node)
925         except IOError:
926             raise RuntimeError(
927                 f"Cannot open SSH connection to host {self._node['host']}"
928                 f" to execute PAPI command(s)"
929             )
930         return self
931
932     def __exit__(self, exc_type, exc_val, exc_tb):
933         self._ssh.disconnect(self._node)
934
935     def add(self, csit_papi_command="vpp-stats", history=True, **kwargs):
936         """Add next command to internal command list; return self.
937
938         The argument name 'csit_papi_command' must be unique enough as it cannot
939         be repeated in kwargs.
940         The kwargs dict is deep-copied, so it is safe to use the original
941         with partial modifications for subsequent commands.
942
943         :param csit_papi_command: VPP API command.
944         :param history: Enable/disable adding command to PAPI command history.
945         :param kwargs: Optional key-value arguments.
946         :type csit_papi_command: str
947         :type history: bool
948         :type kwargs: dict
949         :returns: self, so that method chaining is possible.
950         :rtype: PapiExecutor
951         """
952         if history:
953             PapiHistory.add_to_papi_history(
954                 self._node, csit_papi_command, **kwargs
955             )
956         self._api_command_list.append(
957             dict(api_name=csit_papi_command, api_args=copy.deepcopy(kwargs))
958         )
959         return self
960
961     def get_stats(
962         self,
963         err_msg="Failed to get statistics.",
964         timeout=120,
965         socket=Constants.SOCKSTAT_PATH,
966     ):
967         """Get VPP Stats from VPP Python API.
968
969         :param err_msg: The message used if the PAPI command(s) execution fails.
970         :param timeout: Timeout in seconds.
971         :param socket: Path to Stats socket to tunnel to.
972         :type err_msg: str
973         :type timeout: int
974         :type socket: str
975         :returns: Requested VPP statistics.
976         :rtype: list of dict
977         """
978         paths = [cmd["api_args"]["path"] for cmd in self._api_command_list]
979         self._api_command_list = list()
980
981         stdout = self._execute_papi(
982             paths,
983             method="stats",
984             err_msg=err_msg,
985             timeout=timeout,
986             socket=socket,
987         )
988
989         return json.loads(stdout)
990
991     @staticmethod
992     def _process_api_data(api_d):
993         """Process API data for smooth converting to JSON string.
994
995         Apply binascii.hexlify() method for string values.
996
997         :param api_d: List of APIs with their arguments.
998         :type api_d: list
999         :returns: List of APIs with arguments pre-processed for JSON.
1000         :rtype: list
1001         """
1002
1003         def process_value(val):
1004             """Process value.
1005
1006             :param val: Value to be processed.
1007             :type val: object
1008             :returns: Processed value.
1009             :rtype: dict or str or int
1010             """
1011             if isinstance(val, dict):
1012                 for val_k, val_v in val.items():
1013                     val[str(val_k)] = process_value(val_v)
1014                 retval = val
1015             elif isinstance(val, list):
1016                 for idx, val_l in enumerate(val):
1017                     val[idx] = process_value(val_l)
1018                 retval = val
1019             else:
1020                 retval = val.encode().hex() if isinstance(val, str) else val
1021             return retval
1022
1023         api_data_processed = list()
1024         for api in api_d:
1025             api_args_processed = dict()
1026             for a_k, a_v in api["api_args"].items():
1027                 api_args_processed[str(a_k)] = process_value(a_v)
1028             api_data_processed.append(
1029                 dict(api_name=api["api_name"], api_args=api_args_processed)
1030             )
1031         return api_data_processed
1032
1033     def _execute_papi(
1034         self, api_data, method="request", err_msg="", timeout=120, socket=None
1035     ):
1036         """Execute PAPI command(s) on remote node and store the result.
1037
1038         :param api_data: List of APIs with their arguments.
1039         :param method: VPP Python API method. Supported methods are: 'request',
1040             'dump' and 'stats'.
1041         :param err_msg: The message used if the PAPI command(s) execution fails.
1042         :param timeout: Timeout in seconds.
1043         :type api_data: list
1044         :type method: str
1045         :type err_msg: str
1046         :type timeout: int
1047         :returns: Stdout from remote python utility, to be parsed by caller.
1048         :rtype: str
1049         :raises SSHTimeout: If PAPI command(s) execution has timed out.
1050         :raises RuntimeError: If PAPI executor failed due to another reason.
1051         :raises AssertionError: If PAPI command(s) execution has failed.
1052         """
1053         if not api_data:
1054             raise RuntimeError("No API data provided.")
1055
1056         json_data = (
1057             json.dumps(api_data)
1058             if method in ("stats", "stats_request")
1059             else json.dumps(self._process_api_data(api_data))
1060         )
1061
1062         sock = f" --socket {socket}" if socket else ""
1063         cmd = (
1064             f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_PAPI_PROVIDER}"
1065             f" --method {method} --data '{json_data}'{sock}"
1066         )
1067         try:
1068             ret_code, stdout, _ = self._ssh.exec_command_sudo(
1069                 cmd=cmd, timeout=timeout, log_stdout_err=False
1070             )
1071         # TODO: Fail on non-empty stderr?
1072         except SSHTimeout:
1073             logger.error(
1074                 f"PAPI command(s) execution timeout on host"
1075                 f" {self._node['host']}:\n{api_data}"
1076             )
1077             raise
1078         except Exception as exc:
1079             raise RuntimeError(
1080                 f"PAPI command(s) execution on host {self._node['host']}"
1081                 f" failed: {api_data}"
1082             ) from exc
1083         if ret_code != 0:
1084             raise AssertionError(err_msg)
1085
1086         return stdout