X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2Fssh.py;h=6f6fc557f42c10aec384c94d887f1f9d4e38751e;hp=1908da41539b5e8ac0b06985f1b2dc9bd8e321d4;hb=HEAD;hpb=a6d233d80c359813cc0a350f07449c03d552d166 diff --git a/resources/libraries/python/ssh.py b/resources/libraries/python/ssh.py index 1908da4153..437b1ad3e6 100644 --- a/resources/libraries/python/ssh.py +++ b/resources/libraries/python/ssh.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019 Cisco and/or its affiliates. +# Copyright (c) 2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -15,8 +15,9 @@ import socket -import StringIO -from time import time, sleep + +from io import StringIO +from time import monotonic, sleep from paramiko import RSAKey, SSHClient, AutoAddPolicy from paramiko.ssh_exception import SSHException, NoValidConnectionsError @@ -24,23 +25,23 @@ from robot.api import logger from scp import SCPClient, SCPException from resources.libraries.python.OptionString import OptionString -from resources.libraries.python.PythonThree import raise_from -__all__ = ["exec_cmd", "exec_cmd_no_error"] +__all__ = [ + u"exec_cmd", u"exec_cmd_no_error", u"SSH", u"SSHTimeout", u"scp_node" +] # TODO: load priv key class SSHTimeout(Exception): """This exception is raised when a timeout occurs.""" - pass -class SSH(object): +class SSH: """Contains methods for managing and using SSH connections.""" - __MAX_RECV_BUF = 10*1024*1024 - __existing_connections = {} + __MAX_RECV_BUF = 10 * 1024 * 1024 + __existing_connections = dict() def __init__(self): self._ssh = None @@ -55,8 +56,7 @@ class SSH(object): :returns: IP address and port for the specified node. :rtype: int """ - - return hash(frozenset([node['host'], node['port']])) + return hash(frozenset([node[u"host"], node[u"port"]])) def connect(self, node, attempts=5): """Connect to node prior to running exec_command or scp. @@ -74,43 +74,42 @@ class SSH(object): if node_hash in SSH.__existing_connections: self._ssh = SSH.__existing_connections[node_hash] if self._ssh.get_transport().is_active(): - logger.debug('Reusing SSH: {ssh}'.format(ssh=self._ssh)) + logger.debug(f"Reusing SSH: {self._ssh}") else: if attempts > 0: self._reconnect(attempts-1) else: - raise IOError('Cannot connect to {host}'. - format(host=node['host'])) + raise IOError(f"Cannot connect to {node['host']}") else: try: - start = time() + start = monotonic() pkey = None - if 'priv_key' in node: - pkey = RSAKey.from_private_key( - StringIO.StringIO(node['priv_key'])) + if u"priv_key" in node: + pkey = RSAKey.from_private_key(StringIO(node[u"priv_key"])) self._ssh = SSHClient() self._ssh.set_missing_host_key_policy(AutoAddPolicy()) - self._ssh.connect(node['host'], username=node['username'], - password=node.get('password'), pkey=pkey, - port=node['port']) + self._ssh.connect( + node[u"host"], username=node[u"username"], + password=node.get(u"password"), pkey=pkey, + port=node[u"port"] + ) self._ssh.get_transport().set_keepalive(10) SSH.__existing_connections[node_hash] = self._ssh - logger.debug('New SSH to {peer} took {total} seconds: {ssh}'. - format( - peer=self._ssh.get_transport().getpeername(), - total=(time() - start), - ssh=self._ssh)) + logger.debug( + f"New SSH to {self._ssh.get_transport().getpeername()} " + f"took {monotonic() - start} seconds: {self._ssh}" + ) except SSHException as exc: - raise_from(IOError('Cannot connect to {host}'.format( - host=node['host'])), exc) + raise IOError(f"Cannot connect to {node[u'host']}") from exc except NoValidConnectionsError as err: - raise_from(IOError( - 'Unable to connect to port {port} on {host}'.format( - port=node['port'], host=node['host'])), err) + raise IOError( + f"Unable to connect to port {node[u'port']} on " + f"{node[u'host']}" + ) from err def disconnect(self, node=None): """Close SSH connection to the node. @@ -124,8 +123,9 @@ class SSH(object): return node_hash = self._node_hash(node) if node_hash in SSH.__existing_connections: - logger.debug('Disconnecting peer: {host}, {port}'. - format(host=node['host'], port=node['port'])) + logger.debug( + f"Disconnecting peer: {node[u'host']}, {node[u'port']}" + ) ssh = SSH.__existing_connections.pop(node_hash) ssh.close() @@ -138,8 +138,9 @@ class SSH(object): node = self._node self.disconnect(node) self.connect(node, attempts) - logger.debug('Reconnecting peer done: {host}, {port}'. - format(host=node['host'], port=node['port'])) + logger.debug( + f"Reconnecting peer done: {node[u'host']}, {node[u'port']}" + ) def exec_command(self, cmd, timeout=10, log_stdout_err=True): """Execute SSH command on a new channel on the connected Node. @@ -150,6 +151,7 @@ class SSH(object): :param log_stdout_err: If True, stdout and stderr are logged. stdout and stderr are logged also if the return code is not zero independently of the value of log_stdout_err. + Needed for calls outside Robot (e.g. from reservation script). :type cmd: str or OptionString :type timeout: int :type log_stdout_err: bool @@ -160,8 +162,8 @@ class SSH(object): if isinstance(cmd, (list, tuple)): cmd = OptionString(cmd) cmd = str(cmd) - stdout = StringIO.StringIO() - stderr = StringIO.StringIO() + stdout = u"" + stderr = u"" try: chan = self._ssh.get_transport().open_session(timeout=5) peer = self._ssh.get_transport().getpeername() @@ -171,56 +173,66 @@ class SSH(object): peer = self._ssh.get_transport().getpeername() chan.settimeout(timeout) - logger.trace('exec_command on {peer} with timeout {timeout}: {cmd}' - .format(peer=peer, timeout=timeout, cmd=cmd)) + logger.trace(f"exec_command on {peer} with timeout {timeout}: {cmd}") - start = time() + start = monotonic() chan.exec_command(cmd) while not chan.exit_status_ready() and timeout is not None: if chan.recv_ready(): - stdout.write(chan.recv(self.__MAX_RECV_BUF)) + s_out = chan.recv(self.__MAX_RECV_BUF) + stdout += s_out.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_out, bytes) else s_out if chan.recv_stderr_ready(): - stderr.write(chan.recv_stderr(self.__MAX_RECV_BUF)) + s_err = chan.recv_stderr(self.__MAX_RECV_BUF) + stderr += s_err.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_err, bytes) else s_err - if time() - start > timeout: + duration = monotonic() - start + if duration > timeout: raise SSHTimeout( - 'Timeout exception during execution of command: {cmd}\n' - 'Current contents of stdout buffer: {stdout}\n' - 'Current contents of stderr buffer: {stderr}\n' - .format(cmd=cmd, stdout=stdout.getvalue(), - stderr=stderr.getvalue()) + f"Timeout exception during execution of command: {cmd}\n" + f"Current contents of stdout buffer: " + f"{stdout}\n" + f"Current contents of stderr buffer: " + f"{stderr}\n" ) sleep(0.1) return_code = chan.recv_exit_status() while chan.recv_ready(): - stdout.write(chan.recv(self.__MAX_RECV_BUF)) + s_out = chan.recv(self.__MAX_RECV_BUF) + stdout += s_out.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_out, bytes) else s_out while chan.recv_stderr_ready(): - stderr.write(chan.recv_stderr(self.__MAX_RECV_BUF)) + s_err = chan.recv_stderr(self.__MAX_RECV_BUF) + stderr += s_err.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_err, bytes) else s_err - end = time() - logger.trace('exec_command on {peer} took {total} seconds'. - format(peer=peer, total=end-start)) + duration = monotonic() - start + logger.trace(f"exec_command on {peer} took {duration} seconds") - logger.trace('return RC {rc}'.format(rc=return_code)) + logger.trace(f"return RC {return_code}") if log_stdout_err or int(return_code): - logger.trace('return STDOUT {stdout}'. - format(stdout=stdout.getvalue())) - logger.trace('return STDERR {stderr}'. - format(stderr=stderr.getvalue())) - return return_code, stdout.getvalue(), stderr.getvalue() - - def exec_command_sudo(self, cmd, cmd_input=None, timeout=30, - log_stdout_err=True): + logger.trace( + f"return STDOUT {stdout}" + ) + logger.trace( + f"return STDERR {stderr}" + ) + return return_code, stdout, stderr + + def exec_command_sudo( + self, cmd, cmd_input=None, timeout=30, log_stdout_err=True): """Execute SSH command with sudo on a new channel on the connected Node. :param cmd: Command to be executed. :param cmd_input: Input redirected to the command. :param timeout: Timeout. :param log_stdout_err: If True, stdout and stderr are logged. + Needed for calls outside Robot (e.g. from reservation script). :type cmd: str :type cmd_input: str :type timeout: int @@ -234,21 +246,22 @@ class SSH(object): >>> ssh = SSH() >>> ssh.connect(node) >>> # Execute command without input (sudo -S cmd) - >>> ssh.exec_command_sudo("ifconfig eth0 down") - >>> # Execute command with input (sudo -S cmd <<< "input") - >>> ssh.exec_command_sudo("vpp_api_test", "dump_interface_table") + >>> ssh.exec_command_sudo(u"ifconfig eth0 down") + >>> # Execute command with input (sudo -S cmd <<< 'input') + >>> ssh.exec_command_sudo(u"vpp_api_test", u"dump_interface_table") """ if isinstance(cmd, (list, tuple)): cmd = OptionString(cmd) if cmd_input is None: - command = 'sudo -E -S {c}'.format(c=cmd) + command = f"sudo -E -S {cmd}" else: - command = 'sudo -E -S {c} <<< "{i}"'.format(c=cmd, i=cmd_input) - return self.exec_command(command, timeout, - log_stdout_err=log_stdout_err) + command = f"sudo -E -S {cmd} <<< \"{cmd_input}\"" + return self.exec_command( + command, timeout, log_stdout_err=log_stdout_err + ) - def exec_command_lxc(self, lxc_cmd, lxc_name, lxc_params='', sudo=True, - timeout=30): + def exec_command_lxc( + self, lxc_cmd, lxc_name, lxc_params=u"", sudo=True, timeout=30): """Execute command in LXC on a new SSH channel on the connected Node. :param lxc_cmd: Command to be executed. @@ -263,11 +276,11 @@ class SSH(object): :type timeout: int :returns: return_code, stdout, stderr """ - command = "lxc-attach {p} --name {n} -- /bin/sh -c '{c}'"\ - .format(p=lxc_params, n=lxc_name, c=lxc_cmd) + command = f"lxc-attach {lxc_params} --name {lxc_name} -- /bin/sh " \ + f"-c \"{lxc_cmd}\"" if sudo: - command = 'sudo -E -S {c}'.format(c=command) + command = f"sudo -E -S {command}" return self.exec_command(command, timeout) def interactive_terminal_open(self, time_out=45): @@ -289,18 +302,19 @@ class SSH(object): chan.settimeout(int(time_out)) chan.set_combine_stderr(True) - buf = '' - while not buf.endswith((":~# ", ":~$ ", "~]$ ", "~]# ")): + buf = u"" + while not buf.endswith((u":~# ", u":~$ ", u"~]$ ", u"~]# ")): try: - chunk = chan.recv(self.__MAX_RECV_BUF) - if not chunk: + s_out = chan.recv(self.__MAX_RECV_BUF) + if not s_out: break - buf += chunk + buf += s_out.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_out, bytes) else s_out if chan.exit_status_ready(): - logger.error('Channel exit status ready') + logger.error(u"Channel exit status ready") break except socket.timeout as exc: - raise_from(Exception('Socket timeout: {0}'.format(buf)), exc) + raise Exception(f"Socket timeout: {buf}") from exc return chan def interactive_terminal_exec_command(self, chan, cmd, prompt): @@ -311,7 +325,7 @@ class SSH(object): :param chan: SSH channel with opened terminal. :param cmd: Command to be executed. :param prompt: Command prompt, sequence of characters used to - indicate readiness to accept commands. + indicate readiness to accept commands. :returns: Command output. .. warning:: Interruptingcow is used here, and it uses @@ -321,24 +335,26 @@ class SSH(object): from other threads. You must not use this in a program that uses SIGALRM itself (this includes certain profilers) """ - chan.sendall('{c}\n'.format(c=cmd)) - buf = '' + chan.sendall(f"{cmd}\n") + buf = u"" while not buf.endswith(prompt): try: - chunk = chan.recv(self.__MAX_RECV_BUF) - if not chunk: + s_out = chan.recv(self.__MAX_RECV_BUF) + if not s_out: break - buf += chunk + buf += s_out.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_out, bytes) else s_out if chan.exit_status_ready(): - logger.error('Channel exit status ready') + logger.error(u"Channel exit status ready") break except socket.timeout as exc: - raise_from(Exception( - 'Socket timeout during execution of command: ' - '{0}\nBuffer content:\n{1}'.format(cmd, buf)), exc) - tmp = buf.replace(cmd.replace('\n', ''), '') + raise Exception( + f"Socket timeout during execution of command: {cmd}\n" + f"Buffer content:\n{buf}" + ) from exc + tmp = buf.replace(cmd.replace(u"\n", u""), u"") for item in prompt: - tmp.replace(item, '') + tmp.replace(item, u"") return tmp @staticmethod @@ -349,16 +365,17 @@ class SSH(object): """ chan.close() - def scp(self, local_path, remote_path, get=False, timeout=30, + def scp( + self, local_path, remote_path, get=False, timeout=30, wildcard=False): """Copy files from local_path to remote_path or vice versa. connect() method has to be called first! :param local_path: Path to local file that should be uploaded; or - path where to save remote file. + path where to save remote file. :param remote_path: Remote path where to place uploaded file; or - path to remote file which should be downloaded. + path to remote file which should be downloaded. :param get: scp operation to perform. Default is put. :param timeout: Timeout value in seconds. :param wildcard: If path has wildcard characters. Default is false. @@ -369,30 +386,37 @@ class SSH(object): :type wildcard: bool """ if not get: - logger.trace('SCP {0} to {1}:{2}'.format( - local_path, self._ssh.get_transport().getpeername(), - remote_path)) + logger.trace( + f"SCP {local_path} to " + f"{self._ssh.get_transport().getpeername()}:{remote_path}" + ) else: - logger.trace('SCP {0}:{1} to {2}'.format( - self._ssh.get_transport().getpeername(), remote_path, - local_path)) + logger.trace( + f"SCP {self._ssh.get_transport().getpeername()}:{remote_path} " + f"to {local_path}" + ) # SCPCLient takes a paramiko transport as its only argument if not wildcard: scp = SCPClient(self._ssh.get_transport(), socket_timeout=timeout) else: - scp = SCPClient(self._ssh.get_transport(), sanitize=lambda x: x, - socket_timeout=timeout) - start = time() + scp = SCPClient( + self._ssh.get_transport(), sanitize=lambda x: x, + socket_timeout=timeout + ) + start = monotonic() if not get: scp.put(local_path, remote_path) else: scp.get(remote_path, local_path) scp.close() - end = time() - logger.trace('SCP took {0} seconds'.format(end-start)) + duration = monotonic() - start + logger.trace(f"SCP took {duration} seconds") -def exec_cmd(node, cmd, timeout=600, sudo=False, disconnect=False): +def exec_cmd( + node, cmd, timeout=600, sudo=False, disconnect=False, + log_stdout_err=True + ): """Convenience function to ssh/exec/return rc, out & err. Returns (rc, stdout, stderr). @@ -402,59 +426,43 @@ def exec_cmd(node, cmd, timeout=600, sudo=False, disconnect=False): :param timeout: Timeout value in seconds. Default: 600. :param sudo: Sudo privilege execution flag. Default: False. :param disconnect: Close the opened SSH connection if True. + :param log_stdout_err: If True, stdout and stderr are logged. stdout + and stderr are logged also if the return code is not zero + independently of the value of log_stdout_err. + Needed for calls outside Robot (e.g. from reservation script). :type node: dict :type cmd: str or OptionString :type timeout: int :type sudo: bool :type disconnect: bool + :type log_stdout_err: bool :returns: RC, Stdout, Stderr. - :rtype: tuple(int, str, str) + :rtype: Tuple[int, str, str] """ if node is None: - raise TypeError('Node parameter is None') + raise TypeError(u"Node parameter is None") if cmd is None: - raise TypeError('Command parameter is None') + raise TypeError(u"Command parameter is None") if not cmd: - raise ValueError('Empty command parameter') + raise ValueError(u"Empty command parameter") ssh = SSH() - if node.get('host_port') is not None: - ssh_node = dict() - ssh_node['host'] = '127.0.0.1' - ssh_node['port'] = node['port'] - ssh_node['username'] = node['username'] - ssh_node['password'] = node['password'] - import pexpect - options = '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null' - tnl = '-L {port}:127.0.0.1:{port}'.format(port=node['port']) - ssh_cmd = 'ssh {tnl} {op} {user}@{host} -p {host_port}'.\ - format(tnl=tnl, op=options, user=node['host_username'], - host=node['host'], host_port=node['host_port']) - logger.trace('Initializing local port forwarding:\n{ssh_cmd}'. - format(ssh_cmd=ssh_cmd)) - child = pexpect.spawn(ssh_cmd) - child.expect('.* password: ') - logger.trace(child.after) - child.sendline(node['host_password']) - child.expect('Welcome .*') - logger.trace(child.after) - logger.trace('Local port forwarding finished.') - else: - ssh_node = node - try: - ssh.connect(ssh_node) + ssh.connect(node) except SSHException as err: - logger.error("Failed to connect to node" + repr(err)) + logger.error(f"Failed to connect to node {node[u'host']}\n{err!r}") return None, None, None try: if not sudo: - (ret_code, stdout, stderr) = ssh.exec_command(cmd, timeout=timeout) + ret_code, stdout, stderr = ssh.exec_command( + cmd, timeout=timeout, log_stdout_err=log_stdout_err + ) else: - (ret_code, stdout, stderr) = ssh.exec_command_sudo( - cmd, timeout=timeout) + ret_code, stdout, stderr = ssh.exec_command_sudo( + cmd, timeout=timeout, log_stdout_err=log_stdout_err + ) except SSHException as err: logger.error(repr(err)) return None, None, None @@ -467,7 +475,8 @@ def exec_cmd(node, cmd, timeout=600, sudo=False, disconnect=False): def exec_cmd_no_error( node, cmd, timeout=600, sudo=False, message=None, disconnect=False, - retries=0, include_reason=False): + retries=0, include_reason=False, log_stdout_err=True + ): """Convenience function to ssh/exec/return out & err. Verifies that return code is zero. @@ -483,6 +492,10 @@ def exec_cmd_no_error( :param disconnect: Close the opened SSH connection if True. :param retries: How many times to retry on failure. :param include_reason: Whether default info should be appended to message. + :param log_stdout_err: If True, stdout and stderr are logged. stdout + and stderr are logged also if the return code is not zero + independently of the value of log_stdout_err. + Needed for calls outside Robot thread (e.g. parallel framework setup). :type node: dict :type cmd: str or OptionString :type timeout: int @@ -491,25 +504,24 @@ def exec_cmd_no_error( :type disconnect: bool :type retries: int :type include_reason: bool + :type log_stdout_err: bool :returns: Stdout, Stderr. :rtype: tuple(str, str) :raises RuntimeError: If bash return code is not 0. """ for _ in range(retries + 1): ret_code, stdout, stderr = exec_cmd( - node, cmd, timeout=timeout, sudo=sudo, disconnect=disconnect) + node, cmd, timeout=timeout, sudo=sudo, disconnect=disconnect, + log_stdout_err=log_stdout_err + ) if ret_code == 0: break sleep(1) else: - msg = 'Command execution failed: "{cmd}"\nRC: {rc}\n{stderr}'.format( - cmd=cmd, rc=ret_code, stderr=stderr) + msg = f"Command execution failed: '{cmd}'\nRC: {ret_code}\n{stderr}" logger.info(msg) if message: - if include_reason: - msg = message + '\n' + msg - else: - msg = message + msg = f"{message}\n{msg}" if include_reason else message raise RuntimeError(msg) return stdout, stderr @@ -540,13 +552,11 @@ def scp_node( try: ssh.connect(node) except SSHException as exc: - raise_from(RuntimeError( - 'Failed to connect to {host}!'.format(host=node['host'])), exc) + raise RuntimeError(f"Failed to connect to {node[u'host']}!") from exc try: ssh.scp(local_path, remote_path, get, timeout) except SCPException as exc: - raise_from(RuntimeError( - 'SCP execution failed on {host}!'.format(host=node['host'])), exc) + raise RuntimeError(f"SCP execution failed on {node[u'host']}!") from exc finally: if disconnect: ssh.disconnect()