X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FSetupFramework.py;h=a1e4e7a679e68b58ee21b6c24eb41134e645e2b7;hp=9b50b90bc5aa31ab92ba423438990a5c29af1251;hb=cd635521219e7d7988ccfd9d0a173ba07217cd00;hpb=da799981f5373b09398319df12e77e2efc75caa6 diff --git a/resources/libraries/python/SetupFramework.py b/resources/libraries/python/SetupFramework.py index 9b50b90bc5..a1e4e7a679 100644 --- a/resources/libraries/python/SetupFramework.py +++ b/resources/libraries/python/SetupFramework.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Cisco and/or its affiliates. +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -16,18 +16,16 @@ nodes. All tasks required to be run before the actual tests are started is supposed to end up here. """ -from shlex import split -from subprocess import Popen, PIPE, call -from multiprocessing import Pool -from tempfile import NamedTemporaryFile +from os import environ, remove from os.path import basename -from os import environ +from tempfile import NamedTemporaryFile +import threading from robot.api import logger -from robot.libraries.BuiltIn import BuiltIn -from resources.libraries.python.ssh import SSH from resources.libraries.python.Constants import Constants as con +from resources.libraries.python.ssh import exec_cmd_no_error, scp_node +from resources.libraries.python.LocalExecution import run from resources.libraries.python.topology import NodeType __all__ = ["SetupFramework"] @@ -54,18 +52,9 @@ def pack_framework_dir(): file_name = tmpfile.name tmpfile.close() - proc = Popen( - split("tar --sparse --exclude-vcs --exclude=output*.xml " - "--exclude=./tmp -zcf {0} ." - .format(file_name)), stdout=PIPE, stderr=PIPE) - (stdout, stderr) = proc.communicate() - - logger.debug(stdout) - logger.debug(stderr) - - return_code = proc.wait() - if return_code != 0: - raise RuntimeError("Could not pack testing framework.") + run(["tar", "--sparse", "--exclude-vcs", "--exclude=output*.xml", + "--exclude=./tmp", "-zcf", file_name, "."], + check=True, msg="Could not pack testing framework") return file_name @@ -79,12 +68,10 @@ def copy_tarball_to_node(tarball, node): :type node: dict :returns: nothing """ - logger.console('Copying tarball to {0}'.format(node['host'])) - ssh = SSH() - ssh.connect(node) - - ssh.scp(tarball, "/tmp/") - logger.console('Copying tarball to {0} done'.format(node['host'])) + host = node['host'] + logger.console('Copying tarball to {0} starts.'.format(host)) + scp_node(node, tarball, "/tmp/") + logger.console('Copying tarball to {0} done.'.format(host)) def extract_tarball_at_node(tarball, node): @@ -99,18 +86,16 @@ def extract_tarball_at_node(tarball, node): :returns: nothing :raises RuntimeError: When failed to unpack tarball. """ - logger.console('Extracting tarball to {0} on {1}' - .format(con.REMOTE_FW_DIR, node['host'])) - ssh = SSH() - ssh.connect(node) - (ret_code, _, _) = ssh.exec_command( - 'sudo rm -rf {1}; mkdir {1} ; tar -zxf {0} -C {1}; rm -f {0}' - .format(tarball, con.REMOTE_FW_DIR), timeout=30) - if ret_code != 0: - raise RuntimeError('Failed to extract {0} at node {1}' - .format(tarball, node['host'])) - logger.console('Extracting tarball to {0} on {1} done' - .format(con.REMOTE_FW_DIR, node['host'])) + host = node['host'] + logger.console('Extracting tarball to {0} on {1} starts.' + .format(con.REMOTE_FW_DIR, host)) + exec_cmd_no_error( + node, "sudo rm -rf {1}; mkdir {1}; tar -zxf {0} -C {1};" + " rm -f {0}".format(tarball, con.REMOTE_FW_DIR), + message='Failed to extract {0} at node {1}'.format(tarball, host), + timeout=30, include_reason=True) + logger.console('Extracting tarball to {0} on {1} done.' + .format(con.REMOTE_FW_DIR, host)) def create_env_directory_at_node(node): @@ -121,57 +106,59 @@ def create_env_directory_at_node(node): :returns: nothing :raises RuntimeError: When failed to setup virtualenv. """ - logger.console('Virtualenv setup including requirements.txt on {0}' - .format(node['host'])) - ssh = SSH() - ssh.connect(node) - (ret_code, _, _) = ssh.exec_command( - 'cd {0} && rm -rf env && ' - 'virtualenv --system-site-packages --never-download env && ' - '. env/bin/activate && ' - 'pip install -r requirements.txt' - .format(con.REMOTE_FW_DIR), timeout=100) - if ret_code != 0: - raise RuntimeError('Virtualenv setup including requirements.txt on {0}' - .format(node['host'])) - - logger.console('Virtualenv on {0} created'.format(node['host'])) - - -def setup_node(args): - """Run all set-up methods for a node. - - This method is used as map_async parameter. It receives tuple with all - parameters as passed to map_async function. - - :param args: All parameters needed to setup one node. - :type args: tuple + host = node['host'] + logger.console('Virtualenv setup including requirements.txt on {0} starts.' + .format(host)) + exec_cmd_no_error( + node, 'cd {0} && rm -rf env' + ' && virtualenv --system-site-packages --never-download env' + ' && source env/bin/activate && pip install -r requirements.txt' + .format(con.REMOTE_FW_DIR), timeout=100, include_reason=True, + message="Failed install at node {host}".format(host=host)) + logger.console('Virtualenv setup on {0} done.'.format(host)) + + +def setup_node(node, tarball, remote_tarball, results=None): + """Copy a tarball to a node and extract it. + + :param node: A node where the tarball will be copied and extracted. + :param tarball: Local path of tarball to be copied. + :param remote_tarball: Remote path of the tarball. + :param results: A list where to store the result of node setup, optional. + :type node: dict + :type tarball: str + :type remote_tarball: str + :type results: list :returns: True - success, False - error :rtype: bool """ - tarball, remote_tarball, node = args + host = node['host'] try: copy_tarball_to_node(tarball, node) extract_tarball_at_node(remote_tarball, node) if node['type'] == NodeType.TG: create_env_directory_at_node(node) except RuntimeError as exc: - logger.error("Node {0} setup failed, error:'{1}'" - .format(node['host'], exc.message)) - return False + logger.console("Node {node} setup failed, error: {err!r}".format( + node=host, err=exc)) + result = False else: - logger.console('Setup of node {0} done'.format(node['host'])) - return True + logger.console('Setup of node {0} done.'.format(host)) + result = True + + if isinstance(results, list): + results.append(result) + return result def delete_local_tarball(tarball): """Delete local tarball to prevent disk pollution. - :param tarball: Path to tarball to upload. + :param tarball: Path of local tarball to delete. :type tarball: str :returns: nothing """ - call(split('sh -c "rm {0} > /dev/null 2>&1"'.format(tarball))) + remove(tarball) def delete_framework_dir(node): @@ -180,37 +167,40 @@ def delete_framework_dir(node): :param node: Node to delete framework directory on. :type node: dict """ - logger.console('Deleting framework directory on {0}' - .format(node['host'])) - ssh = SSH() - ssh.connect(node) - (ret_code, _, _) = ssh.exec_command( - 'sudo rm -rf {0}' - .format(con.REMOTE_FW_DIR), timeout=100) - if ret_code != 0: - raise RuntimeError('Deleting framework directory on {0} failed' - .format(node)) - - -def cleanup_node(node): - """Run all clean-up methods for a node. - - This method is used as map_async parameter. It receives tuple with all - parameters as passed to map_async function. - - :param node: Node to do cleanup on. + host = node['host'] + logger.console( + 'Deleting framework directory on {0} starts.'.format(host)) + exec_cmd_no_error( + node, 'sudo rm -rf {0}'.format(con.REMOTE_FW_DIR), + message="Framework delete failed at node {host}".format(host=host), + timeout=100, include_reason=True) + logger.console( + 'Deleting framework directory on {0} done.'.format(host)) + + +def cleanup_node(node, results=None): + """Delete a tarball from a node. + + :param node: A node where the tarball will be delete. + :param results: A list where to store the result of node cleanup, optional. :type node: dict + :type results: list :returns: True - success, False - error :rtype: bool """ + host = node['host'] try: delete_framework_dir(node) except RuntimeError: - logger.error("Cleanup of node {0} failed".format(node['host'])) - return False + logger.error("Cleanup of node {0} failed.".format(host)) + result = False else: - logger.console('Cleanup of node {0} done'.format(node['host'])) - return True + logger.console('Cleanup of node {0} done.'.format(host)) + result = True + + if isinstance(results, list): + results.append(result) + return result class SetupFramework(object): @@ -236,30 +226,28 @@ class SetupFramework(object): logger.trace(msg) remote_tarball = "/tmp/{0}".format(basename(tarball)) - # Turn off logging since we use multiprocessing - log_level = BuiltIn().set_log_level('NONE') - params = ((tarball, remote_tarball, node) for node in nodes.values()) - pool = Pool(processes=len(nodes)) - result = pool.map_async(setup_node, params) - pool.close() - pool.join() + results = [] + threads = [] - # Turn on logging - BuiltIn().set_log_level(log_level) + for node in nodes.values(): + args = node, tarball, remote_tarball, results + thread = threading.Thread(target=setup_node, args=args) + thread.start() + threads.append(thread) logger.info( - 'Executing node setups in parallel, waiting for processes to end') - result.wait() + 'Executing node setups in parallel, waiting for threads to end') + + for thread in threads: + thread.join() - results = result.get() - node_success = all(results) logger.info('Results: {0}'.format(results)) delete_local_tarball(tarball) - if node_success: - logger.console('All nodes are ready') + if all(results): + logger.console('All nodes are ready.') else: - raise RuntimeError('Failed to setup framework') + raise RuntimeError('Failed to setup framework.') class CleanupFramework(object): @@ -267,32 +255,31 @@ class CleanupFramework(object): @staticmethod def cleanup_framework(nodes): - """Perform cleaning on each node. + """Perform cleanup on each node. :param nodes: Topology nodes. :type nodes: dict :raises RuntimeError: If cleanup framework failed. """ - # Turn off logging since we use multiprocessing - log_level = BuiltIn().set_log_level('NONE') - params = (node for node in nodes.values()) - pool = Pool(processes=len(nodes)) - result = pool.map_async(cleanup_node, params) - pool.close() - pool.join() - # Turn on logging - BuiltIn().set_log_level(log_level) + results = [] + threads = [] + + for node in nodes.values(): + thread = threading.Thread(target=cleanup_node, + args=(node, results)) + thread.start() + threads.append(thread) logger.info( - 'Executing node cleanups in parallel, waiting for processes to end') - result.wait() + 'Executing node cleanups in parallel, waiting for threads to end.') + + for thread in threads: + thread.join() - results = result.get() - node_success = all(results) logger.info('Results: {0}'.format(results)) - if node_success: - logger.console('All nodes cleaned up') + if all(results): + logger.console('All nodes cleaned up.') else: - raise RuntimeError('Failed to cleaned up framework') + raise RuntimeError('Failed to cleaned up framework.')