Use threads for fw setup and cleanup 10/19810/6
authorjuraj.linkes <juraj.linkes@pantheon.tech>
Fri, 24 May 2019 08:58:52 +0000 (10:58 +0200)
committerPeter Mikus <pmikus@cisco.com>
Tue, 28 May 2019 13:59:12 +0000 (13:59 +0000)
Using multiprocessing sometimes causes scp to hang when copying files.
Threading is also more lightweight.

Change-Id: I047b4835bbf1584c80469b27af5394d89087e8a9
Signed-off-by: juraj.linkes <juraj.linkes@pantheon.tech>
resources/libraries/python/SetupFramework.py

index 9b50b90..c9207d9 100644 (file)
@@ -16,15 +16,15 @@ nodes. All tasks required to be run before the actual tests are started is
 supposed to end up here.
 """
 
+import datetime
+from os import environ, remove
+from os.path import basename
 from shlex import split
-from subprocess import Popen, PIPE, call
-from multiprocessing import Pool
+from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
-from os.path import basename
-from os import environ
+import threading
 
 from robot.api import logger
-from robot.libraries.BuiltIn import BuiltIn
 
 from resources.libraries.python.ssh import SSH
 from resources.libraries.python.Constants import Constants as con
@@ -138,18 +138,20 @@ def create_env_directory_at_node(node):
     logger.console('Virtualenv on {0} created'.format(node['host']))
 
 
-def setup_node(args):
-    """Run all set-up methods for a node.
-
-    This method is used as map_async parameter. It receives tuple with all
-    parameters as passed to map_async function.
+def setup_node(node, tarball, remote_tarball, results=None):
+    """Copy a tarball to a node and extract it.
 
-    :param args: All parameters needed to setup one node.
-    :type args: tuple
+    :param node: A node where the tarball will be copied and extracted.
+    :param tarball: Local path of tarball to be copied.
+    :param remote_tarball: Remote path of the tarball.
+    :param results: A list where to store the result of node setup, optional.
+    :type node: dict
+    :type tarball: str
+    :type remote_tarball: str
+    :type results: list
     :returns: True - success, False - error
     :rtype: bool
     """
-    tarball, remote_tarball, node = args
     try:
         copy_tarball_to_node(tarball, node)
         extract_tarball_at_node(remote_tarball, node)
@@ -158,20 +160,24 @@ def setup_node(args):
     except RuntimeError as exc:
         logger.error("Node {0} setup failed, error:'{1}'"
                      .format(node['host'], exc.message))
-        return False
+        result = False
     else:
         logger.console('Setup of node {0} done'.format(node['host']))
-        return True
+        result = True
+
+    if isinstance(results, list):
+        results.append(result)
+    return result
 
 
 def delete_local_tarball(tarball):
     """Delete local tarball to prevent disk pollution.
 
-    :param tarball: Path to tarball to upload.
+    :param tarball: Path of local tarball to delete.
     :type tarball: str
     :returns: nothing
     """
-    call(split('sh -c "rm {0} > /dev/null 2>&1"'.format(tarball)))
+    remove(tarball)
 
 
 def delete_framework_dir(node):
@@ -192,14 +198,13 @@ def delete_framework_dir(node):
                            .format(node))
 
 
-def cleanup_node(node):
-    """Run all clean-up methods for a node.
-
-    This method is used as map_async parameter. It receives tuple with all
-    parameters as passed to map_async function.
+def cleanup_node(node, results=None):
+    """Delete a tarball from a node.
 
-    :param node: Node to do cleanup on.
+    :param node: A node where the tarball will be delete.
+    :param results: A list where to store the result of node cleanup, optional.
     :type node: dict
+    :type results: list
     :returns: True - success, False - error
     :rtype: bool
     """
@@ -207,10 +212,14 @@ def cleanup_node(node):
         delete_framework_dir(node)
     except RuntimeError:
         logger.error("Cleanup of node {0} failed".format(node['host']))
-        return False
+        result = False
     else:
         logger.console('Cleanup of node {0} done'.format(node['host']))
-        return True
+        result = True
+
+    if isinstance(results, list):
+        results.append(result)
+    return result
 
 
 class SetupFramework(object):
@@ -236,27 +245,27 @@ class SetupFramework(object):
         logger.trace(msg)
         remote_tarball = "/tmp/{0}".format(basename(tarball))
 
-        # Turn off logging since we use multiprocessing
-        log_level = BuiltIn().set_log_level('NONE')
-        params = ((tarball, remote_tarball, node) for node in nodes.values())
-        pool = Pool(processes=len(nodes))
-        result = pool.map_async(setup_node, params)
-        pool.close()
-        pool.join()
+        results = []
+        threads = []
 
-        # Turn on logging
-        BuiltIn().set_log_level(log_level)
+        for node in nodes.values():
+            thread = threading.Thread(target=setup_node, args=(tarball,
+                                                               remote_tarball,
+                                                               node,
+                                                               results))
+            thread.start()
+            threads.append(thread)
 
         logger.info(
-            'Executing node setups in parallel, waiting for processes to end')
-        result.wait()
+            'Executing node setups in parallel, waiting for threads to end')
+
+        for thread in threads:
+            thread.join()
 
-        results = result.get()
-        node_success = all(results)
         logger.info('Results: {0}'.format(results))
 
         delete_local_tarball(tarball)
-        if node_success:
+        if all(results):
             logger.console('All nodes are ready')
         else:
             raise RuntimeError('Failed to setup framework')
@@ -267,32 +276,31 @@ class CleanupFramework(object):
 
     @staticmethod
     def cleanup_framework(nodes):
-        """Perform cleaning on each node.
+        """Perform cleanup on each node.
 
         :param nodes: Topology nodes.
         :type nodes: dict
         :raises RuntimeError: If cleanup framework failed.
         """
-        # Turn off logging since we use multiprocessing
-        log_level = BuiltIn().set_log_level('NONE')
-        params = (node for node in nodes.values())
-        pool = Pool(processes=len(nodes))
-        result = pool.map_async(cleanup_node, params)
-        pool.close()
-        pool.join()
 
-        # Turn on logging
-        BuiltIn().set_log_level(log_level)
+        results = []
+        threads = []
+
+        for node in nodes.values():
+            thread = threading.Thread(target=cleanup_node,
+                                      args=(node, results))
+            thread.start()
+            threads.append(thread)
 
         logger.info(
-            'Executing node cleanups in parallel, waiting for processes to end')
-        result.wait()
+            'Executing node cleanups in parallel, waiting for threads to end')
+
+        for thread in threads:
+            thread.join()
 
-        results = result.get()
-        node_success = all(results)
         logger.info('Results: {0}'.format(results))
 
-        if node_success:
+        if all(results):
             logger.console('All nodes cleaned up')
         else:
             raise RuntimeError('Failed to cleaned up framework')

©2016 FD.io a Linux Foundation Collaborative Project. All Rights Reserved.
Linux Foundation is a registered trademark of The Linux Foundation. Linux is a registered trademark of Linus Torvalds.
Please see our privacy policy and terms of use.