X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftools%2Fpresentation%2Futils.py;fp=resources%2Ftools%2Fpresentation%2Futils.py;h=f32019dc2e9256b4a495a9b2e4e6eadc35ea1fa2;hp=ab86bafdd7c4574046010f02571576afc07f86f3;hb=6f5de201aadfbb31419c05dfae6495107a745899;hpb=4324c8b7dd1fe1ba8296168f4c4dd43a8c3e6cc0 diff --git a/resources/tools/presentation/utils.py b/resources/tools/presentation/utils.py index ab86bafdd7..f32019dc2e 100644 --- a/resources/tools/presentation/utils.py +++ b/resources/tools/presentation/utils.py @@ -14,6 +14,7 @@ """General purpose utilities. """ +import multiprocessing import subprocess import numpy as np import pandas as pd @@ -271,3 +272,43 @@ def archive_input_data(spec): str(err)) logging.info(" Done.") + + +class Worker(multiprocessing.Process): + """Worker class used to process tasks in separate parallel processes. + """ + + def __init__(self, work_queue, data_queue, func): + """Initialization. + + :param work_queue: Queue with items to process. + :param data_queue: Shared memory between processes. Queue which keeps + the result data. This data is then read by the main process and used + in further processing. + :param func: Function which is executed by the worker. + :type work_queue: multiprocessing.JoinableQueue + :type data_queue: multiprocessing.Manager().Queue() + :type func: Callable object + """ + super(Worker, self).__init__() + self._work_queue = work_queue + self._data_queue = data_queue + self._func = func + + def run(self): + """Method representing the process's activity. + """ + + while True: + try: + self.process(self._work_queue.get()) + finally: + self._work_queue.task_done() + + def process(self, item_to_process): + """Method executed by the runner. + + :param item_to_process: Data to be processed by the function. + :type item_to_process: tuple + """ + self._func(self.pid, self._data_queue, *item_to_process)