X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftools%2Fpresentation%2Finput_data_parser.py;h=beec34c1067e2006327fec139597b4bfb70e0b0c;hp=9428b2cdc9695fe66b8a171d43a242657b017ccb;hb=6f5de201aadfbb31419c05dfae6495107a745899;hpb=372eab0eac428149d547b2d6eb2ce43cd0d750f6 diff --git a/resources/tools/presentation/input_data_parser.py b/resources/tools/presentation/input_data_parser.py index 9428b2cdc9..beec34c106 100644 --- a/resources/tools/presentation/input_data_parser.py +++ b/resources/tools/presentation/input_data_parser.py @@ -31,6 +31,7 @@ from string import replace from os import remove from input_data_files import download_and_unzip_data_file +from utils import Worker class ExecutionChecker(ResultVisitor): @@ -863,12 +864,10 @@ class InputData(object): logging.info("Downloading and parsing input files ...") work_queue = multiprocessing.JoinableQueue() - manager = multiprocessing.Manager() - data_queue = manager.Queue() - cpus = multiprocessing.cpu_count() + workers = list() for cpu in range(cpus): worker = Worker(work_queue, @@ -1008,9 +1007,6 @@ class InputData(object): :rtype pandas.Series """ - logging.info(" Creating the data set for the {0} '{1}'.". - format(element.get("type", ""), element.get("title", ""))) - try: if element["filter"] in ("all", "template"): cond = "True" @@ -1095,44 +1091,3 @@ class InputData(object): merged_data[ID] = item_data return merged_data - - -class Worker(multiprocessing.Process): - """Worker class used to download and process input files in separate - parallel processes. - """ - - def __init__(self, work_queue, data_queue, func): - """Initialization. - - :param work_queue: Queue with items to process. - :param data_queue: Shared memory between processes. Queue which keeps - the result data. This data is then read by the main process and used - in further processing. - :param func: Function which is executed by the worker. - :type work_queue: multiprocessing.JoinableQueue - :type data_queue: multiprocessing.Manager().Queue() - :type func: Callable object - """ - super(Worker, self).__init__() - self._work_queue = work_queue - self._data_queue = data_queue - self._func = func - - def run(self): - """Method representing the process's activity. - """ - - while True: - try: - self.process(self._work_queue.get()) - finally: - self._work_queue.task_done() - - def process(self, item_to_process): - """Method executed by the runner. - - :param item_to_process: Data to be processed by the function. - :type item_to_process: tuple - """ - self._func(self.pid, self._data_queue, *item_to_process)