from os import remove
from input_data_files import download_and_unzip_data_file
+from utils import Worker
class ExecutionChecker(ResultVisitor):
logging.info("Downloading and parsing input files ...")
work_queue = multiprocessing.JoinableQueue()
-
manager = multiprocessing.Manager()
-
data_queue = manager.Queue()
-
cpus = multiprocessing.cpu_count()
+
workers = list()
for cpu in range(cpus):
worker = Worker(work_queue,
:rtype pandas.Series
"""
- logging.info(" Creating the data set for the {0} '{1}'.".
- format(element.get("type", ""), element.get("title", "")))
-
try:
if element["filter"] in ("all", "template"):
cond = "True"
merged_data[ID] = item_data
return merged_data
-
-
-class Worker(multiprocessing.Process):
- """Worker class used to download and process input files in separate
- parallel processes.
- """
-
- def __init__(self, work_queue, data_queue, func):
- """Initialization.
-
- :param work_queue: Queue with items to process.
- :param data_queue: Shared memory between processes. Queue which keeps
- the result data. This data is then read by the main process and used
- in further processing.
- :param func: Function which is executed by the worker.
- :type work_queue: multiprocessing.JoinableQueue
- :type data_queue: multiprocessing.Manager().Queue()
- :type func: Callable object
- """
- super(Worker, self).__init__()
- self._work_queue = work_queue
- self._data_queue = data_queue
- self._func = func
-
- def run(self):
- """Method representing the process's activity.
- """
-
- while True:
- try:
- self.process(self._work_queue.get())
- finally:
- self._work_queue.task_done()
-
- def process(self, item_to_process):
- """Method executed by the runner.
-
- :param item_to_process: Data to be processed by the function.
- :type item_to_process: tuple
- """
- self._func(self.pid, self._data_queue, *item_to_process)