6b2168052680d6aad636370735ade7d26a13ea03
[csit.git] / resources / libraries / python / PapiExecutor.py
1 # Copyright (c) 2020 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Python API executor library.
15 """
16
17 import copy
18 import glob
19 import json
20 import shutil
21 import struct  # vpp-papi can raise struct.error
22 import subprocess
23 import sys
24 import tempfile
25 import time
26
27 from pprint import pformat
28 from robot.api import logger
29
30 from resources.libraries.python.Constants import Constants
31 from resources.libraries.python.LocalExecution import run
32 from resources.libraries.python.FilteredLogger import FilteredLogger
33 from resources.libraries.python.PapiHistory import PapiHistory
34 from resources.libraries.python.ssh import (
35     SSH, SSHTimeout, exec_cmd_no_error, scp_node)
36 from resources.libraries.python.topology import Topology, SocketType
37 from resources.libraries.python.VppApiCrc import VppApiCrcChecker
38
39
40 __all__ = [
41     u"PapiExecutor",
42     u"PapiSocketExecutor",
43     u"Disconnector",
44 ]
45
46
47 def dictize(obj):
48     """A helper method, to make namedtuple-like object accessible as dict.
49
50     If the object is namedtuple-like, its _asdict() form is returned,
51     but in the returned object __getitem__ method is wrapped
52     to dictize also any items returned.
53     If the object does not have _asdict, it will be returned without any change.
54     Integer keys still access the object as tuple.
55
56     A more useful version would be to keep obj mostly as a namedtuple,
57     just add getitem for string keys. Unfortunately, namedtuple inherits
58     from tuple, including its read-only __getitem__ attribute,
59     so we cannot monkey-patch it.
60
61     TODO: Create a proxy for named tuple to allow that.
62
63     :param obj: Arbitrary object to dictize.
64     :type obj: object
65     :returns: Dictized object.
66     :rtype: same as obj type or collections.OrderedDict
67     """
68     if not hasattr(obj, u"_asdict"):
69         return obj
70     ret = obj._asdict()
71     old_get = ret.__getitem__
72     new_get = lambda self, key: dictize(old_get(self, key))
73     ret.__getitem__ = new_get
74     return ret
75
76
77 class PapiSocketExecutor:
78     """Methods for executing VPP Python API commands on forwarded socket.
79
80     Previously, we used an implementation with single client instance
81     and connection being handled by a resource manager.
82     On "with" statement, the instance connected, and disconnected
83     on exit from the "with" block.
84     This was limiting (no nested with blocks) and mainly it was slow:
85     0.7 seconds per disconnect cycle on Skylake, more than 3 second on Taishan.
86
87     The currently used implementation caches the connected client instances,
88     providing speedup and making "with" blocks unnecessary.
89     But with many call sites, "with" blocks are still the main usage pattern.
90     Documentation still lists that as the intended pattern.
91
92     As a downside, clients need to be explicitly told to disconnect
93     before VPP restart.
94     There is some amount of retries and disconnects on disconnect
95     (so unresponsive VPPs do not breach test much more than needed),
96     but it is hard to verify all that works correctly.
97     Especially, if Robot crashes, files and ssh processes may leak.
98
99     Delay for accepting socket connection is 10s.
100     TODO: Decrease 10s to value that is long enough for creating connection
101     and short enough to not affect performance.
102
103     The current implementation downloads and parses .api.json files only once
104     and caches client instances for reuse.
105     Cleanup metadata is added as additional attributes
106     directly to client instances.
107
108     The current implementation seems to run into read error occasionally.
109     Not sure if the error is in Python code on Robot side, ssh forwarding,
110     or socket handling at VPP side. Anyway, reconnect after some sleep
111     seems to help, hoping repeated command execution does not lead to surprises.
112     The reconnection is logged at WARN level, so it is prominently shown
113     in log.html, so we can see how frequently it happens.
114
115     TODO: Support handling of retval!=0 without try/except in caller.
116
117     Note: Use only with "with" statement, e.g.:
118
119         cmd = 'show_version'
120         with PapiSocketExecutor(node) as papi_exec:
121             reply = papi_exec.add(cmd).get_reply(err_msg)
122
123     This class processes two classes of VPP PAPI methods:
124     1. Simple request / reply: method='request'.
125     2. Dump functions: method='dump'.
126
127     Note that access to VPP stats over socket is not supported yet.
128
129     The recommended ways of use are (examples):
130
131     1. Simple request / reply
132
133     a. One request with no arguments:
134
135         cmd = 'show_version'
136         with PapiSocketExecutor(node) as papi_exec:
137             reply = papi_exec.add(cmd).get_reply(err_msg)
138
139     b. Three requests with arguments, the second and the third ones are the same
140        but with different arguments.
141
142         with PapiSocketExecutor(node) as papi_exec:
143             replies = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
144                 add(cmd2, **args3).get_replies(err_msg)
145
146     2. Dump functions
147
148         cmd = 'sw_interface_rx_placement_dump'
149         with PapiSocketExecutor(node) as papi_exec:
150             details = papi_exec.add(cmd, sw_if_index=ifc['vpp_sw_index']).\
151                 get_details(err_msg)
152     """
153
154     # Class cache for reuse between instances.
155     api_root_dir = None
156     """We copy .api json files and PAPI code from DUT to robot machine.
157     This class variable holds temporary directory once created.
158     When python exits, the directory is deleted, so no downloaded file leaks.
159     The value will be set to TemporaryDirectory class instance (not string path)
160     to ensure deletion at exit."""
161     api_json_path = None
162     """String path to .api.json files, a directory somewhere in api_root_dir."""
163     api_package_path = None
164     """String path to PAPI code, a different directory under api_root_dir."""
165     crc_checker = None
166     """Accesses .api.json files at creation, caching speeds up accessing it."""
167     reusable_vpp_client_list = list()
168     """Each connection needs a separate client instance,
169     and each client instance creation needs to parse all .api files,
170     which takes time. If a client instance disconnects, it is put here,
171     so on next connect we can reuse intead of creating new."""
172     conn_cache = dict()
173     """Mapping from node key to connected client instance."""
174
175     def __init__(self, node, remote_vpp_socket=Constants.SOCKSVR_PATH):
176         """Store the given arguments, declare managed variables.
177
178         :param node: Node to connect to and forward unix domain socket from.
179         :param remote_vpp_socket: Path to remote socket to tunnel to.
180         :type node: dict
181         :type remote_vpp_socket: str
182         """
183         self._node = node
184         self._remote_vpp_socket = remote_vpp_socket
185         # The list of PAPI commands to be executed on the node.
186         self._api_command_list = list()
187
188     def ensure_api_dirs(self):
189         """Copy files from DUT to local temporary directory.
190
191         If the directory is still there, do not copy again.
192         If copying, also initialize CRC checker (this also performs
193         static checks), and remember PAPI package path.
194         Do not add that to PATH yet.
195         """
196         cls = self.__class__
197         if cls.api_package_path:
198             return
199         cls.api_root_dir = tempfile.TemporaryDirectory(dir=u"/tmp")
200         root_path = cls.api_root_dir.name
201         # Pack, copy and unpack Python part of VPP installation from _node.
202         # TODO: Use rsync or recursive version of ssh.scp_node instead?
203         node = self._node
204         exec_cmd_no_error(node, [u"rm", u"-rf", u"/tmp/papi.txz"])
205         # Papi python version depends on OS (and time).
206         # Python 2.7 or 3.4, site-packages or dist-packages.
207         installed_papi_glob = u"/usr/lib/python3*/*-packages/vpp_papi"
208         # We need to wrap this command in bash, in order to expand globs,
209         # and as ssh does join, the inner command has to be quoted.
210         inner_cmd = u" ".join([
211             u"tar", u"cJf", u"/tmp/papi.txz", u"--exclude=*.pyc",
212             installed_papi_glob, u"/usr/share/vpp/api"
213         ])
214         exec_cmd_no_error(node, [u"bash", u"-c", u"'" + inner_cmd + u"'"])
215         scp_node(node, root_path + u"/papi.txz", u"/tmp/papi.txz", get=True)
216         run([u"tar", u"xf", root_path + u"/papi.txz", u"-C", root_path])
217         cls.api_json_path = root_path + u"/usr/share/vpp/api"
218         # Perform initial checks before .api.json files are gone,
219         # by creating the checker instance.
220         cls.crc_checker = VppApiCrcChecker(cls.api_json_path)
221         # When present locally, we finally can find the installation path.
222         cls.api_package_path = glob.glob(root_path + installed_papi_glob)[0]
223         # Package path has to be one level above the vpp_papi directory.
224         cls.api_package_path = cls.api_package_path.rsplit(u"/", 1)[0]
225
226     def ensure_vpp_instance(self):
227         """Create or reuse a closed client instance, return it.
228
229         The instance is initialized for unix domain socket access,
230         it has initialized all the bindings, it is removed from the internal
231         list of disconnected instances, but it is not connected
232         (to a local socket) yet.
233
234         :returns: VPP client instance ready for connect.
235         :rtype: vpp_papi.VPPApiClient
236         """
237         self.ensure_api_dirs()
238         cls = self.__class__
239         if cls.reusable_vpp_client_list:
240             # Reuse in LIFO fashion.
241             *cls.reusable_vpp_client_list, ret = cls.reusable_vpp_client_list
242             return ret
243         # Creating an instance leads to dynamic imports from VPP PAPI code,
244         # so the package directory has to be present until the instance.
245         # But it is simpler to keep the package dir around.
246         try:
247             sys.path.append(cls.api_package_path)
248             # TODO: Pylint says import-outside-toplevel and import-error.
249             # It is right, we should refactor the code and move initialization
250             # of package outside.
251             from vpp_papi.vpp_papi import VPPApiClient as vpp_class
252             vpp_class.apidir = cls.api_json_path
253             # We need to create instance before removing from sys.path.
254             vpp_instance = vpp_class(
255                 use_socket=True, server_address=u"TBD", async_thread=False,
256                 read_timeout=14, logger=FilteredLogger(logger, u"INFO")
257             )
258             # Cannot use loglevel parameter, robot.api.logger lacks support.
259             # TODO: Stop overriding read_timeout when VPP-1722 is fixed.
260         finally:
261             if sys.path[-1] == cls.api_package_path:
262                 sys.path.pop()
263         return vpp_instance
264
265     @classmethod
266     def key_for_node_and_socket(cls, node, remote_socket):
267         """Return a hashable object to distinguish nodes.
268
269         The usual node object (of "dict" type) is not hashable,
270         and can contain mutable information (mostly virtual interfaces).
271         Use this method to get an object suitable for being a key in dict.
272
273         The fields to include are chosen by what ssh needs.
274
275         This class method is needed, for disconnect.
276
277         :param node: The node object to distinguish.
278         :param remote_socket: Path to remote socket.
279         :type node: dict
280         :type remote_socket: str
281         :return: Tuple of values distinguishing this node from similar ones.
282         :rtype: tuple of str
283         """
284         return (
285             node[u"host"],
286             node[u"port"],
287             remote_socket,
288             # TODO: Do we support sockets paths such as "~/vpp/api.socket"?
289             # If yes, add also:
290             # node[u"username"],
291         )
292
293     def key_for_self(self):
294         """Return a hashable object to distinguish nodes.
295
296         Just a wrapper around key_for_node_and_socket
297         which sets up proper arguments.
298
299         :return: Tuple of values distinguishing this node from similar ones.
300         :rtype: tuple of str
301         """
302         return self.__class__.key_for_node_and_socket(
303             self._node, self._remote_vpp_socket,
304         )
305
306     def set_connected_client(self, client):
307         """Add a connected client instance into cache.
308
309         This hides details of what the node key is.
310
311         If there already is a client for the computed key,
312         fail, as it is a sign of resource leakage.
313
314         :param client: VPP client instance in connected state.
315         :type client: vpp_papi.VPPApiClient
316         :raises RuntimeError: If related key already has a cached client.
317         """
318         key = self.key_for_self()
319         cache = self.__class__.conn_cache
320         if key in cache:
321             raise RuntimeError(f"Caching client with existing key: {key}")
322         cache[key] = client
323
324     def get_connected_client(self, check_connected=True):
325         """Return None or cached connected client.
326
327         If check_connected, RuntimeError is raised when the client is
328         not in cache. None is returned if client is not in cache
329         (and the check is disabled).
330
331         This hides details of what the node key is.
332
333         :param check_connected: Whether cache miss raises.
334         :type check_connected: bool
335         :returns: Connected client instance, or None if uncached and no check.
336         :rtype: Optional[vpp_papi.VPPApiClient]
337         :raises RuntimeError: If cache miss and check enabled.
338         """
339         key = self.key_for_self()
340         ret = self.__class__.conn_cache.get(key, None)
341
342         if ret is None:
343             if check_connected:
344                 raise RuntimeError(f"Client not cached for key: {key}")
345         else:
346             # When reading logs, it is good to see which VPP is accessed.
347             logger.debug(f"Activated cached PAPI client for key: {key}")
348         return ret
349
350     def __enter__(self):
351         """Create a tunnel, connect VPP instance.
352
353         If the connected client is in cache, return it.
354         Only if not, create a new (or reuse a disconnected) client instance.
355
356         Only at this point a local socket names are created
357         in a temporary directory, as CSIT can connect to multiple VPPs.
358
359         The following attributes are added to the client instance
360         to simplify caching and cleanup:
361         csit_temp_dir
362             - Temporary socket files are created here.
363         csit_control_socket
364             - This socket controls the local ssh process doing the forwarding.
365         csit_local_vpp_socket
366             - This is the forwarded socket to talk with remote VPP.
367
368         The attribute names do not start with underscore,
369         so pylint does not complain about accessing private attribute.
370         The attribute names start with csit_ to avoid naming conflicts
371         with "real" attributes from VPP Python code.
372
373         :returns: self
374         :rtype: PapiSocketExecutor
375         """
376         # Do we have the connected instance in the cache?
377         vpp_instance = self.get_connected_client(check_connected=False)
378         if vpp_instance is not None:
379             return self
380         # No luck, create and connect a new instance.
381         time_enter = time.time()
382         node = self._node
383         # Parsing takes longer than connecting, prepare instance before tunnel.
384         vpp_instance = self.ensure_vpp_instance()
385         # Store into cache as soon as possible.
386         # If connection fails, it is better to attempt disconnect anyway.
387         self.set_connected_client(vpp_instance)
388         # Set additional attributes.
389         vpp_instance.csit_temp_dir = tempfile.TemporaryDirectory(dir=u"/tmp")
390         temp_path = vpp_instance.csit_temp_dir.name
391         api_socket = temp_path + u"/vpp-api.sock"
392         vpp_instance.csit_local_vpp_socket = api_socket
393         ssh_socket = temp_path + u"/ssh.sock"
394         vpp_instance.csit_control_socket = ssh_socket
395         # Cleanup possibilities.
396         ret_code, _ = run([u"ls", ssh_socket], check=False)
397         if ret_code != 2:
398             # This branch never seems to be hit in CI,
399             # but may be useful when testing manually.
400             run(
401                 [u"ssh", u"-S", ssh_socket, u"-O", u"exit", u"0.0.0.0"],
402                 check=False, log=True
403             )
404             # TODO: Is any sleep necessary? How to prove if not?
405             run([u"sleep", u"0.1"])
406             run([u"rm", u"-vrf", ssh_socket])
407         # Even if ssh can perhaps reuse this file,
408         # we need to remove it for readiness detection to work correctly.
409         run([u"rm", u"-rvf", api_socket])
410         # We use sleep command. The ssh command will exit in 30 second,
411         # unless a local socket connection is established,
412         # in which case the ssh command will exit only when
413         # the ssh connection is closed again (via control socket).
414         # The log level is to suppress "Warning: Permanently added" messages.
415         ssh_cmd = [
416             u"ssh", u"-S", ssh_socket, u"-M", u"-L",
417             api_socket + u":" + self._remote_vpp_socket,
418             u"-p", str(node[u"port"]),
419             u"-o", u"LogLevel=ERROR",
420             u"-o", u"UserKnownHostsFile=/dev/null",
421             u"-o", u"StrictHostKeyChecking=no",
422             u"-o", u"ExitOnForwardFailure=yes",
423             node[u"username"] + u"@" + node[u"host"],
424             u"sleep", u"30"
425         ]
426         priv_key = node.get(u"priv_key")
427         if priv_key:
428             # This is tricky. We need a file to pass the value to ssh command.
429             # And we need ssh command, because paramiko does not support sockets
430             # (neither ssh_socket, nor _remote_vpp_socket).
431             key_file = tempfile.NamedTemporaryFile()
432             key_file.write(priv_key)
433             # Make sure the content is written, but do not close yet.
434             key_file.flush()
435             ssh_cmd[1:1] = [u"-i", key_file.name]
436         password = node.get(u"password")
437         if password:
438             # Prepend sshpass command to set password.
439             ssh_cmd[:0] = [u"sshpass", u"-p", password]
440         time_stop = time.time() + 10.0
441         # subprocess.Popen seems to be the best way to run commands
442         # on background. Other ways (shell=True with "&" and ssh with -f)
443         # seem to be too dependent on shell behavior.
444         # In particular, -f does NOT return values for run().
445         subprocess.Popen(ssh_cmd)
446         # Check socket presence on local side.
447         while time.time() < time_stop:
448             # It can take a moment for ssh to create the socket file.
449             ret_code, _ = run(
450                 [u"ls", u"-l", api_socket], check=False
451             )
452             if not ret_code:
453                 break
454             time.sleep(0.1)
455         else:
456             raise RuntimeError(u"Local side socket has not appeared.")
457         if priv_key:
458             # Socket up means the key has been read. Delete file by closing it.
459             key_file.close()
460         # Everything is ready, set the local socket address and connect.
461         vpp_instance.transport.server_address = api_socket
462         # It seems we can get read error even if every preceding check passed.
463         # Single retry seems to help.
464         for _ in range(2):
465             try:
466                 vpp_instance.connect_sync(u"csit_socket")
467             except (IOError, struct.error) as err:
468                 logger.warn(f"Got initial connect error {err!r}")
469                 vpp_instance.disconnect()
470             else:
471                 break
472         else:
473             raise RuntimeError(u"Failed to connect to VPP over a socket.")
474         logger.trace(
475             f"Establishing socket connection took {time.time()-time_enter}s"
476         )
477         return self
478
479     def __exit__(self, exc_type, exc_val, exc_tb):
480         """No-op, the client instance remains in cache in connected state."""
481
482     @classmethod
483     def disconnect_by_key(cls, key):
484         """Disconnect a connected client instance, noop it not connected.
485
486         Also remove the local sockets by deleting the temporary directory.
487         Put disconnected client instances to the reuse list.
488         The added attributes are not cleaned up,
489         as their values will get overwritten on next connect.
490
491         This method is useful for disconnect_all type of work.
492
493         :param key: Tuple identifying the node (and socket).
494         :type key: tuple of str
495         """
496         client_instance = cls.conn_cache.get(key, None)
497         if client_instance is None:
498             return
499         logger.debug(f"Disconnecting by key: {key}")
500         client_instance.disconnect()
501         run([
502             u"ssh", u"-S", client_instance.csit_control_socket, u"-O",
503             u"exit", u"0.0.0.0"
504         ], check=False)
505         # Temp dir has autoclean, but deleting explicitly
506         # as an error can happen.
507         try:
508             client_instance.csit_temp_dir.cleanup()
509         except FileNotFoundError:
510             # There is a race condition with ssh removing its ssh.sock file.
511             # Single retry should be enough to ensure the complete removal.
512             shutil.rmtree(client_instance.csit_temp_dir.name)
513         # Finally, put disconnected clients to reuse list.
514         cls.reusable_vpp_client_list.append(client_instance)
515         # Invalidate cache last. Repeated errors are better than silent leaks.
516         del cls.conn_cache[key]
517
518     @classmethod
519     def disconnect_by_node_and_socket(
520             cls, node, remote_socket=Constants.SOCKSVR_PATH
521         ):
522         """Disconnect a connected client instance, noop it not connected.
523
524         Also remove the local sockets by deleting the temporary directory.
525         Put disconnected client instances to the reuse list.
526         The added attributes are not cleaned up,
527         as their values will get overwritten on next connect.
528
529         Call this method just before killing/restarting remote VPP instance.
530         """
531         key = cls.key_for_node_and_socket(node, remote_socket)
532         return cls.disconnect_by_key(key)
533
534     @classmethod
535     def disconnect_all_sockets_by_node(cls, node):
536         """Disconnect all socket connected client instance.
537
538         Noop if not connected.
539
540         Also remove the local sockets by deleting the temporary directory.
541         Put disconnected client instances to the reuse list.
542         The added attributes are not cleaned up,
543         as their values will get overwritten on next connect.
544
545         Call this method just before killing/restarting remote VPP instance.
546         """
547         sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
548         if sockets:
549             for socket in sockets.values():
550                 # TODO: Remove sockets from topology.
551                 PapiSocketExecutor.disconnect_by_node_and_socket(node, socket)
552         # Always attempt to disconnect the default socket.
553         return cls.disconnect_by_node_and_socket(node)
554
555     @staticmethod
556     def disconnect_all_papi_connections():
557         """Disconnect all connected client instances, tear down the SSH tunnels.
558
559         Also remove the local sockets by deleting the temporary directory.
560         Put disconnected client instances to the reuse list.
561         The added attributes are not cleaned up,
562         as their values will get overwritten on next connect.
563
564         This should be a class method,
565         but we prefer to call static methods from Robot.
566
567         Call this method just before killing/restarting all VPP instances.
568         """
569         cls = PapiSocketExecutor
570         # Iterate over copy of entries so deletions do not mess with iterator.
571         keys_copy = list(cls.conn_cache.keys())
572         for key in keys_copy:
573             cls.disconnect_by_key(key)
574
575     def add(self, csit_papi_command, history=True, **kwargs):
576         """Add next command to internal command list; return self.
577
578         Unless disabled, new entry to papi history is also added at this point.
579         The argument name 'csit_papi_command' must be unique enough as it cannot
580         be repeated in kwargs.
581         The kwargs dict is deep-copied, so it is safe to use the original
582         with partial modifications for subsequent commands.
583
584         Any pending conflicts from .api.json processing are raised.
585         Then the command name is checked for known CRCs.
586         Unsupported commands raise an exception, as CSIT change
587         should not start using messages without making sure which CRCs
588         are supported.
589         Each CRC issue is raised only once, so subsequent tests
590         can raise other issues.
591
592         :param csit_papi_command: VPP API command.
593         :param history: Enable/disable adding command to PAPI command history.
594         :param kwargs: Optional key-value arguments.
595         :type csit_papi_command: str
596         :type history: bool
597         :type kwargs: dict
598         :returns: self, so that method chaining is possible.
599         :rtype: PapiSocketExecutor
600         :raises RuntimeError: If unverified or conflicting CRC is encountered.
601         """
602         self.crc_checker.report_initial_conflicts()
603         if history:
604             PapiHistory.add_to_papi_history(
605                 self._node, csit_papi_command, **kwargs
606             )
607         self.crc_checker.check_api_name(csit_papi_command)
608         self._api_command_list.append(
609             dict(
610                 api_name=csit_papi_command,
611                 api_args=copy.deepcopy(kwargs)
612             )
613         )
614         return self
615
616     def get_replies(self, err_msg="Failed to get replies."):
617         """Get replies from VPP Python API.
618
619         The replies are parsed into dict-like objects,
620         "retval" field is guaranteed to be zero on success.
621
622         :param err_msg: The message used if the PAPI command(s) execution fails.
623         :type err_msg: str
624         :returns: Responses, dict objects with fields due to API and "retval".
625         :rtype: list of dict
626         :raises RuntimeError: If retval is nonzero, parsing or ssh error.
627         """
628         return self._execute(err_msg=err_msg)
629
630     def get_reply(self, err_msg=u"Failed to get reply."):
631         """Get reply from VPP Python API.
632
633         The reply is parsed into dict-like object,
634         "retval" field is guaranteed to be zero on success.
635
636         TODO: Discuss exception types to raise, unify with inner methods.
637
638         :param err_msg: The message used if the PAPI command(s) execution fails.
639         :type err_msg: str
640         :returns: Response, dict object with fields due to API and "retval".
641         :rtype: dict
642         :raises AssertionError: If retval is nonzero, parsing or ssh error.
643         """
644         replies = self.get_replies(err_msg=err_msg)
645         if len(replies) != 1:
646             raise RuntimeError(f"Expected single reply, got {replies!r}")
647         return replies[0]
648
649     def get_sw_if_index(self, err_msg=u"Failed to get reply."):
650         """Get sw_if_index from reply from VPP Python API.
651
652         Frequently, the caller is only interested in sw_if_index field
653         of the reply, this wrapper makes such call sites shorter.
654
655         TODO: Discuss exception types to raise, unify with inner methods.
656
657         :param err_msg: The message used if the PAPI command(s) execution fails.
658         :type err_msg: str
659         :returns: Response, sw_if_index value of the reply.
660         :rtype: int
661         :raises AssertionError: If retval is nonzero, parsing or ssh error.
662         """
663         reply = self.get_reply(err_msg=err_msg)
664         logger.trace(f"Getting index from {reply!r}")
665         return reply[u"sw_if_index"]
666
667     def get_details(self, err_msg="Failed to get dump details."):
668         """Get dump details from VPP Python API.
669
670         The details are parsed into dict-like objects.
671         The number of details per single dump command can vary,
672         and all association between details and dumps is lost,
673         so if you care about the association (as opposed to
674         logging everything at once for debugging purposes),
675         it is recommended to call get_details for each dump (type) separately.
676
677         :param err_msg: The message used if the PAPI command(s) execution fails.
678         :type err_msg: str
679         :returns: Details, dict objects with fields due to API without "retval".
680         :rtype: list of dict
681         """
682         return self._execute(err_msg)
683
684     @staticmethod
685     def run_cli_cmd(
686             node, cli_cmd, log=True, remote_vpp_socket=Constants.SOCKSVR_PATH):
687         """Run a CLI command as cli_inband, return the "reply" field of reply.
688
689         Optionally, log the field value.
690
691         :param node: Node to run command on.
692         :param cli_cmd: The CLI command to be run on the node.
693         :param remote_vpp_socket: Path to remote socket to tunnel to.
694         :param log: If True, the response is logged.
695         :type node: dict
696         :type remote_vpp_socket: str
697         :type cli_cmd: str
698         :type log: bool
699         :returns: CLI output.
700         :rtype: str
701         """
702         cmd = u"cli_inband"
703         args = dict(
704             cmd=cli_cmd
705         )
706         err_msg = f"Failed to run 'cli_inband {cli_cmd}' PAPI command " \
707             f"on host {node[u'host']}"
708
709         with PapiSocketExecutor(node, remote_vpp_socket) as papi_exec:
710             reply = papi_exec.add(cmd, **args).get_reply(err_msg)["reply"]
711         if log:
712             logger.info(
713                 f"{cli_cmd} ({node[u'host']} - {remote_vpp_socket}):\n"
714                 f"{reply.strip()}"
715             )
716         return reply
717
718     @staticmethod
719     def run_cli_cmd_on_all_sockets(node, cli_cmd, log=True):
720         """Run a CLI command as cli_inband, on all sockets in topology file.
721
722         :param node: Node to run command on.
723         :param cli_cmd: The CLI command to be run on the node.
724         :param log: If True, the response is logged.
725         :type node: dict
726         :type cli_cmd: str
727         :type log: bool
728         """
729         sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
730         if sockets:
731             for socket in sockets.values():
732                 PapiSocketExecutor.run_cli_cmd(
733                     node, cli_cmd, log=log, remote_vpp_socket=socket
734                 )
735
736     @staticmethod
737     def dump_and_log(node, cmds):
738         """Dump and log requested information, return None.
739
740         :param node: DUT node.
741         :param cmds: Dump commands to be executed.
742         :type node: dict
743         :type cmds: list of str
744         """
745         with PapiSocketExecutor(node) as papi_exec:
746             for cmd in cmds:
747                 dump = papi_exec.add(cmd).get_details()
748                 logger.debug(f"{cmd}:\n{pformat(dump)}")
749
750     def _execute(self, err_msg=u"Undefined error message", exp_rv=0):
751         """Turn internal command list into data and execute; return replies.
752
753         This method also clears the internal command list.
754
755         IMPORTANT!
756         Do not use this method in L1 keywords. Use:
757         - get_replies()
758         - get_reply()
759         - get_sw_if_index()
760         - get_details()
761
762         :param err_msg: The message used if the PAPI command(s) execution fails.
763         :type err_msg: str
764         :returns: Papi responses parsed into a dict-like object,
765             with fields due to API (possibly including retval).
766         :rtype: list of dict
767         :raises RuntimeError: If the replies are not all correct.
768         """
769         vpp_instance = self.get_connected_client()
770         local_list = self._api_command_list
771         # Clear first as execution may fail.
772         self._api_command_list = list()
773         replies = list()
774         for command in local_list:
775             api_name = command[u"api_name"]
776             papi_fn = getattr(vpp_instance.api, api_name)
777             try:
778                 try:
779                     reply = papi_fn(**command[u"api_args"])
780                 except (IOError, struct.error) as err:
781                     # Occasionally an error happens, try reconnect.
782                     logger.warn(f"Reconnect after error: {err!r}")
783                     vpp_instance.disconnect()
784                     # Testing shows immediate reconnect fails.
785                     time.sleep(1)
786                     vpp_instance.connect_sync(u"csit_socket")
787                     logger.trace(u"Reconnected.")
788                     reply = papi_fn(**command[u"api_args"])
789             except (AttributeError, IOError, struct.error) as err:
790                 raise AssertionError(err_msg) from err
791             # *_dump commands return list of objects, convert, ordinary reply.
792             if not isinstance(reply, list):
793                 reply = [reply]
794             for item in reply:
795                 self.crc_checker.check_api_name(item.__class__.__name__)
796                 dict_item = dictize(item)
797                 if u"retval" in dict_item.keys():
798                     # *_details messages do not contain retval.
799                     retval = dict_item[u"retval"]
800                     if retval != exp_rv:
801                         # TODO: What exactly to log and raise here?
802                         raise AssertionError(
803                             f"Retval {retval!r} does not match expected "
804                             f"retval {exp_rv!r}"
805                         )
806                 replies.append(dict_item)
807         return replies
808
809
810 class Disconnector:
811     """Class for holding a single keyword."""
812
813     @staticmethod
814     def disconnect_all_papi_connections():
815         """Disconnect all connected client instances, tear down the SSH tunnels.
816
817         Also remove the local sockets by deleting the temporary directory.
818         Put disconnected client instances to the reuse list.
819         The added attributes are not cleaned up,
820         as their values will get overwritten on next connect.
821
822         Call this method just before killing/restarting all VPP instances.
823
824         This could be a class method of PapiSocketExecutor.
825         But Robot calls methods on instances, and it would be weird
826         to give node argument for constructor in import.
827         Also, as we have a class of the same name as the module,
828         the keywords defined on module level are not accessible.
829         """
830         cls = PapiSocketExecutor
831         # Iterate over copy of entries so deletions do not mess with iterator.
832         keys_copy = list(cls.conn_cache.keys())
833         for key in keys_copy:
834             cls.disconnect_by_key(key)
835
836
837 class PapiExecutor:
838     """Contains methods for executing VPP Python API commands on DUTs.
839
840     TODO: Remove .add step, make get_stats accept paths directly.
841
842     This class processes only one type of VPP PAPI methods: vpp-stats.
843
844     The recommended ways of use are (examples):
845
846     path = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
847     with PapiExecutor(node) as papi_exec:
848         stats = papi_exec.add(api_name='vpp-stats', path=path).get_stats()
849
850     print('RX interface core 0, sw_if_index 0:\n{0}'.\
851         format(stats[0]['/if/rx'][0][0]))
852
853     or
854
855     path_1 = ['^/if', ]
856     path_2 = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
857     with PapiExecutor(node) as papi_exec:
858         stats = papi_exec.add('vpp-stats', path=path_1).\
859             add('vpp-stats', path=path_2).get_stats()
860
861     print('RX interface core 0, sw_if_index 0:\n{0}'.\
862         format(stats[1]['/if/rx'][0][0]))
863
864     Note: In this case, when PapiExecutor method 'add' is used:
865     - its parameter 'csit_papi_command' is used only to keep information
866       that vpp-stats are requested. It is not further processed but it is
867       included in the PAPI history this way:
868       vpp-stats(path=['^/if', '/err/ip4-input', '/sys/node/ip4-input'])
869       Always use csit_papi_command="vpp-stats" if the VPP PAPI method
870       is "stats".
871     - the second parameter must be 'path' as it is used by PapiExecutor
872       method 'add'.
873     """
874
875     def __init__(self, node):
876         """Initialization.
877
878         :param node: Node to run command(s) on.
879         :type node: dict
880         """
881         # Node to run command(s) on.
882         self._node = node
883
884         # The list of PAPI commands to be executed on the node.
885         self._api_command_list = list()
886
887         self._ssh = SSH()
888
889     def __enter__(self):
890         try:
891             self._ssh.connect(self._node)
892         except IOError:
893             raise RuntimeError(
894                 f"Cannot open SSH connection to host {self._node[u'host']} "
895                 f"to execute PAPI command(s)"
896             )
897         return self
898
899     def __exit__(self, exc_type, exc_val, exc_tb):
900         self._ssh.disconnect(self._node)
901
902     def add(self, csit_papi_command=u"vpp-stats", history=True, **kwargs):
903         """Add next command to internal command list; return self.
904
905         The argument name 'csit_papi_command' must be unique enough as it cannot
906         be repeated in kwargs.
907         The kwargs dict is deep-copied, so it is safe to use the original
908         with partial modifications for subsequent commands.
909
910         :param csit_papi_command: VPP API command.
911         :param history: Enable/disable adding command to PAPI command history.
912         :param kwargs: Optional key-value arguments.
913         :type csit_papi_command: str
914         :type history: bool
915         :type kwargs: dict
916         :returns: self, so that method chaining is possible.
917         :rtype: PapiExecutor
918         """
919         if history:
920             PapiHistory.add_to_papi_history(
921                 self._node, csit_papi_command, **kwargs
922             )
923         self._api_command_list.append(
924             dict(
925                 api_name=csit_papi_command, api_args=copy.deepcopy(kwargs)
926             )
927         )
928         return self
929
930     def get_stats(
931             self, err_msg=u"Failed to get statistics.", timeout=120,
932             socket=Constants.SOCKSTAT_PATH):
933         """Get VPP Stats from VPP Python API.
934
935         :param err_msg: The message used if the PAPI command(s) execution fails.
936         :param timeout: Timeout in seconds.
937         :param socket: Path to Stats socket to tunnel to.
938         :type err_msg: str
939         :type timeout: int
940         :type socket: str
941         :returns: Requested VPP statistics.
942         :rtype: list of dict
943         """
944         paths = [cmd[u"api_args"][u"path"] for cmd in self._api_command_list]
945         self._api_command_list = list()
946
947         stdout = self._execute_papi(
948             paths, method=u"stats", err_msg=err_msg, timeout=timeout,
949             socket=socket
950         )
951
952         return json.loads(stdout)
953
954     @staticmethod
955     def _process_api_data(api_d):
956         """Process API data for smooth converting to JSON string.
957
958         Apply binascii.hexlify() method for string values.
959
960         :param api_d: List of APIs with their arguments.
961         :type api_d: list
962         :returns: List of APIs with arguments pre-processed for JSON.
963         :rtype: list
964         """
965
966         def process_value(val):
967             """Process value.
968
969             :param val: Value to be processed.
970             :type val: object
971             :returns: Processed value.
972             :rtype: dict or str or int
973             """
974             if isinstance(val, dict):
975                 for val_k, val_v in val.items():
976                     val[str(val_k)] = process_value(val_v)
977                 retval = val
978             elif isinstance(val, list):
979                 for idx, val_l in enumerate(val):
980                     val[idx] = process_value(val_l)
981                 retval = val
982             else:
983                 retval = val.encode().hex() if isinstance(val, str) else val
984             return retval
985
986         api_data_processed = list()
987         for api in api_d:
988             api_args_processed = dict()
989             for a_k, a_v in api[u"api_args"].items():
990                 api_args_processed[str(a_k)] = process_value(a_v)
991             api_data_processed.append(
992                 dict(
993                     api_name=api[u"api_name"],
994                     api_args=api_args_processed
995                 )
996             )
997         return api_data_processed
998
999     def _execute_papi(
1000             self, api_data, method=u"request", err_msg=u"", timeout=120,
1001             socket=None):
1002         """Execute PAPI command(s) on remote node and store the result.
1003
1004         :param api_data: List of APIs with their arguments.
1005         :param method: VPP Python API method. Supported methods are: 'request',
1006             'dump' and 'stats'.
1007         :param err_msg: The message used if the PAPI command(s) execution fails.
1008         :param timeout: Timeout in seconds.
1009         :type api_data: list
1010         :type method: str
1011         :type err_msg: str
1012         :type timeout: int
1013         :returns: Stdout from remote python utility, to be parsed by caller.
1014         :rtype: str
1015         :raises SSHTimeout: If PAPI command(s) execution has timed out.
1016         :raises RuntimeError: If PAPI executor failed due to another reason.
1017         :raises AssertionError: If PAPI command(s) execution has failed.
1018         """
1019         if not api_data:
1020             raise RuntimeError(u"No API data provided.")
1021
1022         json_data = json.dumps(api_data) \
1023             if method in (u"stats", u"stats_request") \
1024             else json.dumps(self._process_api_data(api_data))
1025
1026         sock = f" --socket {socket}" if socket else u""
1027         cmd = f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_PAPI_PROVIDER}" \
1028             f" --method {method} --data '{json_data}'{sock}"
1029         try:
1030             ret_code, stdout, _ = self._ssh.exec_command_sudo(
1031                 cmd=cmd, timeout=timeout, log_stdout_err=False
1032             )
1033         # TODO: Fail on non-empty stderr?
1034         except SSHTimeout:
1035             logger.error(
1036                 f"PAPI command(s) execution timeout on host "
1037                 f"{self._node[u'host']}:\n{api_data}"
1038             )
1039             raise
1040         except Exception as exc:
1041             raise RuntimeError(
1042                 f"PAPI command(s) execution on host {self._node[u'host']} "
1043                 f"failed: {api_data}"
1044             ) from exc
1045         if ret_code != 0:
1046             raise AssertionError(err_msg)
1047
1048         return stdout