CSIT-1104: Trending: Speed-up plots generation
[csit.git] / resources / tools / presentation / utils.py
index 95faffd..f32019d 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2017 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -14,6 +14,7 @@
 """General purpose utilities.
 """
 
+import multiprocessing
 import subprocess
 import numpy as np
 import pandas as pd
@@ -198,8 +199,10 @@ def execute_command(cmd):
 
     stdout, stderr = proc.communicate()
 
-    logging.info(stdout)
-    logging.info(stderr)
+    if stdout:
+        logging.info(stdout)
+    if stderr:
+        logging.info(stderr)
 
     if proc.returncode != 0:
         logging.error("    Command execution failed.")
@@ -250,10 +253,7 @@ def archive_input_data(spec):
 
     logging.info("    Archiving the input data files ...")
 
-    if spec.is_debug:
-        extension = spec.debug["input-format"]
-    else:
-        extension = spec.input["file-format"]
+    extension = spec.input["file-format"]
     data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
                            extension=extension)
     dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
@@ -272,3 +272,43 @@ def archive_input_data(spec):
                                 str(err))
 
     logging.info("    Done.")
+
+
+class Worker(multiprocessing.Process):
+    """Worker class used to process tasks in separate parallel processes.
+    """
+
+    def __init__(self, work_queue, data_queue, func):
+        """Initialization.
+
+        :param work_queue: Queue with items to process.
+        :param data_queue: Shared memory between processes. Queue which keeps
+            the result data. This data is then read by the main process and used
+            in further processing.
+        :param func: Function which is executed by the worker.
+        :type work_queue: multiprocessing.JoinableQueue
+        :type data_queue: multiprocessing.Manager().Queue()
+        :type func: Callable object
+        """
+        super(Worker, self).__init__()
+        self._work_queue = work_queue
+        self._data_queue = data_queue
+        self._func = func
+
+    def run(self):
+        """Method representing the process's activity.
+        """
+
+        while True:
+            try:
+                self.process(self._work_queue.get())
+            finally:
+                self._work_queue.task_done()
+
+    def process(self, item_to_process):
+        """Method executed by the runner.
+
+        :param item_to_process: Data to be processed by the function.
+        :type item_to_process: tuple
+        """
+        self._func(self.pid, self._data_queue, *item_to_process)