X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2Fssh.py;h=5359a6e5fb6303a430a0a6fc083ee0ab8089cbdb;hp=1908da41539b5e8ac0b06985f1b2dc9bd8e321d4;hb=d68951ac245150eeefa6e0f4156e4c1b5c9e9325;hpb=ed0258a440cfad7023d643f717ab78ac568dc59b diff --git a/resources/libraries/python/ssh.py b/resources/libraries/python/ssh.py index 1908da4153..5359a6e5fb 100644 --- a/resources/libraries/python/ssh.py +++ b/resources/libraries/python/ssh.py @@ -15,7 +15,8 @@ import socket -import StringIO + +from io import StringIO from time import time, sleep from paramiko import RSAKey, SSHClient, AutoAddPolicy @@ -24,23 +25,23 @@ from robot.api import logger from scp import SCPClient, SCPException from resources.libraries.python.OptionString import OptionString -from resources.libraries.python.PythonThree import raise_from -__all__ = ["exec_cmd", "exec_cmd_no_error"] +__all__ = [ + u"exec_cmd", u"exec_cmd_no_error", u"SSH", u"SSHTimeout", u"scp_node" +] # TODO: load priv key class SSHTimeout(Exception): """This exception is raised when a timeout occurs.""" - pass -class SSH(object): +class SSH: """Contains methods for managing and using SSH connections.""" - __MAX_RECV_BUF = 10*1024*1024 - __existing_connections = {} + __MAX_RECV_BUF = 10 * 1024 * 1024 + __existing_connections = dict() def __init__(self): self._ssh = None @@ -55,8 +56,7 @@ class SSH(object): :returns: IP address and port for the specified node. :rtype: int """ - - return hash(frozenset([node['host'], node['port']])) + return hash(frozenset([node[u"host"], node[u"port"]])) def connect(self, node, attempts=5): """Connect to node prior to running exec_command or scp. @@ -74,43 +74,42 @@ class SSH(object): if node_hash in SSH.__existing_connections: self._ssh = SSH.__existing_connections[node_hash] if self._ssh.get_transport().is_active(): - logger.debug('Reusing SSH: {ssh}'.format(ssh=self._ssh)) + logger.debug(f"Reusing SSH: {self._ssh}") else: if attempts > 0: self._reconnect(attempts-1) else: - raise IOError('Cannot connect to {host}'. - format(host=node['host'])) + raise IOError(f"Cannot connect to {node['host']}") else: try: start = time() pkey = None - if 'priv_key' in node: - pkey = RSAKey.from_private_key( - StringIO.StringIO(node['priv_key'])) + if u"priv_key" in node: + pkey = RSAKey.from_private_key(StringIO(node[u"priv_key"])) self._ssh = SSHClient() self._ssh.set_missing_host_key_policy(AutoAddPolicy()) - self._ssh.connect(node['host'], username=node['username'], - password=node.get('password'), pkey=pkey, - port=node['port']) + self._ssh.connect( + node[u"host"], username=node[u"username"], + password=node.get(u"password"), pkey=pkey, + port=node[u"port"] + ) self._ssh.get_transport().set_keepalive(10) SSH.__existing_connections[node_hash] = self._ssh - logger.debug('New SSH to {peer} took {total} seconds: {ssh}'. - format( - peer=self._ssh.get_transport().getpeername(), - total=(time() - start), - ssh=self._ssh)) + logger.debug( + f"New SSH to {self._ssh.get_transport().getpeername()} " + f"took {time() - start} seconds: {self._ssh}" + ) except SSHException as exc: - raise_from(IOError('Cannot connect to {host}'.format( - host=node['host'])), exc) + raise IOError(f"Cannot connect to {node[u'host']}") from exc except NoValidConnectionsError as err: - raise_from(IOError( - 'Unable to connect to port {port} on {host}'.format( - port=node['port'], host=node['host'])), err) + raise IOError( + f"Unable to connect to port {node[u'port']} on " + f"{node[u'host']}" + ) from err def disconnect(self, node=None): """Close SSH connection to the node. @@ -124,8 +123,9 @@ class SSH(object): return node_hash = self._node_hash(node) if node_hash in SSH.__existing_connections: - logger.debug('Disconnecting peer: {host}, {port}'. - format(host=node['host'], port=node['port'])) + logger.debug( + f"Disconnecting peer: {node[u'host']}, {node[u'port']}" + ) ssh = SSH.__existing_connections.pop(node_hash) ssh.close() @@ -138,8 +138,9 @@ class SSH(object): node = self._node self.disconnect(node) self.connect(node, attempts) - logger.debug('Reconnecting peer done: {host}, {port}'. - format(host=node['host'], port=node['port'])) + logger.debug( + f"Reconnecting peer done: {node[u'host']}, {node[u'port']}" + ) def exec_command(self, cmd, timeout=10, log_stdout_err=True): """Execute SSH command on a new channel on the connected Node. @@ -160,8 +161,8 @@ class SSH(object): if isinstance(cmd, (list, tuple)): cmd = OptionString(cmd) cmd = str(cmd) - stdout = StringIO.StringIO() - stderr = StringIO.StringIO() + stdout = u"" + stderr = u"" try: chan = self._ssh.get_transport().open_session(timeout=5) peer = self._ssh.get_transport().getpeername() @@ -171,50 +172,58 @@ class SSH(object): peer = self._ssh.get_transport().getpeername() chan.settimeout(timeout) - logger.trace('exec_command on {peer} with timeout {timeout}: {cmd}' - .format(peer=peer, timeout=timeout, cmd=cmd)) + logger.trace(f"exec_command on {peer} with timeout {timeout}: {cmd}") start = time() chan.exec_command(cmd) while not chan.exit_status_ready() and timeout is not None: if chan.recv_ready(): - stdout.write(chan.recv(self.__MAX_RECV_BUF)) + s_out = chan.recv(self.__MAX_RECV_BUF) + stdout += s_out.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_out, bytes) else s_out if chan.recv_stderr_ready(): - stderr.write(chan.recv_stderr(self.__MAX_RECV_BUF)) + s_err = chan.recv_stderr(self.__MAX_RECV_BUF) + stderr += s_err.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_err, bytes) else s_err if time() - start > timeout: raise SSHTimeout( - 'Timeout exception during execution of command: {cmd}\n' - 'Current contents of stdout buffer: {stdout}\n' - 'Current contents of stderr buffer: {stderr}\n' - .format(cmd=cmd, stdout=stdout.getvalue(), - stderr=stderr.getvalue()) + f"Timeout exception during execution of command: {cmd}\n" + f"Current contents of stdout buffer: " + f"{stdout}\n" + f"Current contents of stderr buffer: " + f"{stderr}\n" ) sleep(0.1) return_code = chan.recv_exit_status() while chan.recv_ready(): - stdout.write(chan.recv(self.__MAX_RECV_BUF)) + s_out = chan.recv(self.__MAX_RECV_BUF) + stdout += s_out.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_out, bytes) else s_out while chan.recv_stderr_ready(): - stderr.write(chan.recv_stderr(self.__MAX_RECV_BUF)) + s_err = chan.recv_stderr(self.__MAX_RECV_BUF) + stderr += s_err.decode(encoding=u'utf-8', errors=u'ignore') \ + if isinstance(s_err, bytes) else s_err end = time() - logger.trace('exec_command on {peer} took {total} seconds'. - format(peer=peer, total=end-start)) + logger.trace(f"exec_command on {peer} took {end-start} seconds") - logger.trace('return RC {rc}'.format(rc=return_code)) + logger.trace(f"return RC {return_code}") if log_stdout_err or int(return_code): - logger.trace('return STDOUT {stdout}'. - format(stdout=stdout.getvalue())) - logger.trace('return STDERR {stderr}'. - format(stderr=stderr.getvalue())) - return return_code, stdout.getvalue(), stderr.getvalue() - - def exec_command_sudo(self, cmd, cmd_input=None, timeout=30, - log_stdout_err=True): + logger.trace( + f"return STDOUT {stdout}" + ) + logger.trace( + f"return STDERR {stderr}" + ) + return return_code, stdout, stderr + + def exec_command_sudo( + self, cmd, cmd_input=None, timeout=30, log_stdout_err=True): """Execute SSH command with sudo on a new channel on the connected Node. :param cmd: Command to be executed. @@ -234,21 +243,22 @@ class SSH(object): >>> ssh = SSH() >>> ssh.connect(node) >>> # Execute command without input (sudo -S cmd) - >>> ssh.exec_command_sudo("ifconfig eth0 down") - >>> # Execute command with input (sudo -S cmd <<< "input") - >>> ssh.exec_command_sudo("vpp_api_test", "dump_interface_table") + >>> ssh.exec_command_sudo(u"ifconfig eth0 down") + >>> # Execute command with input (sudo -S cmd <<< 'input') + >>> ssh.exec_command_sudo(u"vpp_api_test", u"dump_interface_table") """ if isinstance(cmd, (list, tuple)): cmd = OptionString(cmd) if cmd_input is None: - command = 'sudo -E -S {c}'.format(c=cmd) + command = f"sudo -E -S {cmd}" else: - command = 'sudo -E -S {c} <<< "{i}"'.format(c=cmd, i=cmd_input) - return self.exec_command(command, timeout, - log_stdout_err=log_stdout_err) + command = f"sudo -E -S {cmd} <<< \"{cmd_input}\"" + return self.exec_command( + command, timeout, log_stdout_err=log_stdout_err + ) - def exec_command_lxc(self, lxc_cmd, lxc_name, lxc_params='', sudo=True, - timeout=30): + def exec_command_lxc( + self, lxc_cmd, lxc_name, lxc_params=u"", sudo=True, timeout=30): """Execute command in LXC on a new SSH channel on the connected Node. :param lxc_cmd: Command to be executed. @@ -263,11 +273,11 @@ class SSH(object): :type timeout: int :returns: return_code, stdout, stderr """ - command = "lxc-attach {p} --name {n} -- /bin/sh -c '{c}'"\ - .format(p=lxc_params, n=lxc_name, c=lxc_cmd) + command = f"lxc-attach {lxc_params} --name {lxc_name} -- /bin/sh " \ + f"-c \"{lxc_cmd}\"" if sudo: - command = 'sudo -E -S {c}'.format(c=command) + command = f"sudo -E -S {command}" return self.exec_command(command, timeout) def interactive_terminal_open(self, time_out=45): @@ -289,18 +299,18 @@ class SSH(object): chan.settimeout(int(time_out)) chan.set_combine_stderr(True) - buf = '' - while not buf.endswith((":~# ", ":~$ ", "~]$ ", "~]# ")): + buf = u"" + while not buf.endswith((u":~# ", u":~$ ", u"~]$ ", u"~]# ")): try: chunk = chan.recv(self.__MAX_RECV_BUF) if not chunk: break buf += chunk if chan.exit_status_ready(): - logger.error('Channel exit status ready') + logger.error(u"Channel exit status ready") break except socket.timeout as exc: - raise_from(Exception('Socket timeout: {0}'.format(buf)), exc) + raise Exception(f"Socket timeout: {buf}") from exc return chan def interactive_terminal_exec_command(self, chan, cmd, prompt): @@ -321,8 +331,8 @@ class SSH(object): from other threads. You must not use this in a program that uses SIGALRM itself (this includes certain profilers) """ - chan.sendall('{c}\n'.format(c=cmd)) - buf = '' + chan.sendall(f"{cmd}\n") + buf = u"" while not buf.endswith(prompt): try: chunk = chan.recv(self.__MAX_RECV_BUF) @@ -330,15 +340,16 @@ class SSH(object): break buf += chunk if chan.exit_status_ready(): - logger.error('Channel exit status ready') + logger.error(u"Channel exit status ready") break except socket.timeout as exc: - raise_from(Exception( - 'Socket timeout during execution of command: ' - '{0}\nBuffer content:\n{1}'.format(cmd, buf)), exc) - tmp = buf.replace(cmd.replace('\n', ''), '') + raise Exception( + f"Socket timeout during execution of command: {cmd}\n" + f"Buffer content:\n{buf}" + ) from exc + tmp = buf.replace(cmd.replace(u"\n", u""), u"") for item in prompt: - tmp.replace(item, '') + tmp.replace(item, u"") return tmp @staticmethod @@ -349,7 +360,8 @@ class SSH(object): """ chan.close() - def scp(self, local_path, remote_path, get=False, timeout=30, + def scp( + self, local_path, remote_path, get=False, timeout=30, wildcard=False): """Copy files from local_path to remote_path or vice versa. @@ -369,19 +381,23 @@ class SSH(object): :type wildcard: bool """ if not get: - logger.trace('SCP {0} to {1}:{2}'.format( - local_path, self._ssh.get_transport().getpeername(), - remote_path)) + logger.trace( + f"SCP {local_path} to " + f"{self._ssh.get_transport().getpeername()}:{remote_path}" + ) else: - logger.trace('SCP {0}:{1} to {2}'.format( - self._ssh.get_transport().getpeername(), remote_path, - local_path)) + logger.trace( + f"SCP {self._ssh.get_transport().getpeername()}:{remote_path} " + f"to {local_path}" + ) # SCPCLient takes a paramiko transport as its only argument if not wildcard: scp = SCPClient(self._ssh.get_transport(), socket_timeout=timeout) else: - scp = SCPClient(self._ssh.get_transport(), sanitize=lambda x: x, - socket_timeout=timeout) + scp = SCPClient( + self._ssh.get_transport(), sanitize=lambda x: x, + socket_timeout=timeout + ) start = time() if not get: scp.put(local_path, remote_path) @@ -389,7 +405,7 @@ class SSH(object): scp.get(remote_path, local_path) scp.close() end = time() - logger.trace('SCP took {0} seconds'.format(end-start)) + logger.trace(f"SCP took {end-start} seconds") def exec_cmd(node, cmd, timeout=600, sudo=False, disconnect=False): @@ -411,50 +427,27 @@ def exec_cmd(node, cmd, timeout=600, sudo=False, disconnect=False): :rtype: tuple(int, str, str) """ if node is None: - raise TypeError('Node parameter is None') + raise TypeError(u"Node parameter is None") if cmd is None: - raise TypeError('Command parameter is None') + raise TypeError(u"Command parameter is None") if not cmd: - raise ValueError('Empty command parameter') + raise ValueError(u"Empty command parameter") ssh = SSH() - if node.get('host_port') is not None: - ssh_node = dict() - ssh_node['host'] = '127.0.0.1' - ssh_node['port'] = node['port'] - ssh_node['username'] = node['username'] - ssh_node['password'] = node['password'] - import pexpect - options = '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null' - tnl = '-L {port}:127.0.0.1:{port}'.format(port=node['port']) - ssh_cmd = 'ssh {tnl} {op} {user}@{host} -p {host_port}'.\ - format(tnl=tnl, op=options, user=node['host_username'], - host=node['host'], host_port=node['host_port']) - logger.trace('Initializing local port forwarding:\n{ssh_cmd}'. - format(ssh_cmd=ssh_cmd)) - child = pexpect.spawn(ssh_cmd) - child.expect('.* password: ') - logger.trace(child.after) - child.sendline(node['host_password']) - child.expect('Welcome .*') - logger.trace(child.after) - logger.trace('Local port forwarding finished.') - else: - ssh_node = node - try: - ssh.connect(ssh_node) + ssh.connect(node) except SSHException as err: - logger.error("Failed to connect to node" + repr(err)) + logger.error(f"Failed to connect to node {node[u'host']}\n{err!r}") return None, None, None try: if not sudo: - (ret_code, stdout, stderr) = ssh.exec_command(cmd, timeout=timeout) + ret_code, stdout, stderr = ssh.exec_command(cmd, timeout=timeout) else: - (ret_code, stdout, stderr) = ssh.exec_command_sudo( - cmd, timeout=timeout) + ret_code, stdout, stderr = ssh.exec_command_sudo( + cmd, timeout=timeout + ) except SSHException as err: logger.error(repr(err)) return None, None, None @@ -497,19 +490,16 @@ def exec_cmd_no_error( """ for _ in range(retries + 1): ret_code, stdout, stderr = exec_cmd( - node, cmd, timeout=timeout, sudo=sudo, disconnect=disconnect) + node, cmd, timeout=timeout, sudo=sudo, disconnect=disconnect + ) if ret_code == 0: break sleep(1) else: - msg = 'Command execution failed: "{cmd}"\nRC: {rc}\n{stderr}'.format( - cmd=cmd, rc=ret_code, stderr=stderr) + msg = f"Command execution failed: '{cmd}'\nRC: {ret_code}\n{stderr}" logger.info(msg) if message: - if include_reason: - msg = message + '\n' + msg - else: - msg = message + msg = f"{message}\n{msg}" if include_reason else message raise RuntimeError(msg) return stdout, stderr @@ -540,13 +530,11 @@ def scp_node( try: ssh.connect(node) except SSHException as exc: - raise_from(RuntimeError( - 'Failed to connect to {host}!'.format(host=node['host'])), exc) + raise RuntimeError(f"Failed to connect to {node[u'host']}!") from exc try: ssh.scp(local_path, remote_path, get, timeout) except SCPException as exc: - raise_from(RuntimeError( - 'SCP execution failed on {host}!'.format(host=node['host'])), exc) + raise RuntimeError(f"SCP execution failed on {node[u'host']}!") from exc finally: if disconnect: ssh.disconnect()