feat(etl): SOAK
[csit.git] / resources / libraries / python / PapiExecutor.py
1 # Copyright (c) 2024 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Python API executor library.
15
16 TODO: Document sync and async handling properly.
17 """
18
19 import copy
20 import glob
21 import json
22 import logging
23 import shutil
24 import struct  # vpp-papi can raise struct.error
25 import subprocess
26 import sys
27 import tempfile
28 import time
29 from collections import deque, UserDict
30
31 from pprint import pformat
32 from robot.api import logger
33
34 from resources.libraries.python.Constants import Constants
35 from resources.libraries.python.LocalExecution import run
36 from resources.libraries.python.FilteredLogger import FilteredLogger
37 from resources.libraries.python.PapiHistory import PapiHistory
38 from resources.libraries.python.ssh import (
39     SSH,
40     SSHTimeout,
41     exec_cmd_no_error,
42     scp_node,
43 )
44 from resources.libraries.python.topology import Topology, SocketType
45 from resources.libraries.python.VppApiCrc import VppApiCrcChecker
46
47
48 __all__ = [
49     "PapiExecutor",
50     "PapiSocketExecutor",
51     "Disconnector",
52 ]
53
54
55 def dictize(obj):
56     """A helper method, to make namedtuple-like object accessible as dict.
57
58     If the object is namedtuple-like, its _asdict() form is returned,
59     but in the returned object __getitem__ method is wrapped
60     to dictize also any items returned.
61     If the object does not have _asdict, it will be returned without any change.
62     Integer keys still access the object as tuple.
63
64     A more useful version would be to keep obj mostly as a namedtuple,
65     just add getitem for string keys. Unfortunately, namedtuple inherits
66     from tuple, including its read-only __getitem__ attribute,
67     so we cannot monkey-patch it.
68
69     TODO: Create a proxy for named tuple to allow that.
70
71     :param obj: Arbitrary object to dictize.
72     :type obj: object
73     :returns: Dictized object.
74     :rtype: same as obj type or collections.UserDict
75     """
76     if not hasattr(obj, "_asdict"):
77         return obj
78     overriden = UserDict(obj._asdict())
79     old_get = overriden.__getitem__
80     overriden.__getitem__ = lambda self, key: dictize(old_get(self, key))
81     return overriden
82
83
84 def dictize_and_check_retval(obj, err_msg):
85     """Make namedtuple-like object accessible as dict, check retval if exists.
86
87     If the object contains "retval" field, raise when the value is non-zero.
88
89     See dictize() for what it means to dictize.
90
91     :param obj: Arbitrary object to dictize.
92     :param err_msg: The (additional) text for the raised exception.
93     :type obj: object
94     :type err_msg: str
95     :returns: Dictized object.
96     :rtype: same as obj type or collections.UserDict
97     :raises AssertionError: If retval field is present with nonzero value.
98     """
99     ret = dictize(obj)
100     # *_details messages do not contain retval.
101     retval = ret.get("retval", 0)
102     if retval != 0:
103         raise AssertionError(f"{err_msg}\nRetval nonzero in object {ret!r}")
104     return ret
105
106
107 class PapiSocketExecutor:
108     """Methods for executing VPP Python API commands on forwarded socket.
109
110     The current implementation downloads and parses .api.json files only once
111     and caches client instances for reuse.
112     Cleanup metadata is added as additional attributes
113     directly to the client instances.
114
115     The current implementation caches the connected client instances.
116     As a downside, clients need to be explicitly told to disconnect
117     before VPP restart.
118
119     The current implementation seems to run into read error occasionally.
120     Not sure if the error is in Python code on Robot side, ssh forwarding,
121     or socket handling at VPP side. Anyway, reconnect after some sleep
122     seems to help, hoping repeated command execution does not lead to surprises.
123     The reconnection is logged at WARN level, so it is prominently shown
124     in log.html, so we can see how frequently it happens.
125     There are similar retries cleanups in other places
126     (so unresponsive VPPs do not break test much more than needed),
127     but it is hard to verify all that works correctly.
128     Especially, if Robot crashes, files and ssh processes may leak.
129
130     TODO: Decrease current timeout value when creating connections
131     so broken VPP does not prolong job duration too much
132     while good VPP (almost) never fails to connect.
133
134     TODO: Support handling of retval!=0 without try/except in caller.
135
136     This class processes two classes of VPP PAPI methods:
137     1. Simple request / reply: method='request'.
138     2. Dump functions: method='dump'.
139
140     Note that access to VPP stats over socket is not supported yet.
141
142     The recommended ways of use are (examples):
143
144     1. Simple request / reply. Example with no arguments:
145
146         cmd = "show_version"
147         with PapiSocketExecutor(node) as papi_exec:
148             reply = papi_exec.add(cmd).get_reply(err_msg)
149
150     2. Dump functions:
151
152         cmd = "sw_interface_rx_placement_dump"
153         with PapiSocketExecutor(node) as papi_exec:
154             papi_exec.add(cmd, sw_if_index=ifc["vpp_sw_index"])
155             details = papi_exec.get_details(err_msg)
156
157     3. Multiple requests with one reply each.
158        In this example, there are three requests with arguments,
159        the second and the third ones are the same but with different arguments.
160        This example also showcases method chaining.
161
162         with PapiSocketExecutor(node, is_async=True) as papi_exec:
163             replies = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
164                 add(cmd2, **args3).get_replies(err_msg)
165
166     The "is_async=True" part in the last example enables "async handling mode",
167     which imposes limitations but gains speed and saves memory.
168     This is different than async mode of VPP PAPI, as the default handling mode
169     also uses async PAPI connections.
170
171     The implementation contains more hidden details, such as
172     support for old VPP PAPI async mode behavior, API CRC checking
173     conditional usage of control ping, and possible susceptibility to VPP-2033.
174     See docstring of methods for more detailed info.
175     """
176
177     # Class cache for reuse between instances.
178     api_root_dir = None
179     """We copy .api json files and PAPI code from DUT to robot machine.
180     This class variable holds temporary directory once created.
181     When python exits, the directory is deleted, so no downloaded file leaks.
182     The value will be set to TemporaryDirectory class instance (not string path)
183     to ensure deletion at exit."""
184     api_json_path = None
185     """String path to .api.json files, a directory somewhere in api_root_dir."""
186     api_package_path = None
187     """String path to PAPI code, a different directory under api_root_dir."""
188     crc_checker = None
189     """Accesses .api.json files at creation, caching speeds up accessing it."""
190     reusable_vpp_client_list = list()
191     """Each connection needs a separate client instance,
192     and each client instance creation needs to parse all .api files,
193     which takes time. If a client instance disconnects, it is put here,
194     so on next connect we can reuse intead of creating new."""
195     conn_cache = dict()
196     """Mapping from node key to connected client instance."""
197
198     def __init__(
199         self, node, remote_vpp_socket=Constants.SOCKSVR_PATH, is_async=False
200     ):
201         """Store the given arguments, declare managed variables.
202
203         :param node: Node to connect to and forward unix domain socket from.
204         :param remote_vpp_socket: Path to remote socket to tunnel to.
205         :param is_async: Whether to use async handling.
206         :type node: dict
207         :type remote_vpp_socket: str
208         :type is_async: bool
209         """
210         self._node = node
211         self._remote_vpp_socket = remote_vpp_socket
212         self._is_async = is_async
213         # The list of PAPI commands to be executed on the node.
214         self._api_command_list = list()
215
216     def ensure_api_dirs(self):
217         """Copy files from DUT to local temporary directory.
218
219         If the directory is still there, do not copy again.
220         If copying, also initialize CRC checker (this also performs
221         static checks), and remember PAPI package path.
222         Do not add that to PATH yet.
223         """
224         cls = self.__class__
225         if cls.api_package_path:
226             return
227         # Pylint suggests to use "with" statement, which we cannot,
228         # do as the dir should stay for multiple ensure_vpp_instance calls.
229         cls.api_root_dir = tempfile.TemporaryDirectory(dir="/tmp")
230         root_path = cls.api_root_dir.name
231         # Pack, copy and unpack Python part of VPP installation from _node.
232         # TODO: Use rsync or recursive version of ssh.scp_node instead?
233         node = self._node
234         exec_cmd_no_error(node, ["rm", "-rf", "/tmp/papi.txz"])
235         # Papi python version depends on OS (and time).
236         # Python 3.4 or higher, site-packages or dist-packages.
237         installed_papi_glob = "/usr/lib/python3*/*-packages/vpp_papi"
238         # We need to wrap this command in bash, in order to expand globs,
239         # and as ssh does join, the inner command has to be quoted.
240         inner_cmd = " ".join(
241             [
242                 "tar",
243                 "cJf",
244                 "/tmp/papi.txz",
245                 "--exclude=*.pyc",
246                 installed_papi_glob,
247                 "/usr/share/vpp/api",
248             ]
249         )
250         exec_cmd_no_error(node, ["bash", "-c", f"'{inner_cmd}'"])
251         scp_node(node, root_path + "/papi.txz", "/tmp/papi.txz", get=True)
252         run(["tar", "xf", root_path + "/papi.txz", "-C", root_path])
253         cls.api_json_path = root_path + "/usr/share/vpp/api"
254         # Perform initial checks before .api.json files are gone,
255         # by creating the checker instance.
256         cls.crc_checker = VppApiCrcChecker(cls.api_json_path)
257         # When present locally, we finally can find the installation path.
258         cls.api_package_path = glob.glob(root_path + installed_papi_glob)[0]
259         # Package path has to be one level above the vpp_papi directory.
260         cls.api_package_path = cls.api_package_path.rsplit("/", 1)[0]
261
262     def ensure_vpp_instance(self):
263         """Create or reuse a closed client instance, return it.
264
265         The instance is initialized for unix domain socket access,
266         it has initialized all the bindings, it is removed from the internal
267         list of disconnected instances, but it is not connected
268         (to a local socket) yet.
269
270         :returns: VPP client instance ready for connect.
271         :rtype: vpp_papi.VPPApiClient
272         """
273         self.ensure_api_dirs()
274         cls = self.__class__
275         if cls.reusable_vpp_client_list:
276             # Reuse in LIFO fashion.
277             *cls.reusable_vpp_client_list, ret = cls.reusable_vpp_client_list
278             return ret
279         # Creating an instance leads to dynamic imports from VPP PAPI code,
280         # so the package directory has to be present until the instance.
281         # But it is simpler to keep the package dir around.
282         try:
283             sys.path.append(cls.api_package_path)
284             # TODO: Pylint says import-outside-toplevel and import-error.
285             # It is right, we should refactor the code and move initialization
286             # of package outside.
287             from vpp_papi.vpp_papi import VPPApiClient as vpp_class
288             try:
289                 # The old way. Deduplicate when pre-2402 support is not needed.
290
291                 vpp_class.apidir = cls.api_json_path
292                 # We need to create instance before removing from sys.path.
293                 # Cannot use loglevel parameter, robot.api.logger lacks the support.
294                 vpp_instance = vpp_class(
295                     use_socket=True,
296                     server_address="TBD",
297                     async_thread=False,
298                     # Large read timeout was originally there for VPP-1722,
299                     # it may still be helping against AVF device creation failures.
300                     read_timeout=14,
301                     logger=FilteredLogger(logger, "INFO"),
302                 )
303             except vpp_class.VPPRuntimeError:
304                 # The 39871 way.
305
306                 # We need to create instance before removing from sys.path.
307                 # Cannot use loglevel parameter, robot.api.logger lacks the support.
308                 vpp_instance = vpp_class(
309                     apidir=cls.api_json_path,
310                     use_socket=True,
311                     server_address="TBD",
312                     async_thread=False,
313                     # Large read timeout was originally there for VPP-1722,
314                     # it may still be helping against AVF device creation failures.
315                     read_timeout=14,
316                     logger=FilteredLogger(logger, "INFO"),
317                 )
318             # The following is needed to prevent union (e.g. Ip4) debug logging
319             # of VPP part of PAPI from spamming robot logs.
320             logging.getLogger("vpp_papi.serializer").setLevel(logging.INFO)
321         finally:
322             if sys.path[-1] == cls.api_package_path:
323                 sys.path.pop()
324         return vpp_instance
325
326     @classmethod
327     def key_for_node_and_socket(cls, node, remote_socket):
328         """Return a hashable object to distinguish nodes.
329
330         The usual node object (of "dict" type) is not hashable,
331         and can contain mutable information (mostly virtual interfaces).
332         Use this method to get an object suitable for being a key in dict.
333
334         The fields to include are chosen by what ssh needs.
335
336         This class method is needed, for disconnect.
337
338         :param node: The node object to distinguish.
339         :param remote_socket: Path to remote socket.
340         :type node: dict
341         :type remote_socket: str
342         :return: Tuple of values distinguishing this node from similar ones.
343         :rtype: tuple of str
344         """
345         return (
346             node["host"],
347             node["port"],
348             remote_socket,
349             # TODO: Do we support sockets paths such as "~/vpp/api.socket"?
350             # If yes, add also:
351             # node[u"username"],
352         )
353
354     def key_for_self(self):
355         """Return a hashable object to distinguish nodes.
356
357         Just a wrapper around key_for_node_and_socket
358         which sets up proper arguments.
359
360         :return: Tuple of values distinguishing this node from similar ones.
361         :rtype: tuple of str
362         """
363         return self.__class__.key_for_node_and_socket(
364             self._node,
365             self._remote_vpp_socket,
366         )
367
368     def set_connected_client(self, client):
369         """Add a connected client instance into cache.
370
371         This hides details of what the node key is.
372
373         If there already is a client for the computed key,
374         fail, as it is a sign of resource leakage.
375
376         :param client: VPP client instance in connected state.
377         :type client: vpp_papi.VPPApiClient
378         :raises RuntimeError: If related key already has a cached client.
379         """
380         key = self.key_for_self()
381         cache = self.__class__.conn_cache
382         if key in cache:
383             raise RuntimeError(f"Caching client with existing key: {key}")
384         cache[key] = client
385
386     def get_connected_client(self, check_connected=True):
387         """Return None or cached connected client.
388
389         If check_connected, RuntimeError is raised when the client is
390         not in cache. None is returned if client is not in cache
391         (and the check is disabled).
392         Successful retrieval from cache is logged only when check_connected.
393
394         This hides details of what the node key is.
395
396         :param check_connected: Whether cache miss raises (and success logs).
397         :type check_connected: bool
398         :returns: Connected client instance, or None if uncached and no check.
399         :rtype: Optional[vpp_papi.VPPApiClient]
400         :raises RuntimeError: If cache miss and check enabled.
401         """
402         key = self.key_for_self()
403         ret = self.__class__.conn_cache.get(key, None)
404         if check_connected:
405             if ret is None:
406                 raise RuntimeError(f"Client not cached for key: {key}")
407             # When reading logs, it is good to see which VPP is accessed.
408             logger.debug(f"Activated cached PAPI client for key: {key}")
409         return ret
410
411     def __enter__(self):
412         """Create a tunnel, connect VPP instance.
413
414         If the connected client is in cache, return it.
415         Only if not, create a new (or reuse a disconnected) client instance.
416
417         Only at this point a local socket names are created
418         in a temporary directory, as CSIT can connect to multiple VPPs.
419
420         The following attributes are added to the client instance
421         to simplify caching and cleanup:
422         csit_temp_dir
423             - Temporary socket files are created here.
424         csit_control_socket
425             - This socket controls the local ssh process doing the forwarding.
426         csit_local_vpp_socket
427             - This is the forwarded socket to talk with remote VPP.
428         csit_deque
429             - Queue for responses.
430
431         The attribute names do not start with underscore,
432         so pylint does not complain about accessing private attribute.
433         The attribute names start with csit_ to avoid naming conflicts
434         with "real" attributes from VPP Python code.
435
436         :returns: self
437         :rtype: PapiSocketExecutor
438         """
439         # Do we have the connected instance in the cache?
440         vpp_instance = self.get_connected_client(check_connected=False)
441         if vpp_instance is not None:
442             return self
443         # No luck, create and connect a new instance.
444         time_enter = time.monotonic()
445         node = self._node
446         # Parsing takes longer than connecting, prepare instance before tunnel.
447         vpp_instance = self.ensure_vpp_instance()
448         # Store into cache as soon as possible.
449         # If connection fails, it is better to attempt disconnect anyway.
450         self.set_connected_client(vpp_instance)
451         # Set additional attributes.
452         vpp_instance.csit_temp_dir = tempfile.TemporaryDirectory(dir="/tmp")
453         temp_path = vpp_instance.csit_temp_dir.name
454         api_socket = temp_path + "/vpp-api.sock"
455         vpp_instance.csit_local_vpp_socket = api_socket
456         ssh_socket = temp_path + "/ssh.sock"
457         vpp_instance.csit_control_socket = ssh_socket
458         # Cleanup possibilities.
459         ret_code, _ = run(["ls", ssh_socket], check=False)
460         if ret_code != 2:
461             # This branch never seems to be hit in CI,
462             # but may be useful when testing manually.
463             run(
464                 ["ssh", "-S", ssh_socket, "-O", "exit", "0.0.0.0"],
465                 check=False,
466                 log=True,
467             )
468             # TODO: Is any sleep necessary? How to prove if not?
469             run(["sleep", "0.1"])
470             run(["rm", "-vrf", ssh_socket])
471         # Even if ssh can perhaps reuse this file,
472         # we need to remove it for readiness detection to work correctly.
473         run(["rm", "-rvf", api_socket])
474         # We use sleep command. The ssh command will exit in 30 second,
475         # unless a local socket connection is established,
476         # in which case the ssh command will exit only when
477         # the ssh connection is closed again (via control socket).
478         # The log level is to suppress "Warning: Permanently added" messages.
479         ssh_cmd = [
480             "ssh",
481             "-S",
482             ssh_socket,
483             "-M",
484             "-L",
485             f"{api_socket}:{self._remote_vpp_socket}",
486             "-p",
487             str(node["port"]),
488             "-o",
489             "LogLevel=ERROR",
490             "-o",
491             "UserKnownHostsFile=/dev/null",
492             "-o",
493             "StrictHostKeyChecking=no",
494             "-o",
495             "ExitOnForwardFailure=yes",
496             f"{node['username']}@{node['host']}",
497             "sleep",
498             "30",
499         ]
500         priv_key = node.get("priv_key")
501         if priv_key:
502             # This is tricky. We need a file to pass the value to ssh command.
503             # And we need ssh command, because paramiko does not support sockets
504             # (neither ssh_socket, nor _remote_vpp_socket).
505             key_file = tempfile.NamedTemporaryFile()
506             key_file.write(priv_key)
507             # Make sure the content is written, but do not close yet.
508             key_file.flush()
509             ssh_cmd[1:1] = ["-i", key_file.name]
510         password = node.get("password")
511         if password:
512             # Prepend sshpass command to set password.
513             ssh_cmd[:0] = ["sshpass", "-p", password]
514         time_stop = time.monotonic() + 10.0
515         # subprocess.Popen seems to be the best way to run commands
516         # on background. Other ways (shell=True with "&" and ssh with -f)
517         # seem to be too dependent on shell behavior.
518         # In particular, -f does NOT return values for run().
519         subprocess.Popen(ssh_cmd)
520         # Check socket presence on local side.
521         while time.monotonic() < time_stop:
522             # It can take a moment for ssh to create the socket file.
523             ret_code, _ = run(["ls", "-l", api_socket], check=False)
524             if not ret_code:
525                 break
526             time.sleep(0.01)
527         else:
528             raise RuntimeError("Local side socket has not appeared.")
529         if priv_key:
530             # Socket up means the key has been read. Delete file by closing it.
531             key_file.close()
532         # Everything is ready, set the local socket address and connect.
533         vpp_instance.transport.server_address = api_socket
534         # It seems we can get read error even if every preceding check passed.
535         # Single retry seems to help. TODO: Confirm this is still needed.
536         for _ in range(2):
537             try:
538                 vpp_instance.connect("csit_socket", do_async=True)
539             except (IOError, struct.error) as err:
540                 logger.warn(f"Got initial connect error {err!r}")
541                 vpp_instance.disconnect()
542             else:
543                 break
544         else:
545             raise RuntimeError("Failed to connect to VPP over a socket.")
546         # Only after rls2302 all relevant VPP builds should have do_async.
547         if hasattr(vpp_instance.transport, "do_async"):
548             deq = deque()
549             vpp_instance.csit_deque = deq
550             vpp_instance.register_event_callback(lambda x, y: deq.append(y))
551         else:
552             vpp_instance.csit_deque = None
553         duration_conn = time.monotonic() - time_enter
554         logger.trace(f"Establishing socket connection took {duration_conn}s.")
555         return self
556
557     def __exit__(self, exc_type, exc_val, exc_tb):
558         """No-op, the client instance remains in cache in connected state."""
559
560     @classmethod
561     def disconnect_by_key(cls, key):
562         """Disconnect a connected client instance, noop it not connected.
563
564         Also remove the local sockets by deleting the temporary directory.
565         Put disconnected client instances to the reuse list.
566         The added attributes are not cleaned up,
567         as their values will get overwritten on next connect.
568
569         This method is useful for disconnect_all type of work.
570
571         :param key: Tuple identifying the node (and socket).
572         :type key: tuple of str
573         """
574         client_instance = cls.conn_cache.get(key, None)
575         if client_instance is None:
576             return
577         logger.debug(f"Disconnecting by key: {key}")
578         client_instance.disconnect()
579         run(
580             [
581                 "ssh",
582                 "-S",
583                 client_instance.csit_control_socket,
584                 "-O",
585                 "exit",
586                 "0.0.0.0",
587             ],
588             check=False,
589         )
590         # Temp dir has autoclean, but deleting explicitly
591         # as an error can happen.
592         try:
593             client_instance.csit_temp_dir.cleanup()
594         except FileNotFoundError:
595             # There is a race condition with ssh removing its ssh.sock file.
596             # Single retry should be enough to ensure the complete removal.
597             shutil.rmtree(client_instance.csit_temp_dir.name)
598         # Finally, put disconnected clients to reuse list.
599         cls.reusable_vpp_client_list.append(client_instance)
600         # Invalidate cache last. Repeated errors are better than silent leaks.
601         del cls.conn_cache[key]
602
603     @classmethod
604     def disconnect_by_node_and_socket(
605         cls, node, remote_socket=Constants.SOCKSVR_PATH
606     ):
607         """Disconnect a connected client instance, noop it not connected.
608
609         Also remove the local sockets by deleting the temporary directory.
610         Put disconnected client instances to the reuse list.
611         The added attributes are not cleaned up,
612         as their values will get overwritten on next connect.
613
614         Call this method just before killing/restarting remote VPP instance.
615         """
616         key = cls.key_for_node_and_socket(node, remote_socket)
617         return cls.disconnect_by_key(key)
618
619     @classmethod
620     def disconnect_all_sockets_by_node(cls, node):
621         """Disconnect all socket connected client instance.
622
623         Noop if not connected.
624
625         Also remove the local sockets by deleting the temporary directory.
626         Put disconnected client instances to the reuse list.
627         The added attributes are not cleaned up,
628         as their values will get overwritten on next connect.
629
630         Call this method just before killing/restarting remote VPP instance.
631         """
632         sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
633         if sockets:
634             for socket in sockets.values():
635                 # TODO: Remove sockets from topology.
636                 PapiSocketExecutor.disconnect_by_node_and_socket(node, socket)
637         # Always attempt to disconnect the default socket.
638         return cls.disconnect_by_node_and_socket(node)
639
640     @staticmethod
641     def disconnect_all_papi_connections():
642         """Disconnect all connected client instances, tear down the SSH tunnels.
643
644         Also remove the local sockets by deleting the temporary directory.
645         Put disconnected client instances to the reuse list.
646         The added attributes are not cleaned up,
647         as their values will get overwritten on next connect.
648
649         This should be a class method,
650         but we prefer to call static methods from Robot.
651
652         Call this method just before killing/restarting all VPP instances.
653         """
654         cls = PapiSocketExecutor
655         # Iterate over copy of entries so deletions do not mess with iterator.
656         keys_copy = list(cls.conn_cache.keys())
657         for key in keys_copy:
658             cls.disconnect_by_key(key)
659
660     def add(self, csit_papi_command, history=True, **kwargs):
661         """Add next command to internal command list; return self.
662
663         Unless disabled, new entry to papi history is also added at this point.
664         The kwargs dict is serialized or deep-copied, so it is safe to use
665         the original with partial modifications for subsequent calls.
666
667         Any pending conflicts from .api.json processing are raised.
668         Then the command name is checked for known CRCs.
669         Unsupported commands raise an exception, as CSIT change
670         should not start using messages without making sure which CRCs
671         are supported.
672         Each CRC issue is raised only once, so subsequent tests
673         can raise other issues.
674
675         With async handling mode, this method also serializes and sends
676         the command, skips CRC check to gain speed, and saves memory
677         by putting a sentinel (instead of deepcopy) to api command list.
678
679         For scale tests, the call sites are responsible to set history values
680         in a way that hints what is done without overwhelming the papi history.
681
682         Note to contributors: Do not rename "csit_papi_command"
683         to anything VPP could possibly use as an API field name.
684
685         :param csit_papi_command: VPP API command.
686         :param history: Enable/disable adding command to PAPI command history.
687         :param kwargs: Optional key-value arguments.
688         :type csit_papi_command: str
689         :type history: bool
690         :type kwargs: dict
691         :returns: self, so that method chaining is possible.
692         :rtype: PapiSocketExecutor
693         :raises RuntimeError: If unverified or conflicting CRC is encountered.
694         """
695         self.crc_checker.report_initial_conflicts()
696         if history:
697             # No need for deepcopy yet, serialization isolates from edits.
698             PapiHistory.add_to_papi_history(
699                 self._node, csit_papi_command, **kwargs
700             )
701         self.crc_checker.check_api_name(csit_papi_command)
702         if self._is_async:
703             # Save memory but still count the number of expected replies.
704             self._api_command_list.append(0)
705             api_object = self.get_connected_client(check_connected=False).api
706             func = getattr(api_object, csit_papi_command)
707             # No need for deepcopy yet, serialization isolates from edits.
708             func(**kwargs)
709         else:
710             # No serialization, so deepcopy is needed here.
711             self._api_command_list.append(
712                 dict(api_name=csit_papi_command, api_args=copy.deepcopy(kwargs))
713             )
714         return self
715
716     def get_replies(self, err_msg="Failed to get replies."):
717         """Get reply for each command from VPP Python API.
718
719         This method expects one reply per command,
720         and gains performance by reading replies only after
721         sending all commands.
722
723         The replies are parsed into dict-like objects,
724         "retval" field (if present) is guaranteed to be zero on success.
725
726         Do not use this for messages with variable number of replies,
727         use get_details instead.
728         Do not use for commands trigering VPP-2033,
729         use series of get_reply instead.
730
731         :param err_msg: The message used if the PAPI command(s) execution fails.
732         :type err_msg: str
733         :returns: Responses, dict objects with fields due to API and "retval".
734         :rtype: list of dict
735         :raises RuntimeError: If retval is nonzero, parsing or ssh error.
736         """
737         if not self._is_async:
738             raise RuntimeError("Sync handling does not suport get_replies.")
739         return self._execute(err_msg=err_msg, do_async=True)
740
741     def get_reply(self, err_msg="Failed to get reply."):
742         """Get reply to single command from VPP Python API.
743
744         This method waits for a single reply (no control ping),
745         thus avoiding bugs like VPP-2033.
746
747         The reply is parsed into a dict-like object,
748         "retval" field (if present) is guaranteed to be zero on success.
749
750         :param err_msg: The message used if the PAPI command(s) execution fails.
751         :type err_msg: str
752         :returns: Response, dict object with fields due to API and "retval".
753         :rtype: dict
754         :raises AssertionError: If retval is nonzero, parsing or ssh error.
755         """
756         if self._is_async:
757             raise RuntimeError("Async handling does not suport get_reply.")
758         replies = self._execute(err_msg=err_msg, do_async=False)
759         if len(replies) != 1:
760             raise RuntimeError(f"Expected single reply, got {replies!r}")
761         return replies[0]
762
763     def get_sw_if_index(self, err_msg="Failed to get reply."):
764         """Get sw_if_index from reply from VPP Python API.
765
766         Frequently, the caller is only interested in sw_if_index field
767         of the reply, this wrapper around get_reply (thus safe against VPP-2033)
768         makes such call sites shorter.
769
770         :param err_msg: The message used if the PAPI command(s) execution fails.
771         :type err_msg: str
772         :returns: Response, sw_if_index value of the reply.
773         :rtype: int
774         :raises AssertionError: If retval is nonzero, parsing or ssh error.
775         """
776         if self._is_async:
777             raise RuntimeError("Async handling does not suport get_sw_if_index")
778         reply = self.get_reply(err_msg=err_msg)
779         return reply["sw_if_index"]
780
781     def get_details(self, err_msg="Failed to get dump details."):
782         """Get details (for possibly multiple dumps) from VPP Python API.
783
784         The details are parsed into dict-like objects.
785         The number of details per single dump command can vary,
786         and all association between details and dumps is lost,
787         so if you care about the association (as opposed to
788         logging everything at once for debugging purposes),
789         it is recommended to call get_details for each dump (type) separately.
790
791         This method uses control ping to detect end of replies,
792         so it is not suitable for commands which trigger VPP-2033
793         (but arguably no dump currently triggers it).
794
795         :param err_msg: The message used if the PAPI command(s) execution fails.
796         :type err_msg: str
797         :returns: Details, dict objects with fields due to API without "retval".
798         :rtype: list of dict
799         """
800         if self._is_async:
801             raise RuntimeError("Async handling does not suport get_details.")
802         return self._execute(err_msg, do_async=False, single_reply=False)
803
804     @staticmethod
805     def run_cli_cmd(
806         node, cli_cmd, log=True, remote_vpp_socket=Constants.SOCKSVR_PATH
807     ):
808         """Run a CLI command as cli_inband, return the "reply" field of reply.
809
810         Optionally, log the field value.
811         This is a convenience wrapper around get_reply.
812
813         :param node: Node to run command on.
814         :param cli_cmd: The CLI command to be run on the node.
815         :param remote_vpp_socket: Path to remote socket to tunnel to.
816         :param log: If True, the response is logged.
817         :type node: dict
818         :type remote_vpp_socket: str
819         :type cli_cmd: str
820         :type log: bool
821         :returns: CLI output.
822         :rtype: str
823         """
824         cmd = "cli_inband"
825         args = dict(cmd=cli_cmd)
826         err_msg = (
827             f"Failed to run 'cli_inband {cli_cmd}' PAPI command"
828             f" on host {node['host']}"
829         )
830
831         with PapiSocketExecutor(node, remote_vpp_socket) as papi_exec:
832             reply = papi_exec.add(cmd, **args).get_reply(err_msg)["reply"]
833         if log:
834             logger.info(
835                 f"{cli_cmd} ({node['host']} - {remote_vpp_socket}):\n"
836                 f"{reply.strip()}"
837             )
838         return reply
839
840     @staticmethod
841     def run_cli_cmd_on_all_sockets(node, cli_cmd, log=True):
842         """Run a CLI command as cli_inband, on all sockets in topology file.
843
844         Just a run_cli_cmd, looping over sockets.
845
846         :param node: Node to run command on.
847         :param cli_cmd: The CLI command to be run on the node.
848         :param log: If True, the response is logged.
849         :type node: dict
850         :type cli_cmd: str
851         :type log: bool
852         """
853         sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
854         if sockets:
855             for socket in sockets.values():
856                 PapiSocketExecutor.run_cli_cmd(
857                     node, cli_cmd, log=log, remote_vpp_socket=socket
858                 )
859
860     @staticmethod
861     def dump_and_log(node, cmds):
862         """Dump and log requested information, return None.
863
864         Just a get_details (with logging), looping over commands.
865
866         :param node: DUT node.
867         :param cmds: Dump commands to be executed.
868         :type node: dict
869         :type cmds: list of str
870         """
871         with PapiSocketExecutor(node) as papi_exec:
872             for cmd in cmds:
873                 dump = papi_exec.add(cmd).get_details()
874                 logger.debug(f"{cmd}:\n{pformat(dump)}")
875
876     @staticmethod
877     def _read_internal(vpp_instance, timeout=None):
878         """Blockingly read within timeout.
879
880         This covers behaviors both before and after 37758.
881         One read attempt is guaranteed even with zero timeout.
882
883         TODO: Simplify after 2302 RCA is done.
884
885         :param vpp_instance: Client instance to read from.
886         :param timeout: How long to wait for reply (or transport default).
887         :type vpp_instance: vpp_papi.VPPApiClient
888         :type timeout: Optional[float]
889         :returns: Message read or None if nothing got read.
890         :rtype: Optional[namedtuple]
891         """
892         timeout = vpp_instance.read_timeout if timeout is None else timeout
893         if vpp_instance.csit_deque is None:
894             return vpp_instance.read_blocking(timeout=timeout)
895         time_stop = time.monotonic() + timeout
896         while 1:
897             try:
898                 return vpp_instance.csit_deque.popleft()
899             except IndexError:
900                 # We could busy-wait but that seems to starve the reader thread.
901                 time.sleep(0.01)
902             if time.monotonic() > time_stop:
903                 return None
904
905     @staticmethod
906     def _read(vpp_instance, tries=3):
907         """Blockingly read within timeout, retry on early None.
908
909         For (sometimes) unknown reasons, VPP client in async mode likes
910         to return None occasionally before time runs out.
911         This function retries in that case.
912
913         Most of the time, early None means VPP crashed (see VPP-2033),
914         but is is better to give VPP more chances to respond without failure.
915
916         TODO: Perhaps CSIT now never triggers VPP-2033,
917         so investigate and remove this layer if even more speed is needed.
918
919         :param vpp_instance: Client instance to read from.
920         :param tries: Maximum number of tries to attempt.
921         :type vpp_instance: vpp_papi.VPPApiClient
922         :type tries: int
923         :returns: Message read or None if nothing got read even with retries.
924         :rtype: Optional[namedtuple]
925         """
926         timeout = vpp_instance.read_timeout
927         for _ in range(tries):
928             time_stop = time.monotonic() + 0.9 * timeout
929             reply = PapiSocketExecutor._read_internal(vpp_instance)
930             if reply is None and time.monotonic() < time_stop:
931                 logger.trace("Early None. Retry?")
932                 continue
933             return reply
934         logger.trace(f"Got {tries} early Nones, probably a real None.")
935         return None
936
937     @staticmethod
938     def _drain(vpp_instance, err_msg, timeout=30.0):
939         """Keep reading with until None or timeout.
940
941         This is needed to mitigate the risk of a state with unread responses
942         (e.g. after non-zero retval in the middle of get_replies)
943         causing failures in everything subsequent (until disconnect).
944
945         The reads are done without any waiting.
946
947         It is possible some responses have not arrived yet,
948         but that is unlikely as Python is usually slower than VPP.
949
950         :param vpp_instance: Client instance to read from.
951         :param err_msg: Error message to use when overstepping timeout.
952         :param timeout: How long to try before giving up.
953         :type vpp_instance: vpp_papi.VPPApiClient
954         :type err_msg: str
955         :type timeout: float
956         :raises RuntimeError: If read keeps returning nonzero after timeout.
957         """
958         time_stop = time.monotonic() + timeout
959         while time.monotonic() < time_stop:
960             if PapiSocketExecutor._read_internal(vpp_instance, 0.0) is None:
961                 return
962         raise RuntimeError(f"{err_msg}\nTimed out while draining.")
963
964     def _execute(self, err_msg, do_async, single_reply=True):
965         """Turn internal command list into data and execute; return replies.
966
967         This method also clears the internal command list.
968
969         :param err_msg: The message used if the PAPI command(s) execution fails.
970         :param do_async: If true, assume one reply per command and do not wait
971             for each reply before sending next request.
972             Dump commands (and calls causing VPP-2033) need False.
973         :param single_reply: For sync emulation mode (cannot be False
974             if do_async is True). When false use control ping.
975             When true, wait for a single reply.
976         :type err_msg: str
977         :type do_async: bool
978         :type single_reply: bool
979         :returns: Papi replies parsed into a dict-like object,
980             with fields due to API (possibly including retval).
981         :rtype: NoneType or list of dict
982         :raises RuntimeError: If the replies are not all correct.
983         """
984         local_list = self._api_command_list
985         # Clear first as execution may fail.
986         self._api_command_list = list()
987         if do_async:
988             if not single_reply:
989                 raise RuntimeError("Async papi needs one reply per request.")
990             return self._execute_async(local_list, err_msg=err_msg)
991         return self._execute_sync(
992             local_list, err_msg=err_msg, single_reply=single_reply
993         )
994
995     def _execute_sync(self, local_list, err_msg, single_reply):
996         """Execute commands waiting for replies one by one; return replies.
997
998         This implementation either expects a single response per request,
999         or uses control ping to emulate sync PAPI calls.
1000         Reliable, but slow. Required for dumps. Needed for calls
1001         which trigger VPP-2033.
1002
1003         CRC checking is done for the replies (requests are checked in .add).
1004
1005         :param local_list: The list of PAPI commands to be executed on the node.
1006         :param err_msg: The message used if the PAPI command(s) execution fails.
1007         :param single_reply: When false use control ping.
1008             When true, wait for a single reply.
1009         :type local_list: list of dict
1010         :type err_msg: str
1011         :type single_reply: bool
1012         :returns: Papi replies parsed into a dict-like object,
1013             with fields due to API (possibly including retval).
1014         :rtype: List[UserDict]
1015         :raises AttributeError: If VPP does not know the command.
1016         :raises RuntimeError: If the replies are not all correct.
1017         """
1018         vpp_instance = self.get_connected_client()
1019         control_ping_fn = getattr(vpp_instance.api, "control_ping")
1020         ret_list = list()
1021         for command in local_list:
1022             api_name = command["api_name"]
1023             papi_fn = getattr(vpp_instance.api, api_name)
1024             replies = list()
1025             try:
1026                 # Send the command maybe followed by control ping.
1027                 main_context = papi_fn(**command["api_args"])
1028                 if single_reply:
1029                     replies.append(PapiSocketExecutor._read(vpp_instance))
1030                 else:
1031                     ping_context = control_ping_fn()
1032                     # Receive the replies.
1033                     while 1:
1034                         reply = PapiSocketExecutor._read(vpp_instance)
1035                         if reply is None:
1036                             raise RuntimeError(
1037                                 f"{err_msg}\nSync PAPI timed out."
1038                             )
1039                         if reply.context == ping_context:
1040                             break
1041                         if reply.context != main_context:
1042                             raise RuntimeError(
1043                                 f"{err_msg}\nUnexpected context: {reply!r}"
1044                             )
1045                         replies.append(reply)
1046             except (AttributeError, IOError, struct.error) as err:
1047                 # TODO: Add retry if it is still needed.
1048                 raise AssertionError(f"{err_msg}") from err
1049             finally:
1050                 # Discard any unprocessed replies to avoid secondary failures.
1051                 PapiSocketExecutor._drain(vpp_instance, err_msg)
1052             # Process replies for this command.
1053             for reply in replies:
1054                 self.crc_checker.check_api_name(reply.__class__.__name__)
1055                 dictized_reply = dictize_and_check_retval(reply, err_msg)
1056                 ret_list.append(dictized_reply)
1057         return ret_list
1058
1059     def _execute_async(self, local_list, err_msg):
1060         """Read, process and return replies.
1061
1062         The messages were already sent by .add() in this mode,
1063         local_list is used just so we know how many replies to read.
1064
1065         Beware: It is not clear what to do when socket read fails
1066         in the middle of async processing.
1067
1068         The implementation assumes each command results in exactly one reply,
1069         there is no reordering in either commands nor replies,
1070         and context numbers increase one by one (and are matching for replies).
1071
1072         To speed processing up, reply CRC values are not checked.
1073
1074         The current implementation does not limit the number of messages
1075         in-flight, we rely on VPP PAPI background thread to move replies
1076         from socket to queue fast enough.
1077
1078         :param local_list: The list of PAPI commands to get replies for.
1079         :param err_msg: The message used if the PAPI command(s) execution fails.
1080         :type local_list: list
1081         :type err_msg: str
1082         :returns: Papi replies parsed into a dict-like object, with fields
1083             according to API (possibly including retval).
1084         :rtype: List[UserDict]
1085         :raises RuntimeError: If the replies are not all correct.
1086         """
1087         vpp_instance = self.get_connected_client()
1088         ret_list = list()
1089         try:
1090             for index, _ in enumerate(local_list):
1091                 # Blocks up to timeout.
1092                 reply = PapiSocketExecutor._read(vpp_instance)
1093                 if reply is None:
1094                     time_msg = f"PAPI async timeout: idx {index}"
1095                     raise RuntimeError(f"{err_msg}\n{time_msg}")
1096                 ret_list.append(dictize_and_check_retval(reply, err_msg))
1097         finally:
1098             # Discard any unprocessed replies to avoid secondary failures.
1099             PapiSocketExecutor._drain(vpp_instance, err_msg)
1100         return ret_list
1101
1102
1103 class Disconnector:
1104     """Class for holding a single keyword."""
1105
1106     @staticmethod
1107     def disconnect_all_papi_connections():
1108         """Disconnect all connected client instances, tear down the SSH tunnels.
1109
1110         Also remove the local sockets by deleting the temporary directory.
1111         Put disconnected client instances to the reuse list.
1112         The added attributes are not cleaned up,
1113         as their values will get overwritten on next connect.
1114
1115         Call this method just before killing/restarting all VPP instances.
1116
1117         This could be a class method of PapiSocketExecutor.
1118         But Robot calls methods on instances, and it would be weird
1119         to give node argument for constructor in import.
1120         Also, as we have a class of the same name as the module,
1121         the keywords defined on module level are not accessible.
1122         """
1123         cls = PapiSocketExecutor
1124         # Iterate over copy of entries so deletions do not mess with iterator.
1125         for key in list(cls.conn_cache.keys()):
1126             cls.disconnect_by_key(key)
1127
1128
1129 class PapiExecutor:
1130     """Contains methods for executing VPP Python API commands on DUTs.
1131
1132     TODO: Remove .add step, make get_stats accept paths directly.
1133
1134     This class processes only one type of VPP PAPI methods: vpp-stats.
1135
1136     The recommended ways of use are (examples):
1137
1138     path = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
1139     with PapiExecutor(node) as papi_exec:
1140         stats = papi_exec.add(api_name='vpp-stats', path=path).get_stats()
1141
1142     print('RX interface core 0, sw_if_index 0:\n{0}'.\
1143         format(stats[0]['/if/rx'][0][0]))
1144
1145     or
1146
1147     path_1 = ['^/if', ]
1148     path_2 = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
1149     with PapiExecutor(node) as papi_exec:
1150         stats = papi_exec.add('vpp-stats', path=path_1).\
1151             add('vpp-stats', path=path_2).get_stats()
1152
1153     print('RX interface core 0, sw_if_index 0:\n{0}'.\
1154         format(stats[1]['/if/rx'][0][0]))
1155
1156     Note: In this case, when PapiExecutor method 'add' is used:
1157     - its parameter 'csit_papi_command' is used only to keep information
1158       that vpp-stats are requested. It is not further processed but it is
1159       included in the PAPI history this way:
1160       vpp-stats(path=['^/if', '/err/ip4-input', '/sys/node/ip4-input'])
1161       Always use csit_papi_command="vpp-stats" if the VPP PAPI method
1162       is "stats".
1163     - the second parameter must be 'path' as it is used by PapiExecutor
1164       method 'add'.
1165     - even if the parameter contains multiple paths, there is only one
1166       reply item (for each .add).
1167     """
1168
1169     def __init__(self, node):
1170         """Initialization.
1171
1172         :param node: Node to run command(s) on.
1173         :type node: dict
1174         """
1175         # Node to run command(s) on.
1176         self._node = node
1177
1178         # The list of PAPI commands to be executed on the node.
1179         self._api_command_list = list()
1180
1181         self._ssh = SSH()
1182
1183     def __enter__(self):
1184         try:
1185             self._ssh.connect(self._node)
1186         except IOError as err:
1187             msg = f"PAPI: Cannot open SSH connection to {self._node['host']}"
1188             raise RuntimeError(msg) from err
1189         return self
1190
1191     def __exit__(self, exc_type, exc_val, exc_tb):
1192         self._ssh.disconnect(self._node)
1193
1194     def add(self, csit_papi_command="vpp-stats", history=True, **kwargs):
1195         """Add next command to internal command list; return self.
1196
1197         The argument name 'csit_papi_command' must be unique enough as it cannot
1198         be repeated in kwargs.
1199         The kwargs dict is deep-copied, so it is safe to use the original
1200         with partial modifications for subsequent commands.
1201
1202         :param csit_papi_command: VPP API command.
1203         :param history: Enable/disable adding command to PAPI command history.
1204         :param kwargs: Optional key-value arguments.
1205         :type csit_papi_command: str
1206         :type history: bool
1207         :type kwargs: dict
1208         :returns: self, so that method chaining is possible.
1209         :rtype: PapiExecutor
1210         """
1211         if history:
1212             PapiHistory.add_to_papi_history(
1213                 self._node, csit_papi_command, **kwargs
1214             )
1215         self._api_command_list.append(
1216             dict(api_name=csit_papi_command, api_args=copy.deepcopy(kwargs))
1217         )
1218         return self
1219
1220     def get_stats(
1221         self,
1222         err_msg="Failed to get statistics.",
1223         timeout=120,
1224         socket=Constants.SOCKSTAT_PATH,
1225     ):
1226         """Get VPP Stats from VPP Python API.
1227
1228         :param err_msg: The message used if the PAPI command(s) execution fails.
1229         :param timeout: Timeout in seconds.
1230         :param socket: Path to Stats socket to tunnel to.
1231         :type err_msg: str
1232         :type timeout: int
1233         :type socket: str
1234         :returns: Requested VPP statistics.
1235         :rtype: list of dict
1236         """
1237         paths = [cmd["api_args"]["path"] for cmd in self._api_command_list]
1238         self._api_command_list = list()
1239
1240         stdout = self._execute_papi(
1241             paths,
1242             method="stats",
1243             err_msg=err_msg,
1244             timeout=timeout,
1245             socket=socket,
1246         )
1247
1248         return json.loads(stdout)
1249
1250     @staticmethod
1251     def _process_api_data(api_d):
1252         """Process API data for smooth converting to JSON string.
1253
1254         Apply binascii.hexlify() method for string values.
1255
1256         :param api_d: List of APIs with their arguments.
1257         :type api_d: list
1258         :returns: List of APIs with arguments pre-processed for JSON.
1259         :rtype: list
1260         """
1261
1262         def process_value(val):
1263             """Process value.
1264
1265             :param val: Value to be processed.
1266             :type val: object
1267             :returns: Processed value.
1268             :rtype: dict or str or int
1269             """
1270             if isinstance(val, dict):
1271                 for val_k, val_v in val.items():
1272                     val[str(val_k)] = process_value(val_v)
1273                 retval = val
1274             elif isinstance(val, list):
1275                 for idx, val_l in enumerate(val):
1276                     val[idx] = process_value(val_l)
1277                 retval = val
1278             else:
1279                 retval = val.encode().hex() if isinstance(val, str) else val
1280             return retval
1281
1282         api_data_processed = list()
1283         for api in api_d:
1284             api_args_processed = dict()
1285             for a_k, a_v in api["api_args"].items():
1286                 api_args_processed[str(a_k)] = process_value(a_v)
1287             api_data_processed.append(
1288                 dict(api_name=api["api_name"], api_args=api_args_processed)
1289             )
1290         return api_data_processed
1291
1292     def _execute_papi(
1293         self, api_data, method="request", err_msg="", timeout=120, socket=None
1294     ):
1295         """Execute PAPI command(s) on remote node and store the result.
1296
1297         :param api_data: List of APIs with their arguments.
1298         :param method: VPP Python API method. Supported methods are: 'request',
1299             'dump' and 'stats'.
1300         :param err_msg: The message used if the PAPI command(s) execution fails.
1301         :param timeout: Timeout in seconds.
1302         :type api_data: list
1303         :type method: str
1304         :type err_msg: str
1305         :type timeout: int
1306         :returns: Stdout from remote python utility, to be parsed by caller.
1307         :rtype: str
1308         :raises SSHTimeout: If PAPI command(s) execution has timed out.
1309         :raises RuntimeError: If PAPI executor failed due to another reason.
1310         :raises AssertionError: If PAPI command(s) execution has failed.
1311         """
1312         if not api_data:
1313             raise RuntimeError("No API data provided.")
1314
1315         json_data = (
1316             json.dumps(api_data)
1317             if method in ("stats", "stats_request")
1318             else json.dumps(self._process_api_data(api_data))
1319         )
1320
1321         sock = f" --socket {socket}" if socket else ""
1322         cmd = (
1323             f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_PAPI_PROVIDER}"
1324             f" --method {method} --data '{json_data}'{sock}"
1325         )
1326         try:
1327             ret_code, stdout, _ = self._ssh.exec_command_sudo(
1328                 cmd=cmd, timeout=timeout, log_stdout_err=False
1329             )
1330         # TODO: Fail on non-empty stderr?
1331         except SSHTimeout:
1332             logger.error(
1333                 f"PAPI command(s) execution timeout on host"
1334                 f" {self._node['host']}:\n{api_data}"
1335             )
1336             raise
1337         except Exception as exc:
1338             raise RuntimeError(
1339                 f"PAPI command(s) execution on host {self._node['host']}"
1340                 f" failed: {api_data}"
1341             ) from exc
1342         if ret_code != 0:
1343             raise AssertionError(err_msg)
1344
1345         return stdout