X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Ftools%2Fpresentation%2Futils.py;h=f32019dc2e9256b4a495a9b2e4e6eadc35ea1fa2;hb=6f5de201aadfbb31419c05dfae6495107a745899;hp=59dbfa634bb4532be404adfae669a77416bb02f9;hpb=5646509aea6b43ef1efb282aad908289cc005b26;p=csit.git diff --git a/resources/tools/presentation/utils.py b/resources/tools/presentation/utils.py index 59dbfa634b..f32019dc2e 100644 --- a/resources/tools/presentation/utils.py +++ b/resources/tools/presentation/utils.py @@ -14,6 +14,7 @@ """General purpose utilities. """ +import multiprocessing import subprocess import numpy as np import pandas as pd @@ -198,8 +199,10 @@ def execute_command(cmd): stdout, stderr = proc.communicate() - logging.debug(stdout) - logging.debug(stderr) + if stdout: + logging.info(stdout) + if stderr: + logging.info(stderr) if proc.returncode != 0: logging.error(" Command execution failed.") @@ -269,3 +272,43 @@ def archive_input_data(spec): str(err)) logging.info(" Done.") + + +class Worker(multiprocessing.Process): + """Worker class used to process tasks in separate parallel processes. + """ + + def __init__(self, work_queue, data_queue, func): + """Initialization. + + :param work_queue: Queue with items to process. + :param data_queue: Shared memory between processes. Queue which keeps + the result data. This data is then read by the main process and used + in further processing. + :param func: Function which is executed by the worker. + :type work_queue: multiprocessing.JoinableQueue + :type data_queue: multiprocessing.Manager().Queue() + :type func: Callable object + """ + super(Worker, self).__init__() + self._work_queue = work_queue + self._data_queue = data_queue + self._func = func + + def run(self): + """Method representing the process's activity. + """ + + while True: + try: + self.process(self._work_queue.get()) + finally: + self._work_queue.task_done() + + def process(self, item_to_process): + """Method executed by the runner. + + :param item_to_process: Data to be processed by the function. + :type item_to_process: tuple + """ + self._func(self.pid, self._data_queue, *item_to_process)