feat(model): Cleanup and introduce telemetry
[csit.git] / resources / libraries / python / ssh.py
1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Library for SSH connection management."""
15
16
17 import socket
18
19 from io import StringIO
20 from time import monotonic, sleep
21
22 from paramiko import RSAKey, SSHClient, AutoAddPolicy
23 from paramiko.ssh_exception import SSHException, NoValidConnectionsError
24 from robot.api import logger
25 from scp import SCPClient, SCPException
26
27 from resources.libraries.python.OptionString import OptionString
28
29 __all__ = [
30     u"exec_cmd", u"exec_cmd_no_error", u"SSH", u"SSHTimeout", u"scp_node"
31 ]
32
33 # TODO: load priv key
34
35
36 class SSHTimeout(Exception):
37     """This exception is raised when a timeout occurs."""
38
39
40 class SSH:
41     """Contains methods for managing and using SSH connections."""
42
43     __MAX_RECV_BUF = 10 * 1024 * 1024
44     __existing_connections = dict()
45
46     def __init__(self):
47         self._ssh = None
48         self._node = None
49
50     @staticmethod
51     def _node_hash(node):
52         """Get IP address and port hash from node dictionary.
53
54         :param node: Node in topology.
55         :type node: dict
56         :returns: IP address and port for the specified node.
57         :rtype: int
58         """
59         return hash(frozenset([node[u"host"], node[u"port"]]))
60
61     def connect(self, node, attempts=5):
62         """Connect to node prior to running exec_command or scp.
63
64         If there already is a connection to the node, this method reuses it.
65
66         :param node: Node in topology.
67         :param attempts: Number of reconnect attempts.
68         :type node: dict
69         :type attempts: int
70         :raises IOError: If cannot connect to host.
71         """
72         self._node = node
73         node_hash = self._node_hash(node)
74         if node_hash in SSH.__existing_connections:
75             self._ssh = SSH.__existing_connections[node_hash]
76             if self._ssh.get_transport().is_active():
77                 logger.debug(f"Reusing SSH: {self._ssh}")
78             else:
79                 if attempts > 0:
80                     self._reconnect(attempts-1)
81                 else:
82                     raise IOError(f"Cannot connect to {node['host']}")
83         else:
84             try:
85                 start = monotonic()
86                 pkey = None
87                 if u"priv_key" in node:
88                     pkey = RSAKey.from_private_key(StringIO(node[u"priv_key"]))
89
90                 self._ssh = SSHClient()
91                 self._ssh.set_missing_host_key_policy(AutoAddPolicy())
92
93                 self._ssh.connect(
94                     node[u"host"], username=node[u"username"],
95                     password=node.get(u"password"), pkey=pkey,
96                     port=node[u"port"]
97                 )
98
99                 self._ssh.get_transport().set_keepalive(10)
100
101                 SSH.__existing_connections[node_hash] = self._ssh
102                 logger.debug(
103                     f"New SSH to {self._ssh.get_transport().getpeername()} "
104                     f"took {monotonic() - start} seconds: {self._ssh}"
105                 )
106             except SSHException as exc:
107                 raise IOError(f"Cannot connect to {node[u'host']}") from exc
108             except NoValidConnectionsError as err:
109                 raise IOError(
110                     f"Unable to connect to port {node[u'port']} on "
111                     f"{node[u'host']}"
112                 ) from err
113
114     def disconnect(self, node=None):
115         """Close SSH connection to the node.
116
117         :param node: The node to disconnect from. None means last connected.
118         :type node: dict or None
119         """
120         if node is None:
121             node = self._node
122         if node is None:
123             return
124         node_hash = self._node_hash(node)
125         if node_hash in SSH.__existing_connections:
126             logger.debug(
127                 f"Disconnecting peer: {node[u'host']}, {node[u'port']}"
128             )
129             ssh = SSH.__existing_connections.pop(node_hash)
130             ssh.close()
131
132     def _reconnect(self, attempts=0):
133         """Close the SSH connection and open it again.
134
135         :param attempts: Number of reconnect attempts.
136         :type attempts: int
137         """
138         node = self._node
139         self.disconnect(node)
140         self.connect(node, attempts)
141         logger.debug(
142             f"Reconnecting peer done: {node[u'host']}, {node[u'port']}"
143         )
144
145     def exec_command(self, cmd, timeout=10, log_stdout_err=True):
146         """Execute SSH command on a new channel on the connected Node.
147
148         :param cmd: Command to run on the Node.
149         :param timeout: Maximal time in seconds to wait until the command is
150             done. If set to None then wait forever.
151         :param log_stdout_err: If True, stdout and stderr are logged. stdout
152             and stderr are logged also if the return code is not zero
153             independently of the value of log_stdout_err.
154             Needed for calls outside Robot (e.g. from reservation script).
155         :type cmd: str or OptionString
156         :type timeout: int
157         :type log_stdout_err: bool
158         :returns: return_code, stdout, stderr
159         :rtype: tuple(int, str, str)
160         :raises SSHTimeout: If command is not finished in timeout time.
161         """
162         if isinstance(cmd, (list, tuple)):
163             cmd = OptionString(cmd)
164         cmd = str(cmd)
165         stdout = u""
166         stderr = u""
167         try:
168             chan = self._ssh.get_transport().open_session(timeout=5)
169             peer = self._ssh.get_transport().getpeername()
170         except (AttributeError, SSHException):
171             self._reconnect()
172             chan = self._ssh.get_transport().open_session(timeout=5)
173             peer = self._ssh.get_transport().getpeername()
174         chan.settimeout(timeout)
175
176         logger.trace(f"exec_command on {peer} with timeout {timeout}: {cmd}")
177
178         start = monotonic()
179         chan.exec_command(cmd)
180         while not chan.exit_status_ready() and timeout is not None:
181             if chan.recv_ready():
182                 s_out = chan.recv(self.__MAX_RECV_BUF)
183                 stdout += s_out.decode(encoding=u'utf-8', errors=u'ignore') \
184                     if isinstance(s_out, bytes) else s_out
185
186             if chan.recv_stderr_ready():
187                 s_err = chan.recv_stderr(self.__MAX_RECV_BUF)
188                 stderr += s_err.decode(encoding=u'utf-8', errors=u'ignore') \
189                     if isinstance(s_err, bytes) else s_err
190
191             duration = monotonic() - start
192             if duration > timeout:
193                 raise SSHTimeout(
194                     f"Timeout exception during execution of command: {cmd}\n"
195                     f"Current contents of stdout buffer: "
196                     f"{stdout}\n"
197                     f"Current contents of stderr buffer: "
198                     f"{stderr}\n"
199                 )
200
201             sleep(0.1)
202         return_code = chan.recv_exit_status()
203
204         while chan.recv_ready():
205             s_out = chan.recv(self.__MAX_RECV_BUF)
206             stdout += s_out.decode(encoding=u'utf-8', errors=u'ignore') \
207                 if isinstance(s_out, bytes) else s_out
208
209         while chan.recv_stderr_ready():
210             s_err = chan.recv_stderr(self.__MAX_RECV_BUF)
211             stderr += s_err.decode(encoding=u'utf-8', errors=u'ignore') \
212                 if isinstance(s_err, bytes) else s_err
213
214         duration = monotonic() - start
215         logger.trace(f"exec_command on {peer} took {duration} seconds")
216
217         logger.trace(f"return RC {return_code}")
218         if log_stdout_err or int(return_code):
219             logger.trace(
220                 f"return STDOUT {stdout}"
221             )
222             logger.trace(
223                 f"return STDERR {stderr}"
224             )
225         return return_code, stdout, stderr
226
227     def exec_command_sudo(
228             self, cmd, cmd_input=None, timeout=30, log_stdout_err=True):
229         """Execute SSH command with sudo on a new channel on the connected Node.
230
231         :param cmd: Command to be executed.
232         :param cmd_input: Input redirected to the command.
233         :param timeout: Timeout.
234         :param log_stdout_err: If True, stdout and stderr are logged.
235             Needed for calls outside Robot (e.g. from reservation script).
236         :type cmd: str
237         :type cmd_input: str
238         :type timeout: int
239         :type log_stdout_err: bool
240         :returns: return_code, stdout, stderr
241         :rtype: tuple(int, str, str)
242
243         :Example:
244
245         >>> from ssh import SSH
246         >>> ssh = SSH()
247         >>> ssh.connect(node)
248         >>> # Execute command without input (sudo -S cmd)
249         >>> ssh.exec_command_sudo(u"ifconfig eth0 down")
250         >>> # Execute command with input (sudo -S cmd <<< 'input')
251         >>> ssh.exec_command_sudo(u"vpp_api_test", u"dump_interface_table")
252         """
253         if isinstance(cmd, (list, tuple)):
254             cmd = OptionString(cmd)
255         if cmd_input is None:
256             command = f"sudo -E -S {cmd}"
257         else:
258             command = f"sudo -E -S {cmd} <<< \"{cmd_input}\""
259         return self.exec_command(
260             command, timeout, log_stdout_err=log_stdout_err
261         )
262
263     def exec_command_lxc(
264             self, lxc_cmd, lxc_name, lxc_params=u"", sudo=True, timeout=30):
265         """Execute command in LXC on a new SSH channel on the connected Node.
266
267         :param lxc_cmd: Command to be executed.
268         :param lxc_name: LXC name.
269         :param lxc_params: Additional parameters for LXC attach.
270         :param sudo: Run in privileged LXC mode. Default: privileged
271         :param timeout: Timeout.
272         :type lxc_cmd: str
273         :type lxc_name: str
274         :type lxc_params: str
275         :type sudo: bool
276         :type timeout: int
277         :returns: return_code, stdout, stderr
278         """
279         command = f"lxc-attach {lxc_params} --name {lxc_name} -- /bin/sh " \
280             f"-c \"{lxc_cmd}\""
281
282         if sudo:
283             command = f"sudo -E -S {command}"
284         return self.exec_command(command, timeout)
285
286     def interactive_terminal_open(self, time_out=45):
287         """Open interactive terminal on a new channel on the connected Node.
288
289         :param time_out: Timeout in seconds.
290         :returns: SSH channel with opened terminal.
291
292         .. warning:: Interruptingcow is used here, and it uses
293            signal(SIGALRM) to let the operating system interrupt program
294            execution. This has the following limitations: Python signal
295            handlers only apply to the main thread, so you cannot use this
296            from other threads. You must not use this in a program that
297            uses SIGALRM itself (this includes certain profilers)
298         """
299         chan = self._ssh.get_transport().open_session()
300         chan.get_pty()
301         chan.invoke_shell()
302         chan.settimeout(int(time_out))
303         chan.set_combine_stderr(True)
304
305         buf = u""
306         while not buf.endswith((u":~# ", u":~$ ", u"~]$ ", u"~]# ")):
307             try:
308                 s_out = chan.recv(self.__MAX_RECV_BUF)
309                 if not s_out:
310                     break
311                 buf += s_out.decode(encoding=u'utf-8', errors=u'ignore') \
312                     if isinstance(s_out, bytes) else s_out
313                 if chan.exit_status_ready():
314                     logger.error(u"Channel exit status ready")
315                     break
316             except socket.timeout as exc:
317                 raise Exception(f"Socket timeout: {buf}") from exc
318         return chan
319
320     def interactive_terminal_exec_command(self, chan, cmd, prompt):
321         """Execute command on interactive terminal.
322
323         interactive_terminal_open() method has to be called first!
324
325         :param chan: SSH channel with opened terminal.
326         :param cmd: Command to be executed.
327         :param prompt: Command prompt, sequence of characters used to
328             indicate readiness to accept commands.
329         :returns: Command output.
330
331         .. warning:: Interruptingcow is used here, and it uses
332            signal(SIGALRM) to let the operating system interrupt program
333            execution. This has the following limitations: Python signal
334            handlers only apply to the main thread, so you cannot use this
335            from other threads. You must not use this in a program that
336            uses SIGALRM itself (this includes certain profilers)
337         """
338         chan.sendall(f"{cmd}\n")
339         buf = u""
340         while not buf.endswith(prompt):
341             try:
342                 s_out = chan.recv(self.__MAX_RECV_BUF)
343                 if not s_out:
344                     break
345                 buf += s_out.decode(encoding=u'utf-8', errors=u'ignore') \
346                     if isinstance(s_out, bytes) else s_out
347                 if chan.exit_status_ready():
348                     logger.error(u"Channel exit status ready")
349                     break
350             except socket.timeout as exc:
351                 raise Exception(
352                     f"Socket timeout during execution of command: {cmd}\n"
353                     f"Buffer content:\n{buf}"
354                 ) from exc
355         tmp = buf.replace(cmd.replace(u"\n", u""), u"")
356         for item in prompt:
357             tmp.replace(item, u"")
358         return tmp
359
360     @staticmethod
361     def interactive_terminal_close(chan):
362         """Close interactive terminal SSH channel.
363
364         :param chan: SSH channel to be closed.
365         """
366         chan.close()
367
368     def scp(
369             self, local_path, remote_path, get=False, timeout=30,
370             wildcard=False):
371         """Copy files from local_path to remote_path or vice versa.
372
373         connect() method has to be called first!
374
375         :param local_path: Path to local file that should be uploaded; or
376             path where to save remote file.
377         :param remote_path: Remote path where to place uploaded file; or
378             path to remote file which should be downloaded.
379         :param get: scp operation to perform. Default is put.
380         :param timeout: Timeout value in seconds.
381         :param wildcard: If path has wildcard characters. Default is false.
382         :type local_path: str
383         :type remote_path: str
384         :type get: bool
385         :type timeout: int
386         :type wildcard: bool
387         """
388         if not get:
389             logger.trace(
390                 f"SCP {local_path} to "
391                 f"{self._ssh.get_transport().getpeername()}:{remote_path}"
392             )
393         else:
394             logger.trace(
395                 f"SCP {self._ssh.get_transport().getpeername()}:{remote_path} "
396                 f"to {local_path}"
397             )
398         # SCPCLient takes a paramiko transport as its only argument
399         if not wildcard:
400             scp = SCPClient(self._ssh.get_transport(), socket_timeout=timeout)
401         else:
402             scp = SCPClient(
403                 self._ssh.get_transport(), sanitize=lambda x: x,
404                 socket_timeout=timeout
405             )
406         start = monotonic()
407         if not get:
408             scp.put(local_path, remote_path)
409         else:
410             scp.get(remote_path, local_path)
411         scp.close()
412         duration = monotonic() - start
413         logger.trace(f"SCP took {duration} seconds")
414
415
416 def exec_cmd(
417         node, cmd, timeout=600, sudo=False, disconnect=False,
418         log_stdout_err=True
419     ):
420     """Convenience function to ssh/exec/return rc, out & err.
421
422     Returns (rc, stdout, stderr).
423
424     :param node: The node to execute command on.
425     :param cmd: Command to execute.
426     :param timeout: Timeout value in seconds. Default: 600.
427     :param sudo: Sudo privilege execution flag. Default: False.
428     :param disconnect: Close the opened SSH connection if True.
429     :param log_stdout_err: If True, stdout and stderr are logged. stdout
430         and stderr are logged also if the return code is not zero
431         independently of the value of log_stdout_err.
432         Needed for calls outside Robot (e.g. from reservation script).
433     :type node: dict
434     :type cmd: str or OptionString
435     :type timeout: int
436     :type sudo: bool
437     :type disconnect: bool
438     :type log_stdout_err: bool
439     :returns: RC, Stdout, Stderr.
440     :rtype: Tuple[int, str, str]
441     """
442     if node is None:
443         raise TypeError(u"Node parameter is None")
444     if cmd is None:
445         raise TypeError(u"Command parameter is None")
446     if not cmd:
447         raise ValueError(u"Empty command parameter")
448
449     ssh = SSH()
450
451     try:
452         ssh.connect(node)
453     except SSHException as err:
454         logger.error(f"Failed to connect to node {node[u'host']}\n{err!r}")
455         return None, None, None
456
457     try:
458         if not sudo:
459             ret_code, stdout, stderr = ssh.exec_command(
460                 cmd, timeout=timeout, log_stdout_err=log_stdout_err
461             )
462         else:
463             ret_code, stdout, stderr = ssh.exec_command_sudo(
464                 cmd, timeout=timeout, log_stdout_err=log_stdout_err
465             )
466     except SSHException as err:
467         logger.error(repr(err))
468         return None, None, None
469     finally:
470         if disconnect:
471             ssh.disconnect()
472
473     return ret_code, stdout, stderr
474
475
476 def exec_cmd_no_error(
477         node, cmd, timeout=600, sudo=False, message=None, disconnect=False,
478         retries=0, include_reason=False, log_stdout_err=True
479     ):
480     """Convenience function to ssh/exec/return out & err.
481
482     Verifies that return code is zero.
483     Supports retries, timeout is related to each try separately then. There is
484     sleep(1) before each retry.
485     Disconnect (if enabled) is applied after each try.
486
487     :param node: DUT node.
488     :param cmd: Command to be executed.
489     :param timeout: Timeout value in seconds. Default: 600.
490     :param sudo: Sudo privilege execution flag. Default: False.
491     :param message: Error message in case of failure. Default: None.
492     :param disconnect: Close the opened SSH connection if True.
493     :param retries: How many times to retry on failure.
494     :param include_reason: Whether default info should be appended to message.
495     :param log_stdout_err: If True, stdout and stderr are logged. stdout
496         and stderr are logged also if the return code is not zero
497         independently of the value of log_stdout_err.
498         Needed for calls outside Robot thread (e.g. parallel framework setup).
499     :type node: dict
500     :type cmd: str or OptionString
501     :type timeout: int
502     :type sudo: bool
503     :type message: str
504     :type disconnect: bool
505     :type retries: int
506     :type include_reason: bool
507     :type log_stdout_err: bool
508     :returns: Stdout, Stderr.
509     :rtype: tuple(str, str)
510     :raises RuntimeError: If bash return code is not 0.
511     """
512     for _ in range(retries + 1):
513         ret_code, stdout, stderr = exec_cmd(
514             node, cmd, timeout=timeout, sudo=sudo, disconnect=disconnect,
515             log_stdout_err=log_stdout_err
516         )
517         if ret_code == 0:
518             break
519         sleep(1)
520     else:
521         msg = f"Command execution failed: '{cmd}'\nRC: {ret_code}\n{stderr}"
522         logger.info(msg)
523         if message:
524             msg = f"{message}\n{msg}" if include_reason else message
525         raise RuntimeError(msg)
526
527     return stdout, stderr
528
529
530 def scp_node(
531         node, local_path, remote_path, get=False, timeout=30, disconnect=False):
532     """Copy files from local_path to remote_path or vice versa.
533
534     :param node: SUT node.
535     :param local_path: Path to local file that should be uploaded; or
536         path where to save remote file.
537     :param remote_path: Remote path where to place uploaded file; or
538         path to remote file which should be downloaded.
539     :param get: scp operation to perform. Default is put.
540     :param timeout: Timeout value in seconds.
541     :param disconnect: Close the opened SSH connection if True.
542     :type node: dict
543     :type local_path: str
544     :type remote_path: str
545     :type get: bool
546     :type timeout: int
547     :type disconnect: bool
548     :raises RuntimeError: If SSH connection failed or SCP transfer failed.
549     """
550     ssh = SSH()
551
552     try:
553         ssh.connect(node)
554     except SSHException as exc:
555         raise RuntimeError(f"Failed to connect to {node[u'host']}!") from exc
556     try:
557         ssh.scp(local_path, remote_path, get, timeout)
558     except SCPException as exc:
559         raise RuntimeError(f"SCP execution failed on {node[u'host']}!") from exc
560     finally:
561         if disconnect:
562             ssh.disconnect()