perf: add TCP Nginx+LDPRELOAD suites
[csit.git] / resources / libraries / python / PapiExecutor.py
1 # Copyright (c) 2021 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Python API executor library.
15 """
16
17 import copy
18 import glob
19 import json
20 import shutil
21 import struct  # vpp-papi can raise struct.error
22 import subprocess
23 import sys
24 import tempfile
25 import time
26 from collections import UserDict
27
28
29 from pprint import pformat
30 from robot.api import logger
31
32 from resources.libraries.python.Constants import Constants
33 from resources.libraries.python.LocalExecution import run
34 from resources.libraries.python.FilteredLogger import FilteredLogger
35 from resources.libraries.python.PapiHistory import PapiHistory
36 from resources.libraries.python.ssh import (
37     SSH, SSHTimeout, exec_cmd_no_error, scp_node)
38 from resources.libraries.python.topology import Topology, SocketType
39 from resources.libraries.python.VppApiCrc import VppApiCrcChecker
40
41
42 __all__ = [
43     u"PapiExecutor",
44     u"PapiSocketExecutor",
45     u"Disconnector",
46 ]
47
48
49 def dictize(obj):
50     """A helper method, to make namedtuple-like object accessible as dict.
51
52     If the object is namedtuple-like, its _asdict() form is returned,
53     but in the returned object __getitem__ method is wrapped
54     to dictize also any items returned.
55     If the object does not have _asdict, it will be returned without any change.
56     Integer keys still access the object as tuple.
57
58     A more useful version would be to keep obj mostly as a namedtuple,
59     just add getitem for string keys. Unfortunately, namedtuple inherits
60     from tuple, including its read-only __getitem__ attribute,
61     so we cannot monkey-patch it.
62
63     TODO: Create a proxy for named tuple to allow that.
64
65     :param obj: Arbitrary object to dictize.
66     :type obj: object
67     :returns: Dictized object.
68     :rtype: same as obj type or collections.OrderedDict
69     """
70     if not hasattr(obj, u"_asdict"):
71         return obj
72     overriden = UserDict(obj._asdict())
73     old_get = overriden.__getitem__
74     new_get = lambda self, key: dictize(old_get(self, key))
75     overriden.__getitem__ = new_get
76     return overriden
77
78
79 class PapiSocketExecutor:
80     """Methods for executing VPP Python API commands on forwarded socket.
81
82     Previously, we used an implementation with single client instance
83     and connection being handled by a resource manager.
84     On "with" statement, the instance connected, and disconnected
85     on exit from the "with" block.
86     This was limiting (no nested with blocks) and mainly it was slow:
87     0.7 seconds per disconnect cycle on Skylake, more than 3 second on Taishan.
88
89     The currently used implementation caches the connected client instances,
90     providing speedup and making "with" blocks unnecessary.
91     But with many call sites, "with" blocks are still the main usage pattern.
92     Documentation still lists that as the intended pattern.
93
94     As a downside, clients need to be explicitly told to disconnect
95     before VPP restart.
96     There is some amount of retries and disconnects on disconnect
97     (so unresponsive VPPs do not breach test much more than needed),
98     but it is hard to verify all that works correctly.
99     Especially, if Robot crashes, files and ssh processes may leak.
100
101     Delay for accepting socket connection is 10s.
102     TODO: Decrease 10s to value that is long enough for creating connection
103     and short enough to not affect performance.
104
105     The current implementation downloads and parses .api.json files only once
106     and caches client instances for reuse.
107     Cleanup metadata is added as additional attributes
108     directly to client instances.
109
110     The current implementation seems to run into read error occasionally.
111     Not sure if the error is in Python code on Robot side, ssh forwarding,
112     or socket handling at VPP side. Anyway, reconnect after some sleep
113     seems to help, hoping repeated command execution does not lead to surprises.
114     The reconnection is logged at WARN level, so it is prominently shown
115     in log.html, so we can see how frequently it happens.
116
117     TODO: Support handling of retval!=0 without try/except in caller.
118
119     Note: Use only with "with" statement, e.g.:
120
121         cmd = 'show_version'
122         with PapiSocketExecutor(node) as papi_exec:
123             reply = papi_exec.add(cmd).get_reply(err_msg)
124
125     This class processes two classes of VPP PAPI methods:
126     1. Simple request / reply: method='request'.
127     2. Dump functions: method='dump'.
128
129     Note that access to VPP stats over socket is not supported yet.
130
131     The recommended ways of use are (examples):
132
133     1. Simple request / reply
134
135     a. One request with no arguments:
136
137         cmd = 'show_version'
138         with PapiSocketExecutor(node) as papi_exec:
139             reply = papi_exec.add(cmd).get_reply(err_msg)
140
141     b. Three requests with arguments, the second and the third ones are the same
142        but with different arguments.
143
144         with PapiSocketExecutor(node) as papi_exec:
145             replies = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
146                 add(cmd2, **args3).get_replies(err_msg)
147
148     2. Dump functions
149
150         cmd = 'sw_interface_rx_placement_dump'
151         with PapiSocketExecutor(node) as papi_exec:
152             details = papi_exec.add(cmd, sw_if_index=ifc['vpp_sw_index']).\
153                 get_details(err_msg)
154     """
155
156     # Class cache for reuse between instances.
157     api_root_dir = None
158     """We copy .api json files and PAPI code from DUT to robot machine.
159     This class variable holds temporary directory once created.
160     When python exits, the directory is deleted, so no downloaded file leaks.
161     The value will be set to TemporaryDirectory class instance (not string path)
162     to ensure deletion at exit."""
163     api_json_path = None
164     """String path to .api.json files, a directory somewhere in api_root_dir."""
165     api_package_path = None
166     """String path to PAPI code, a different directory under api_root_dir."""
167     crc_checker = None
168     """Accesses .api.json files at creation, caching speeds up accessing it."""
169     reusable_vpp_client_list = list()
170     """Each connection needs a separate client instance,
171     and each client instance creation needs to parse all .api files,
172     which takes time. If a client instance disconnects, it is put here,
173     so on next connect we can reuse intead of creating new."""
174     conn_cache = dict()
175     """Mapping from node key to connected client instance."""
176
177     def __init__(self, node, remote_vpp_socket=Constants.SOCKSVR_PATH):
178         """Store the given arguments, declare managed variables.
179
180         :param node: Node to connect to and forward unix domain socket from.
181         :param remote_vpp_socket: Path to remote socket to tunnel to.
182         :type node: dict
183         :type remote_vpp_socket: str
184         """
185         self._node = node
186         self._remote_vpp_socket = remote_vpp_socket
187         # The list of PAPI commands to be executed on the node.
188         self._api_command_list = list()
189
190     def ensure_api_dirs(self):
191         """Copy files from DUT to local temporary directory.
192
193         If the directory is still there, do not copy again.
194         If copying, also initialize CRC checker (this also performs
195         static checks), and remember PAPI package path.
196         Do not add that to PATH yet.
197         """
198         cls = self.__class__
199         if cls.api_package_path:
200             return
201         cls.api_root_dir = tempfile.TemporaryDirectory(dir=u"/tmp")
202         root_path = cls.api_root_dir.name
203         # Pack, copy and unpack Python part of VPP installation from _node.
204         # TODO: Use rsync or recursive version of ssh.scp_node instead?
205         node = self._node
206         exec_cmd_no_error(node, [u"rm", u"-rf", u"/tmp/papi.txz"])
207         # Papi python version depends on OS (and time).
208         # Python 2.7 or 3.4, site-packages or dist-packages.
209         installed_papi_glob = u"/usr/lib/python3*/*-packages/vpp_papi"
210         # We need to wrap this command in bash, in order to expand globs,
211         # and as ssh does join, the inner command has to be quoted.
212         inner_cmd = u" ".join([
213             u"tar", u"cJf", u"/tmp/papi.txz", u"--exclude=*.pyc",
214             installed_papi_glob, u"/usr/share/vpp/api"
215         ])
216         exec_cmd_no_error(node, [u"bash", u"-c", u"'" + inner_cmd + u"'"])
217         scp_node(node, root_path + u"/papi.txz", u"/tmp/papi.txz", get=True)
218         run([u"tar", u"xf", root_path + u"/papi.txz", u"-C", root_path])
219         cls.api_json_path = root_path + u"/usr/share/vpp/api"
220         # Perform initial checks before .api.json files are gone,
221         # by creating the checker instance.
222         cls.crc_checker = VppApiCrcChecker(cls.api_json_path)
223         # When present locally, we finally can find the installation path.
224         cls.api_package_path = glob.glob(root_path + installed_papi_glob)[0]
225         # Package path has to be one level above the vpp_papi directory.
226         cls.api_package_path = cls.api_package_path.rsplit(u"/", 1)[0]
227
228     def ensure_vpp_instance(self):
229         """Create or reuse a closed client instance, return it.
230
231         The instance is initialized for unix domain socket access,
232         it has initialized all the bindings, it is removed from the internal
233         list of disconnected instances, but it is not connected
234         (to a local socket) yet.
235
236         :returns: VPP client instance ready for connect.
237         :rtype: vpp_papi.VPPApiClient
238         """
239         self.ensure_api_dirs()
240         cls = self.__class__
241         if cls.reusable_vpp_client_list:
242             # Reuse in LIFO fashion.
243             *cls.reusable_vpp_client_list, ret = cls.reusable_vpp_client_list
244             return ret
245         # Creating an instance leads to dynamic imports from VPP PAPI code,
246         # so the package directory has to be present until the instance.
247         # But it is simpler to keep the package dir around.
248         try:
249             sys.path.append(cls.api_package_path)
250             # TODO: Pylint says import-outside-toplevel and import-error.
251             # It is right, we should refactor the code and move initialization
252             # of package outside.
253             from vpp_papi.vpp_papi import VPPApiClient as vpp_class
254             vpp_class.apidir = cls.api_json_path
255             # We need to create instance before removing from sys.path.
256             vpp_instance = vpp_class(
257                 use_socket=True, server_address=u"TBD", async_thread=False,
258                 read_timeout=14, logger=FilteredLogger(logger, u"INFO")
259             )
260             # Cannot use loglevel parameter, robot.api.logger lacks support.
261             # TODO: Stop overriding read_timeout when VPP-1722 is fixed.
262         finally:
263             if sys.path[-1] == cls.api_package_path:
264                 sys.path.pop()
265         return vpp_instance
266
267     @classmethod
268     def key_for_node_and_socket(cls, node, remote_socket):
269         """Return a hashable object to distinguish nodes.
270
271         The usual node object (of "dict" type) is not hashable,
272         and can contain mutable information (mostly virtual interfaces).
273         Use this method to get an object suitable for being a key in dict.
274
275         The fields to include are chosen by what ssh needs.
276
277         This class method is needed, for disconnect.
278
279         :param node: The node object to distinguish.
280         :param remote_socket: Path to remote socket.
281         :type node: dict
282         :type remote_socket: str
283         :return: Tuple of values distinguishing this node from similar ones.
284         :rtype: tuple of str
285         """
286         return (
287             node[u"host"],
288             node[u"port"],
289             remote_socket,
290             # TODO: Do we support sockets paths such as "~/vpp/api.socket"?
291             # If yes, add also:
292             # node[u"username"],
293         )
294
295     def key_for_self(self):
296         """Return a hashable object to distinguish nodes.
297
298         Just a wrapper around key_for_node_and_socket
299         which sets up proper arguments.
300
301         :return: Tuple of values distinguishing this node from similar ones.
302         :rtype: tuple of str
303         """
304         return self.__class__.key_for_node_and_socket(
305             self._node, self._remote_vpp_socket,
306         )
307
308     def set_connected_client(self, client):
309         """Add a connected client instance into cache.
310
311         This hides details of what the node key is.
312
313         If there already is a client for the computed key,
314         fail, as it is a sign of resource leakage.
315
316         :param client: VPP client instance in connected state.
317         :type client: vpp_papi.VPPApiClient
318         :raises RuntimeError: If related key already has a cached client.
319         """
320         key = self.key_for_self()
321         cache = self.__class__.conn_cache
322         if key in cache:
323             raise RuntimeError(f"Caching client with existing key: {key}")
324         cache[key] = client
325
326     def get_connected_client(self, check_connected=True):
327         """Return None or cached connected client.
328
329         If check_connected, RuntimeError is raised when the client is
330         not in cache. None is returned if client is not in cache
331         (and the check is disabled).
332
333         This hides details of what the node key is.
334
335         :param check_connected: Whether cache miss raises.
336         :type check_connected: bool
337         :returns: Connected client instance, or None if uncached and no check.
338         :rtype: Optional[vpp_papi.VPPApiClient]
339         :raises RuntimeError: If cache miss and check enabled.
340         """
341         key = self.key_for_self()
342         ret = self.__class__.conn_cache.get(key, None)
343
344         if ret is None:
345             if check_connected:
346                 raise RuntimeError(f"Client not cached for key: {key}")
347         else:
348             # When reading logs, it is good to see which VPP is accessed.
349             logger.debug(f"Activated cached PAPI client for key: {key}")
350         return ret
351
352     def __enter__(self):
353         """Create a tunnel, connect VPP instance.
354
355         If the connected client is in cache, return it.
356         Only if not, create a new (or reuse a disconnected) client instance.
357
358         Only at this point a local socket names are created
359         in a temporary directory, as CSIT can connect to multiple VPPs.
360
361         The following attributes are added to the client instance
362         to simplify caching and cleanup:
363         csit_temp_dir
364             - Temporary socket files are created here.
365         csit_control_socket
366             - This socket controls the local ssh process doing the forwarding.
367         csit_local_vpp_socket
368             - This is the forwarded socket to talk with remote VPP.
369
370         The attribute names do not start with underscore,
371         so pylint does not complain about accessing private attribute.
372         The attribute names start with csit_ to avoid naming conflicts
373         with "real" attributes from VPP Python code.
374
375         :returns: self
376         :rtype: PapiSocketExecutor
377         """
378         # Do we have the connected instance in the cache?
379         vpp_instance = self.get_connected_client(check_connected=False)
380         if vpp_instance is not None:
381             return self
382         # No luck, create and connect a new instance.
383         time_enter = time.time()
384         node = self._node
385         # Parsing takes longer than connecting, prepare instance before tunnel.
386         vpp_instance = self.ensure_vpp_instance()
387         # Store into cache as soon as possible.
388         # If connection fails, it is better to attempt disconnect anyway.
389         self.set_connected_client(vpp_instance)
390         # Set additional attributes.
391         vpp_instance.csit_temp_dir = tempfile.TemporaryDirectory(dir=u"/tmp")
392         temp_path = vpp_instance.csit_temp_dir.name
393         api_socket = temp_path + u"/vpp-api.sock"
394         vpp_instance.csit_local_vpp_socket = api_socket
395         ssh_socket = temp_path + u"/ssh.sock"
396         vpp_instance.csit_control_socket = ssh_socket
397         # Cleanup possibilities.
398         ret_code, _ = run([u"ls", ssh_socket], check=False)
399         if ret_code != 2:
400             # This branch never seems to be hit in CI,
401             # but may be useful when testing manually.
402             run(
403                 [u"ssh", u"-S", ssh_socket, u"-O", u"exit", u"0.0.0.0"],
404                 check=False, log=True
405             )
406             # TODO: Is any sleep necessary? How to prove if not?
407             run([u"sleep", u"0.1"])
408             run([u"rm", u"-vrf", ssh_socket])
409         # Even if ssh can perhaps reuse this file,
410         # we need to remove it for readiness detection to work correctly.
411         run([u"rm", u"-rvf", api_socket])
412         # We use sleep command. The ssh command will exit in 30 second,
413         # unless a local socket connection is established,
414         # in which case the ssh command will exit only when
415         # the ssh connection is closed again (via control socket).
416         # The log level is to suppress "Warning: Permanently added" messages.
417         ssh_cmd = [
418             u"ssh", u"-S", ssh_socket, u"-M", u"-L",
419             api_socket + u":" + self._remote_vpp_socket,
420             u"-p", str(node[u"port"]),
421             u"-o", u"LogLevel=ERROR",
422             u"-o", u"UserKnownHostsFile=/dev/null",
423             u"-o", u"StrictHostKeyChecking=no",
424             u"-o", u"ExitOnForwardFailure=yes",
425             node[u"username"] + u"@" + node[u"host"],
426             u"sleep", u"30"
427         ]
428         priv_key = node.get(u"priv_key")
429         if priv_key:
430             # This is tricky. We need a file to pass the value to ssh command.
431             # And we need ssh command, because paramiko does not support sockets
432             # (neither ssh_socket, nor _remote_vpp_socket).
433             key_file = tempfile.NamedTemporaryFile()
434             key_file.write(priv_key)
435             # Make sure the content is written, but do not close yet.
436             key_file.flush()
437             ssh_cmd[1:1] = [u"-i", key_file.name]
438         password = node.get(u"password")
439         if password:
440             # Prepend sshpass command to set password.
441             ssh_cmd[:0] = [u"sshpass", u"-p", password]
442         time_stop = time.time() + 10.0
443         # subprocess.Popen seems to be the best way to run commands
444         # on background. Other ways (shell=True with "&" and ssh with -f)
445         # seem to be too dependent on shell behavior.
446         # In particular, -f does NOT return values for run().
447         subprocess.Popen(ssh_cmd)
448         # Check socket presence on local side.
449         while time.time() < time_stop:
450             # It can take a moment for ssh to create the socket file.
451             ret_code, _ = run(
452                 [u"ls", u"-l", api_socket], check=False
453             )
454             if not ret_code:
455                 break
456             time.sleep(0.1)
457         else:
458             raise RuntimeError(u"Local side socket has not appeared.")
459         if priv_key:
460             # Socket up means the key has been read. Delete file by closing it.
461             key_file.close()
462         # Everything is ready, set the local socket address and connect.
463         vpp_instance.transport.server_address = api_socket
464         # It seems we can get read error even if every preceding check passed.
465         # Single retry seems to help.
466         for _ in range(2):
467             try:
468                 vpp_instance.connect_sync(u"csit_socket")
469             except (IOError, struct.error) as err:
470                 logger.warn(f"Got initial connect error {err!r}")
471                 vpp_instance.disconnect()
472             else:
473                 break
474         else:
475             raise RuntimeError(u"Failed to connect to VPP over a socket.")
476         logger.trace(
477             f"Establishing socket connection took {time.time()-time_enter}s"
478         )
479         return self
480
481     def __exit__(self, exc_type, exc_val, exc_tb):
482         """No-op, the client instance remains in cache in connected state."""
483
484     @classmethod
485     def disconnect_by_key(cls, key):
486         """Disconnect a connected client instance, noop it not connected.
487
488         Also remove the local sockets by deleting the temporary directory.
489         Put disconnected client instances to the reuse list.
490         The added attributes are not cleaned up,
491         as their values will get overwritten on next connect.
492
493         This method is useful for disconnect_all type of work.
494
495         :param key: Tuple identifying the node (and socket).
496         :type key: tuple of str
497         """
498         client_instance = cls.conn_cache.get(key, None)
499         if client_instance is None:
500             return
501         logger.debug(f"Disconnecting by key: {key}")
502         client_instance.disconnect()
503         run([
504             u"ssh", u"-S", client_instance.csit_control_socket, u"-O",
505             u"exit", u"0.0.0.0"
506         ], check=False)
507         # Temp dir has autoclean, but deleting explicitly
508         # as an error can happen.
509         try:
510             client_instance.csit_temp_dir.cleanup()
511         except FileNotFoundError:
512             # There is a race condition with ssh removing its ssh.sock file.
513             # Single retry should be enough to ensure the complete removal.
514             shutil.rmtree(client_instance.csit_temp_dir.name)
515         # Finally, put disconnected clients to reuse list.
516         cls.reusable_vpp_client_list.append(client_instance)
517         # Invalidate cache last. Repeated errors are better than silent leaks.
518         del cls.conn_cache[key]
519
520     @classmethod
521     def disconnect_by_node_and_socket(
522             cls, node, remote_socket=Constants.SOCKSVR_PATH
523         ):
524         """Disconnect a connected client instance, noop it not connected.
525
526         Also remove the local sockets by deleting the temporary directory.
527         Put disconnected client instances to the reuse list.
528         The added attributes are not cleaned up,
529         as their values will get overwritten on next connect.
530
531         Call this method just before killing/restarting remote VPP instance.
532         """
533         key = cls.key_for_node_and_socket(node, remote_socket)
534         return cls.disconnect_by_key(key)
535
536     @classmethod
537     def disconnect_all_sockets_by_node(cls, node):
538         """Disconnect all socket connected client instance.
539
540         Noop if not connected.
541
542         Also remove the local sockets by deleting the temporary directory.
543         Put disconnected client instances to the reuse list.
544         The added attributes are not cleaned up,
545         as their values will get overwritten on next connect.
546
547         Call this method just before killing/restarting remote VPP instance.
548         """
549         sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
550         if sockets:
551             for socket in sockets.values():
552                 # TODO: Remove sockets from topology.
553                 PapiSocketExecutor.disconnect_by_node_and_socket(node, socket)
554         # Always attempt to disconnect the default socket.
555         return cls.disconnect_by_node_and_socket(node)
556
557     @staticmethod
558     def disconnect_all_papi_connections():
559         """Disconnect all connected client instances, tear down the SSH tunnels.
560
561         Also remove the local sockets by deleting the temporary directory.
562         Put disconnected client instances to the reuse list.
563         The added attributes are not cleaned up,
564         as their values will get overwritten on next connect.
565
566         This should be a class method,
567         but we prefer to call static methods from Robot.
568
569         Call this method just before killing/restarting all VPP instances.
570         """
571         cls = PapiSocketExecutor
572         # Iterate over copy of entries so deletions do not mess with iterator.
573         keys_copy = list(cls.conn_cache.keys())
574         for key in keys_copy:
575             cls.disconnect_by_key(key)
576
577     def add(self, csit_papi_command, history=True, **kwargs):
578         """Add next command to internal command list; return self.
579
580         Unless disabled, new entry to papi history is also added at this point.
581         The argument name 'csit_papi_command' must be unique enough as it cannot
582         be repeated in kwargs.
583         The kwargs dict is deep-copied, so it is safe to use the original
584         with partial modifications for subsequent commands.
585
586         Any pending conflicts from .api.json processing are raised.
587         Then the command name is checked for known CRCs.
588         Unsupported commands raise an exception, as CSIT change
589         should not start using messages without making sure which CRCs
590         are supported.
591         Each CRC issue is raised only once, so subsequent tests
592         can raise other issues.
593
594         :param csit_papi_command: VPP API command.
595         :param history: Enable/disable adding command to PAPI command history.
596         :param kwargs: Optional key-value arguments.
597         :type csit_papi_command: str
598         :type history: bool
599         :type kwargs: dict
600         :returns: self, so that method chaining is possible.
601         :rtype: PapiSocketExecutor
602         :raises RuntimeError: If unverified or conflicting CRC is encountered.
603         """
604         self.crc_checker.report_initial_conflicts()
605         if history:
606             PapiHistory.add_to_papi_history(
607                 self._node, csit_papi_command, **kwargs
608             )
609         self.crc_checker.check_api_name(csit_papi_command)
610         self._api_command_list.append(
611             dict(
612                 api_name=csit_papi_command,
613                 api_args=copy.deepcopy(kwargs)
614             )
615         )
616         return self
617
618     def get_replies(self, err_msg="Failed to get replies."):
619         """Get replies from VPP Python API.
620
621         The replies are parsed into dict-like objects,
622         "retval" field is guaranteed to be zero on success.
623
624         :param err_msg: The message used if the PAPI command(s) execution fails.
625         :type err_msg: str
626         :returns: Responses, dict objects with fields due to API and "retval".
627         :rtype: list of dict
628         :raises RuntimeError: If retval is nonzero, parsing or ssh error.
629         """
630         return self._execute(err_msg=err_msg)
631
632     def get_reply(self, err_msg=u"Failed to get reply."):
633         """Get reply from VPP Python API.
634
635         The reply is parsed into dict-like object,
636         "retval" field is guaranteed to be zero on success.
637
638         TODO: Discuss exception types to raise, unify with inner methods.
639
640         :param err_msg: The message used if the PAPI command(s) execution fails.
641         :type err_msg: str
642         :returns: Response, dict object with fields due to API and "retval".
643         :rtype: dict
644         :raises AssertionError: If retval is nonzero, parsing or ssh error.
645         """
646         replies = self.get_replies(err_msg=err_msg)
647         if len(replies) != 1:
648             raise RuntimeError(f"Expected single reply, got {replies!r}")
649         return replies[0]
650
651     def get_sw_if_index(self, err_msg=u"Failed to get reply."):
652         """Get sw_if_index from reply from VPP Python API.
653
654         Frequently, the caller is only interested in sw_if_index field
655         of the reply, this wrapper makes such call sites shorter.
656
657         TODO: Discuss exception types to raise, unify with inner methods.
658
659         :param err_msg: The message used if the PAPI command(s) execution fails.
660         :type err_msg: str
661         :returns: Response, sw_if_index value of the reply.
662         :rtype: int
663         :raises AssertionError: If retval is nonzero, parsing or ssh error.
664         """
665         reply = self.get_reply(err_msg=err_msg)
666         logger.trace(f"Getting index from {reply!r}")
667         return reply[u"sw_if_index"]
668
669     def get_details(self, err_msg="Failed to get dump details."):
670         """Get dump details from VPP Python API.
671
672         The details are parsed into dict-like objects.
673         The number of details per single dump command can vary,
674         and all association between details and dumps is lost,
675         so if you care about the association (as opposed to
676         logging everything at once for debugging purposes),
677         it is recommended to call get_details for each dump (type) separately.
678
679         :param err_msg: The message used if the PAPI command(s) execution fails.
680         :type err_msg: str
681         :returns: Details, dict objects with fields due to API without "retval".
682         :rtype: list of dict
683         """
684         return self._execute(err_msg)
685
686     @staticmethod
687     def run_cli_cmd(
688             node, cli_cmd, log=True, remote_vpp_socket=Constants.SOCKSVR_PATH):
689         """Run a CLI command as cli_inband, return the "reply" field of reply.
690
691         Optionally, log the field value.
692
693         :param node: Node to run command on.
694         :param cli_cmd: The CLI command to be run on the node.
695         :param remote_vpp_socket: Path to remote socket to tunnel to.
696         :param log: If True, the response is logged.
697         :type node: dict
698         :type remote_vpp_socket: str
699         :type cli_cmd: str
700         :type log: bool
701         :returns: CLI output.
702         :rtype: str
703         """
704         cmd = u"cli_inband"
705         args = dict(
706             cmd=cli_cmd
707         )
708         err_msg = f"Failed to run 'cli_inband {cli_cmd}' PAPI command " \
709             f"on host {node[u'host']}"
710
711         with PapiSocketExecutor(node, remote_vpp_socket) as papi_exec:
712             reply = papi_exec.add(cmd, **args).get_reply(err_msg)["reply"]
713         if log:
714             logger.info(
715                 f"{cli_cmd} ({node[u'host']} - {remote_vpp_socket}):\n"
716                 f"{reply.strip()}"
717             )
718         return reply
719
720     @staticmethod
721     def run_cli_cmd_on_all_sockets(node, cli_cmd, log=True):
722         """Run a CLI command as cli_inband, on all sockets in topology file.
723
724         :param node: Node to run command on.
725         :param cli_cmd: The CLI command to be run on the node.
726         :param log: If True, the response is logged.
727         :type node: dict
728         :type cli_cmd: str
729         :type log: bool
730         """
731         sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
732         if sockets:
733             for socket in sockets.values():
734                 PapiSocketExecutor.run_cli_cmd(
735                     node, cli_cmd, log=log, remote_vpp_socket=socket
736                 )
737
738     @staticmethod
739     def dump_and_log(node, cmds):
740         """Dump and log requested information, return None.
741
742         :param node: DUT node.
743         :param cmds: Dump commands to be executed.
744         :type node: dict
745         :type cmds: list of str
746         """
747         with PapiSocketExecutor(node) as papi_exec:
748             for cmd in cmds:
749                 dump = papi_exec.add(cmd).get_details()
750                 logger.debug(f"{cmd}:\n{pformat(dump)}")
751
752     def _execute(self, err_msg=u"Undefined error message", exp_rv=0):
753         """Turn internal command list into data and execute; return replies.
754
755         This method also clears the internal command list.
756
757         IMPORTANT!
758         Do not use this method in L1 keywords. Use:
759         - get_replies()
760         - get_reply()
761         - get_sw_if_index()
762         - get_details()
763
764         :param err_msg: The message used if the PAPI command(s) execution fails.
765         :type err_msg: str
766         :returns: Papi responses parsed into a dict-like object,
767             with fields due to API (possibly including retval).
768         :rtype: list of dict
769         :raises RuntimeError: If the replies are not all correct.
770         """
771         vpp_instance = self.get_connected_client()
772         local_list = self._api_command_list
773         # Clear first as execution may fail.
774         self._api_command_list = list()
775         replies = list()
776         for command in local_list:
777             api_name = command[u"api_name"]
778             papi_fn = getattr(vpp_instance.api, api_name)
779             try:
780                 try:
781                     reply = papi_fn(**command[u"api_args"])
782                 except (IOError, struct.error) as err:
783                     # Occasionally an error happens, try reconnect.
784                     logger.warn(f"Reconnect after error: {err!r}")
785                     vpp_instance.disconnect()
786                     # Testing shows immediate reconnect fails.
787                     time.sleep(1)
788                     vpp_instance.connect_sync(u"csit_socket")
789                     logger.trace(u"Reconnected.")
790                     reply = papi_fn(**command[u"api_args"])
791             except (AttributeError, IOError, struct.error) as err:
792                 raise AssertionError(err_msg) from err
793             # *_dump commands return list of objects, convert, ordinary reply.
794             if not isinstance(reply, list):
795                 reply = [reply]
796             for item in reply:
797                 self.crc_checker.check_api_name(item.__class__.__name__)
798                 dict_item = dictize(item)
799                 if u"retval" in dict_item.keys():
800                     # *_details messages do not contain retval.
801                     retval = dict_item[u"retval"]
802                     if retval != exp_rv:
803                         # TODO: What exactly to log and raise here?
804                         raise AssertionError(
805                             f"Retval {retval!r} does not match expected "
806                             f"retval {exp_rv!r}"
807                         )
808                 replies.append(dict_item)
809         return replies
810
811
812 class Disconnector:
813     """Class for holding a single keyword."""
814
815     @staticmethod
816     def disconnect_all_papi_connections():
817         """Disconnect all connected client instances, tear down the SSH tunnels.
818
819         Also remove the local sockets by deleting the temporary directory.
820         Put disconnected client instances to the reuse list.
821         The added attributes are not cleaned up,
822         as their values will get overwritten on next connect.
823
824         Call this method just before killing/restarting all VPP instances.
825
826         This could be a class method of PapiSocketExecutor.
827         But Robot calls methods on instances, and it would be weird
828         to give node argument for constructor in import.
829         Also, as we have a class of the same name as the module,
830         the keywords defined on module level are not accessible.
831         """
832         cls = PapiSocketExecutor
833         # Iterate over copy of entries so deletions do not mess with iterator.
834         keys_copy = list(cls.conn_cache.keys())
835         for key in keys_copy:
836             cls.disconnect_by_key(key)
837
838
839 class PapiExecutor:
840     """Contains methods for executing VPP Python API commands on DUTs.
841
842     TODO: Remove .add step, make get_stats accept paths directly.
843
844     This class processes only one type of VPP PAPI methods: vpp-stats.
845
846     The recommended ways of use are (examples):
847
848     path = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
849     with PapiExecutor(node) as papi_exec:
850         stats = papi_exec.add(api_name='vpp-stats', path=path).get_stats()
851
852     print('RX interface core 0, sw_if_index 0:\n{0}'.\
853         format(stats[0]['/if/rx'][0][0]))
854
855     or
856
857     path_1 = ['^/if', ]
858     path_2 = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
859     with PapiExecutor(node) as papi_exec:
860         stats = papi_exec.add('vpp-stats', path=path_1).\
861             add('vpp-stats', path=path_2).get_stats()
862
863     print('RX interface core 0, sw_if_index 0:\n{0}'.\
864         format(stats[1]['/if/rx'][0][0]))
865
866     Note: In this case, when PapiExecutor method 'add' is used:
867     - its parameter 'csit_papi_command' is used only to keep information
868       that vpp-stats are requested. It is not further processed but it is
869       included in the PAPI history this way:
870       vpp-stats(path=['^/if', '/err/ip4-input', '/sys/node/ip4-input'])
871       Always use csit_papi_command="vpp-stats" if the VPP PAPI method
872       is "stats".
873     - the second parameter must be 'path' as it is used by PapiExecutor
874       method 'add'.
875     - even if the parameter contains multiple paths, there is only one
876       reply item (for each .add).
877     """
878
879     def __init__(self, node):
880         """Initialization.
881
882         :param node: Node to run command(s) on.
883         :type node: dict
884         """
885         # Node to run command(s) on.
886         self._node = node
887
888         # The list of PAPI commands to be executed on the node.
889         self._api_command_list = list()
890
891         self._ssh = SSH()
892
893     def __enter__(self):
894         try:
895             self._ssh.connect(self._node)
896         except IOError:
897             raise RuntimeError(
898                 f"Cannot open SSH connection to host {self._node[u'host']} "
899                 f"to execute PAPI command(s)"
900             )
901         return self
902
903     def __exit__(self, exc_type, exc_val, exc_tb):
904         self._ssh.disconnect(self._node)
905
906     def add(self, csit_papi_command=u"vpp-stats", history=True, **kwargs):
907         """Add next command to internal command list; return self.
908
909         The argument name 'csit_papi_command' must be unique enough as it cannot
910         be repeated in kwargs.
911         The kwargs dict is deep-copied, so it is safe to use the original
912         with partial modifications for subsequent commands.
913
914         :param csit_papi_command: VPP API command.
915         :param history: Enable/disable adding command to PAPI command history.
916         :param kwargs: Optional key-value arguments.
917         :type csit_papi_command: str
918         :type history: bool
919         :type kwargs: dict
920         :returns: self, so that method chaining is possible.
921         :rtype: PapiExecutor
922         """
923         if history:
924             PapiHistory.add_to_papi_history(
925                 self._node, csit_papi_command, **kwargs
926             )
927         self._api_command_list.append(
928             dict(
929                 api_name=csit_papi_command, api_args=copy.deepcopy(kwargs)
930             )
931         )
932         return self
933
934     def get_stats(
935             self, err_msg=u"Failed to get statistics.", timeout=120,
936             socket=Constants.SOCKSTAT_PATH):
937         """Get VPP Stats from VPP Python API.
938
939         :param err_msg: The message used if the PAPI command(s) execution fails.
940         :param timeout: Timeout in seconds.
941         :param socket: Path to Stats socket to tunnel to.
942         :type err_msg: str
943         :type timeout: int
944         :type socket: str
945         :returns: Requested VPP statistics.
946         :rtype: list of dict
947         """
948         paths = [cmd[u"api_args"][u"path"] for cmd in self._api_command_list]
949         self._api_command_list = list()
950
951         stdout = self._execute_papi(
952             paths, method=u"stats", err_msg=err_msg, timeout=timeout,
953             socket=socket
954         )
955
956         return json.loads(stdout)
957
958     @staticmethod
959     def _process_api_data(api_d):
960         """Process API data for smooth converting to JSON string.
961
962         Apply binascii.hexlify() method for string values.
963
964         :param api_d: List of APIs with their arguments.
965         :type api_d: list
966         :returns: List of APIs with arguments pre-processed for JSON.
967         :rtype: list
968         """
969
970         def process_value(val):
971             """Process value.
972
973             :param val: Value to be processed.
974             :type val: object
975             :returns: Processed value.
976             :rtype: dict or str or int
977             """
978             if isinstance(val, dict):
979                 for val_k, val_v in val.items():
980                     val[str(val_k)] = process_value(val_v)
981                 retval = val
982             elif isinstance(val, list):
983                 for idx, val_l in enumerate(val):
984                     val[idx] = process_value(val_l)
985                 retval = val
986             else:
987                 retval = val.encode().hex() if isinstance(val, str) else val
988             return retval
989
990         api_data_processed = list()
991         for api in api_d:
992             api_args_processed = dict()
993             for a_k, a_v in api[u"api_args"].items():
994                 api_args_processed[str(a_k)] = process_value(a_v)
995             api_data_processed.append(
996                 dict(
997                     api_name=api[u"api_name"],
998                     api_args=api_args_processed
999                 )
1000             )
1001         return api_data_processed
1002
1003     def _execute_papi(
1004             self, api_data, method=u"request", err_msg=u"", timeout=120,
1005             socket=None):
1006         """Execute PAPI command(s) on remote node and store the result.
1007
1008         :param api_data: List of APIs with their arguments.
1009         :param method: VPP Python API method. Supported methods are: 'request',
1010             'dump' and 'stats'.
1011         :param err_msg: The message used if the PAPI command(s) execution fails.
1012         :param timeout: Timeout in seconds.
1013         :type api_data: list
1014         :type method: str
1015         :type err_msg: str
1016         :type timeout: int
1017         :returns: Stdout from remote python utility, to be parsed by caller.
1018         :rtype: str
1019         :raises SSHTimeout: If PAPI command(s) execution has timed out.
1020         :raises RuntimeError: If PAPI executor failed due to another reason.
1021         :raises AssertionError: If PAPI command(s) execution has failed.
1022         """
1023         if not api_data:
1024             raise RuntimeError(u"No API data provided.")
1025
1026         json_data = json.dumps(api_data) \
1027             if method in (u"stats", u"stats_request") \
1028             else json.dumps(self._process_api_data(api_data))
1029
1030         sock = f" --socket {socket}" if socket else u""
1031         cmd = f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_PAPI_PROVIDER}" \
1032             f" --method {method} --data '{json_data}'{sock}"
1033         try:
1034             ret_code, stdout, _ = self._ssh.exec_command_sudo(
1035                 cmd=cmd, timeout=timeout, log_stdout_err=False
1036             )
1037         # TODO: Fail on non-empty stderr?
1038         except SSHTimeout:
1039             logger.error(
1040                 f"PAPI command(s) execution timeout on host "
1041                 f"{self._node[u'host']}:\n{api_data}"
1042             )
1043             raise
1044         except Exception as exc:
1045             raise RuntimeError(
1046                 f"PAPI command(s) execution on host {self._node[u'host']} "
1047                 f"failed: {api_data}"
1048             ) from exc
1049         if ret_code != 0:
1050             raise AssertionError(err_msg)
1051
1052         return stdout