From cf64b95272442e47a2d0fe7eed89b4e99c1f6bb5 Mon Sep 17 00:00:00 2001 From: "juraj.linkes" Date: Fri, 24 May 2019 10:58:52 +0200 Subject: [PATCH] Use threads for fw setup and cleanup Using multiprocessing sometimes causes scp to hang when copying files. Threading is also more lightweight. Change-Id: I047b4835bbf1584c80469b27af5394d89087e8a9 Signed-off-by: juraj.linkes --- resources/libraries/python/SetupFramework.py | 116 ++++++++++++++------------- 1 file changed, 62 insertions(+), 54 deletions(-) diff --git a/resources/libraries/python/SetupFramework.py b/resources/libraries/python/SetupFramework.py index 9b50b90bc5..c9207d9658 100644 --- a/resources/libraries/python/SetupFramework.py +++ b/resources/libraries/python/SetupFramework.py @@ -16,15 +16,15 @@ nodes. All tasks required to be run before the actual tests are started is supposed to end up here. """ +import datetime +from os import environ, remove +from os.path import basename from shlex import split -from subprocess import Popen, PIPE, call -from multiprocessing import Pool +from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile -from os.path import basename -from os import environ +import threading from robot.api import logger -from robot.libraries.BuiltIn import BuiltIn from resources.libraries.python.ssh import SSH from resources.libraries.python.Constants import Constants as con @@ -138,18 +138,20 @@ def create_env_directory_at_node(node): logger.console('Virtualenv on {0} created'.format(node['host'])) -def setup_node(args): - """Run all set-up methods for a node. - - This method is used as map_async parameter. It receives tuple with all - parameters as passed to map_async function. +def setup_node(node, tarball, remote_tarball, results=None): + """Copy a tarball to a node and extract it. - :param args: All parameters needed to setup one node. - :type args: tuple + :param node: A node where the tarball will be copied and extracted. 
+ :param tarball: Local path of tarball to be copied. + :param remote_tarball: Remote path of the tarball. + :param results: A list where to store the result of node setup, optional. + :type node: dict + :type tarball: str + :type remote_tarball: str + :type results: list :returns: True - success, False - error :rtype: bool """ - tarball, remote_tarball, node = args try: copy_tarball_to_node(tarball, node) extract_tarball_at_node(remote_tarball, node) @@ -158,20 +160,24 @@ def setup_node(args): except RuntimeError as exc: logger.error("Node {0} setup failed, error:'{1}'" .format(node['host'], exc.message)) - return False + result = False else: logger.console('Setup of node {0} done'.format(node['host'])) - return True + result = True + + if isinstance(results, list): + results.append(result) + return result def delete_local_tarball(tarball): """Delete local tarball to prevent disk pollution. - :param tarball: Path to tarball to upload. + :param tarball: Path of local tarball to delete. :type tarball: str :returns: nothing """ - call(split('sh -c "rm {0} > /dev/null 2>&1"'.format(tarball))) + remove(tarball) def delete_framework_dir(node): @@ -192,14 +198,13 @@ def delete_framework_dir(node): .format(node)) -def cleanup_node(node): - """Run all clean-up methods for a node. - - This method is used as map_async parameter. It receives tuple with all - parameters as passed to map_async function. +def cleanup_node(node, results=None): + """Delete a tarball from a node. - :param node: Node to do cleanup on. + :param node: A node where the tarball will be deleted. + :param results: A list where to store the result of node cleanup, optional. 
+ :type node: dict + :type results: list :returns: True - success, False - error :rtype: bool """ @@ -207,10 +212,14 @@ def cleanup_node(node): delete_framework_dir(node) except RuntimeError: logger.error("Cleanup of node {0} failed".format(node['host'])) - return False + result = False else: logger.console('Cleanup of node {0} done'.format(node['host'])) - return True + result = True + + if isinstance(results, list): + results.append(result) + return result class SetupFramework(object): @@ -236,27 +245,27 @@ class SetupFramework(object): logger.trace(msg) remote_tarball = "/tmp/{0}".format(basename(tarball)) - # Turn off logging since we use multiprocessing - log_level = BuiltIn().set_log_level('NONE') - params = ((tarball, remote_tarball, node) for node in nodes.values()) - pool = Pool(processes=len(nodes)) - result = pool.map_async(setup_node, params) - pool.close() - pool.join() + results = [] + threads = [] - # Turn on logging - BuiltIn().set_log_level(log_level) + for node in nodes.values(): + thread = threading.Thread(target=setup_node, args=(node, + tarball, + remote_tarball, + results)) + thread.start() + threads.append(thread) logger.info( - 'Executing node setups in parallel, waiting for processes to end') - result.wait() + 'Executing node setups in parallel, waiting for threads to end') + + for thread in threads: + thread.join() - results = result.get() - node_success = all(results) logger.info('Results: {0}'.format(results)) delete_local_tarball(tarball) - if node_success: + if all(results): logger.console('All nodes are ready') else: raise RuntimeError('Failed to setup framework') @@ -267,32 +276,31 @@ class CleanupFramework(object): @staticmethod def cleanup_framework(nodes): - """Perform cleaning on each node. + """Perform cleanup on each node. :param nodes: Topology nodes. :type nodes: dict :raises RuntimeError: If cleanup framework failed. 
""" - # Turn off logging since we use multiprocessing - log_level = BuiltIn().set_log_level('NONE') - params = (node for node in nodes.values()) - pool = Pool(processes=len(nodes)) - result = pool.map_async(cleanup_node, params) - pool.close() - pool.join() - # Turn on logging - BuiltIn().set_log_level(log_level) + results = [] + threads = [] + + for node in nodes.values(): + thread = threading.Thread(target=cleanup_node, + args=(node, results)) + thread.start() + threads.append(thread) logger.info( - 'Executing node cleanups in parallel, waiting for processes to end') - result.wait() + 'Executing node cleanups in parallel, waiting for threads to end') + + for thread in threads: + thread.join() - results = result.get() - node_success = all(results) logger.info('Results: {0}'.format(results)) - if node_success: + if all(results): logger.console('All nodes cleaned up') else: raise RuntimeError('Failed to cleaned up framework') -- 2.16.6