CSIT-1104: Trending: Speed-up plots generation
[csit.git] / resources / tools / presentation / input_data_parser.py
index 9428b2c..beec34c 100644 (file)
@@ -31,6 +31,7 @@ from string import replace
 from os import remove
 
 from input_data_files import download_and_unzip_data_file
+from utils import Worker
 
 
 class ExecutionChecker(ResultVisitor):
@@ -863,12 +864,10 @@ class InputData(object):
         logging.info("Downloading and parsing input files ...")
 
         work_queue = multiprocessing.JoinableQueue()
-
         manager = multiprocessing.Manager()
-
         data_queue = manager.Queue()
-
         cpus = multiprocessing.cpu_count()
+
         workers = list()
         for cpu in range(cpus):
             worker = Worker(work_queue,
@@ -1008,9 +1007,6 @@ class InputData(object):
         :rtype pandas.Series
         """
 
-        logging.info("    Creating the data set for the {0} '{1}'.".
-                     format(element.get("type", ""), element.get("title", "")))
-
         try:
             if element["filter"] in ("all", "template"):
                 cond = "True"
@@ -1095,44 +1091,3 @@ class InputData(object):
                     merged_data[ID] = item_data
 
         return merged_data
-
-
-class Worker(multiprocessing.Process):
-    """Worker class used to download and process input files in separate
-    parallel processes.
-    """
-
-    def __init__(self, work_queue, data_queue, func):
-        """Initialization.
-
-        :param work_queue: Queue with items to process.
-        :param data_queue: Shared memory between processes. Queue which keeps
-            the result data. This data is then read by the main process and used
-            in further processing.
-        :param func: Function which is executed by the worker.
-        :type work_queue: multiprocessing.JoinableQueue
-        :type data_queue: multiprocessing.Manager().Queue()
-        :type func: Callable object
-        """
-        super(Worker, self).__init__()
-        self._work_queue = work_queue
-        self._data_queue = data_queue
-        self._func = func
-
-    def run(self):
-        """Method representing the process's activity.
-        """
-
-        while True:
-            try:
-                self.process(self._work_queue.get())
-            finally:
-                self._work_queue.task_done()
-
-    def process(self, item_to_process):
-        """Method executed by the runner.
-
-        :param item_to_process: Data to be processed by the function.
-        :type item_to_process: tuple
-        """
-        self._func(self.pid, self._data_queue, *item_to_process)