# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Data pre-processing

- extract data from output.xml files generated by Jenkins jobs and store in
  pandas' Series,
- provide access to the data.
"""

import multiprocessing
import os
import re
import logging

import pandas as pd

from robot.api import ExecutionResult, ResultVisitor
from robot import errors
from collections import OrderedDict
from string import replace
from os import remove

from input_data_files import download_and_unzip_data_file


class ExecutionChecker(ResultVisitor):
    """Class to traverse through the test suite structure.

    The functionality implemented in this class generates a json structure:

    Performance tests:

    {
        "metadata": {  # Optional
            "version": "VPP version",
            "job": "Jenkins job name",
            "build": "Information about the build"
        },
        "suites": {
            "Suite name 1": {
                "doc": "Suite 1 documentation",
                "parent": "Suite 1 parent",
                "level": "Level of the suite in the suite hierarchy"
            },
            "Suite name N": {
                "doc": "Suite N documentation",
                "parent": "Suite N parent",
                "level": "Level of the suite in the suite hierarchy"
            }
        },
        "tests": {
            "ID": {
                "name": "Test name",
                "parent": "Name of the parent of the test",
                "doc": "Test documentation",
                "msg": "Test message",
                "tags": ["tag 1", "tag 2", "tag n"],
                "type": "PDR" | "NDR" | "TCP" | "MRR",
                "throughput": {  # Only for NDR and PDR
                    "value": int,
                    "unit": "pps" | "bps" | "percentage"
                },
                "latency": {  # Only for NDR and PDR
                    "direction1": {
                        "100": {"min": int, "avg": int, "max": int},
                        "50": {"min": int, "avg": int, "max": int},  # Only for NDR
                        "10": {"min": int, "avg": int, "max": int}  # Only for NDR
                    },
                    "direction2": {
                        "100": {"min": int, "avg": int, "max": int},
                        "50": {"min": int, "avg": int, "max": int},  # Only for NDR
                        "10": {"min": int, "avg": int, "max": int}  # Only for NDR
                    }
                },
                "result": {...},  # Only for TCP and MRR
                "lossTolerance": "lossTolerance",  # Only for PDR
                "vat-history": "DUT1 and DUT2 VAT History",
                "show-run": "Show Run"
            }
        }
    }

    Functional tests:

    {
        "metadata": {  # Optional
            "version": "VPP version",
            "job": "Jenkins job name",
            "build": "Information about the build"
        },
        "suites": {
            "Suite name 1": {
                "doc": "Suite 1 documentation",
                "parent": "Suite 1 parent",
                "level": "Level of the suite in the suite hierarchy"
            },
            "Suite name N": {
                "doc": "Suite N documentation",
                "parent": "Suite N parent",
                "level": "Level of the suite in the suite hierarchy"
            }
        },
        "tests": {
            "ID": {
                "name": "Test name",
                "parent": "Name of the parent of the test",
                "doc": "Test documentation",
                "msg": "Test message",
                "tags": ["tag 1", "tag 2", "tag n"],
                "vat-history": "DUT1 and DUT2 VAT History",
                "show-run": "Show Run",
                "status": "PASS" | "FAIL"
            }
        }
    }

    .. note:: ID is the lowercase full path to the test.
    """

    REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')

    REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')

    REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')

    REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
                                 r'[\D\d]*')

    REGEX_VERSION = re.compile(r"(return STDOUT Version:\s*)(.*)")

    REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')

    REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
                           r'tx\s(\d*),\srx\s(\d*)')
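
    # Illustrative examples (message formats are assumed) of test messages
    # the patterns above are meant to match:
    #   REGEX_RATE:  "FINAL_RATE: 12345678.0 pps"
    #   REGEX_TCP:   "Total cps: 123456"
    #   REGEX_MRR:   "MaxReceivedRate_Results [pkts/10sec]: tx 1000, rx 900"
    # The leading [\D\d]* in several patterns matches any text, including
    # newlines, preceding the value of interest.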

    def __init__(self, metadata):
        """Initialisation.

        :param metadata: Key-value pairs to be included in "metadata" part of
            JSON structure.
        :type metadata: dict
        """

        # Type of message to parse out from the test messages
        self._msg_type = None

        # VPP version
        self._version = None

        # Number of VAT History messages found:
        # 0 - no message
        # 1 - VAT History of DUT1
        # 2 - VAT History of DUT2
        self._lookup_kw_nr = 0
        self._vat_history_lookup_nr = 0

        # Number of Show Running messages found
        # 0 - no message
        # 1 - Show run message found
        self._show_run_lookup_nr = 0

        # Test ID of currently processed test - the lowercase full path to
        # the test
        self._test_ID = None

        # The main data structure
        self._data = {
            "metadata": OrderedDict(),
            "suites": OrderedDict(),
            "tests": OrderedDict()
        }

        # Save the provided metadata
        for key, val in metadata.items():
            self._data["metadata"][key] = val

        # Dictionary defining the methods used to parse different types of
        # messages
        self.parse_msg = {
            "setup-version": self._get_version,
            "teardown-vat-history": self._get_vat_history,
            "test-show-runtime": self._get_show_run
        }

    @property
    def data(self):
        """Getter - Data parsed from the XML file.

        :returns: Data parsed from the XML file.
        :rtype: dict
        """
        return self._data

    def _get_version(self, msg):
        """Called when extraction of VPP version is required.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """

        if msg.message.count("return STDOUT Version:"):
            self._version = str(re.search(self.REGEX_VERSION, msg.message).
                                group(2))
            self._data["metadata"]["version"] = self._version
            self._data["metadata"]["generated"] = msg.timestamp
            self._msg_type = None

    def _get_vat_history(self, msg):
        """Called when extraction of VAT command history is required.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """

        if msg.message.count("VAT command history:"):
            self._vat_history_lookup_nr += 1
            if self._vat_history_lookup_nr == 1:
                self._data["tests"][self._test_ID]["vat-history"] = str()
            else:
                self._msg_type = None
            # Strip the leading "<IP address> VAT command history:" header;
            # the dots in the IP address are escaped to match literally.
            text = re.sub(r"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3} "
                          r"VAT command history:", "", msg.message, count=1). \
                replace("\n\n", "\n").replace('\n', ' |br| ').\
                replace('\r', '').replace('"', "'")

            self._data["tests"][self._test_ID]["vat-history"] += " |br| "
            self._data["tests"][self._test_ID]["vat-history"] += \
                "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text

    def _get_show_run(self, msg):
        """Called when extraction of VPP operational data (output of CLI
        command Show Runtime) is required.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """

        if msg.message.count("return STDOUT Thread "):
            self._show_run_lookup_nr += 1
            if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
                self._data["tests"][self._test_ID]["show-run"] = str()
            if self._lookup_kw_nr > 1:
                self._msg_type = None
            if self._show_run_lookup_nr == 1:
                text = msg.message.replace("vat# ", "").\
                    replace("return STDOUT ", "").replace("\n\n", "\n").\
                    replace('\n', ' |br| ').\
                    replace('\r', '').replace('"', "'")

                self._data["tests"][self._test_ID]["show-run"] += " |br| "
                self._data["tests"][self._test_ID]["show-run"] += \
                    "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text

    def _get_latency(self, msg, test_type):
        """Get the latency data from the test message.

        :param msg: Message to be parsed.
        :param test_type: Type of the test - NDR or PDR.
        :type msg: str
        :type test_type: str
        :returns: Latencies parsed from the message.
        :rtype: dict
        """

        if test_type == "NDR":
            groups = re.search(self.REGEX_LAT_NDR, msg)
            groups_range = range(1, 7)
        elif test_type == "PDR":
            groups = re.search(self.REGEX_LAT_PDR, msg)
            groups_range = range(1, 3)
        else:
            return dict()

        latencies = list()
        for idx in groups_range:
            try:
                lat = [int(item) for item in str(groups.group(idx)).split('/')]
            except (AttributeError, ValueError):
                lat = [-1, -1, -1]
            latencies.append(lat)

        keys = ("min", "avg", "max")
        latency = {
            "direction1": {
            },
            "direction2": {
            }
        }

        latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
        latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
        if test_type == "NDR":
            latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
            latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
            latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
            latency["direction2"]["10"] = dict(zip(keys, latencies[5]))

        return latency
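
    # Example (message format assumed): a PDR test message containing
    #   "LAT_99%PDR: ['10/15/20', '11/16/21']"
    # is parsed by _get_latency() to
    #   {"direction1": {"100": {"min": 10, "avg": 15, "max": 20}},
    #    "direction2": {"100": {"min": 11, "avg": 16, "max": 21}}}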

    def visit_suite(self, suite):
        """Implements traversing through the suite and its direct children.

        :param suite: Suite to process.
        :type suite: Suite
        :returns: Nothing.
        """
        if self.start_suite(suite) is not False:
            suite.suites.visit(self)
            suite.tests.visit(self)
            self.end_suite(suite)

    def start_suite(self, suite):
        """Called when suite starts.

        :param suite: Suite to process.
        :type suite: Suite
        :returns: Nothing.
        """

        try:
            parent_name = suite.parent.name
        except AttributeError:
            parent_name = ""

        doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
            replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
        doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)

        self._data["suites"][suite.longname.lower().replace('"', "'").
            replace(" ", "_")] = {
                "name": suite.name.lower(),
                "doc": doc_str,
                "parent": parent_name,
                "level": len(suite.longname.split("."))
            }

        suite.keywords.visit(self)

    def end_suite(self, suite):
        """Called when suite ends.

        :param suite: Suite to process.
        :type suite: Suite
        :returns: Nothing.
        """
        pass

    def visit_test(self, test):
        """Implements traversing through the test.

        :param test: Test to process.
        :type test: Test
        :returns: Nothing.
        """
        if self.start_test(test) is not False:
            test.keywords.visit(self)
            self.end_test(test)

    def start_test(self, test):
        """Called when test starts.

        :param test: Test to process.
        :type test: Test
        :returns: Nothing.
        """

        tags = [str(tag) for tag in test.tags]
        test_result = dict()
        test_result["name"] = test.name.lower()
        test_result["parent"] = test.parent.name.lower()
        test_result["tags"] = tags
        doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
            replace('\r', '').replace('[', ' |br| [')
        test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
        test_result["msg"] = test.message.replace('\n', ' |br| '). \
            replace('\r', '').replace('"', "'")

        if test.status == "PASS" and ("NDRPDRDISC" in tags or
                                      "TCP" in tags or
                                      "MRR" in tags):
            if "NDRDISC" in tags:
                test_type = "NDR"
            elif "PDRDISC" in tags:
                test_type = "PDR"
            elif "TCP" in tags:
                test_type = "TCP"
            elif "MRR" in tags:
                test_type = "MRR"
            else:
                return

            test_result["type"] = test_type

            if test_type in ("NDR", "PDR"):
                try:
                    rate_value = str(re.search(
                        self.REGEX_RATE, test.message).group(1))
                except AttributeError:
                    rate_value = "-1"
                try:
                    rate_unit = str(re.search(
                        self.REGEX_RATE, test.message).group(2))
                except AttributeError:
                    rate_unit = "-1"

                test_result["throughput"] = dict()
                test_result["throughput"]["value"] = \
                    int(rate_value.split('.')[0])
                test_result["throughput"]["unit"] = rate_unit
                test_result["latency"] = \
                    self._get_latency(test.message, test_type)
                if test_type == "PDR":
                    test_result["lossTolerance"] = str(re.search(
                        self.REGEX_TOLERANCE, test.message).group(1))

            elif test_type in ("TCP", ):
                groups = re.search(self.REGEX_TCP, test.message)
                test_result["result"] = dict()
                test_result["result"]["value"] = int(groups.group(2))
                test_result["result"]["unit"] = groups.group(1)
            elif test_type in ("MRR", ):
                groups = re.search(self.REGEX_MRR, test.message)
                test_result["result"] = dict()
                test_result["result"]["duration"] = int(groups.group(1))
                test_result["result"]["tx"] = int(groups.group(2))
                test_result["result"]["rx"] = int(groups.group(3))
                test_result["result"]["throughput"] = int(
                    test_result["result"]["rx"] /
                    test_result["result"]["duration"])

        test_result["status"] = test.status

        self._test_ID = test.longname.lower()
        self._data["tests"][self._test_ID] = test_result
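
    # Worked example for an MRR test (message format assumed): for
    # "MaxReceivedRate_Results [pkts/10sec]: tx 1000, rx 900" the parsed
    # result is {"duration": 10, "tx": 1000, "rx": 900, "throughput": 90},
    # the throughput being rx divided by duration.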

    def end_test(self, test):
        """Called when test ends.

        :param test: Test to process.
        :type test: Test
        :returns: Nothing.
        """
        pass

    def visit_keyword(self, keyword):
        """Implements traversing through the keyword and its child keywords.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        :returns: Nothing.
        """
        if self.start_keyword(keyword) is not False:
            self.end_keyword(keyword)

    def start_keyword(self, keyword):
        """Called when keyword starts. Default implementation does nothing.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        :returns: Nothing.
        """
        try:
            if keyword.type == "setup":
                self.visit_setup_kw(keyword)
            elif keyword.type == "teardown":
                self._lookup_kw_nr = 0
                self.visit_teardown_kw(keyword)
            else:
                self._lookup_kw_nr = 0
                self.visit_test_kw(keyword)
        except AttributeError:
            pass

    def end_keyword(self, keyword):
        """Called when keyword ends. Default implementation does nothing.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        :returns: Nothing.
        """
        pass

    def visit_test_kw(self, test_kw):
        """Implements traversing through the test keyword and its child
        keywords.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        :returns: Nothing.
        """
        for keyword in test_kw.keywords:
            if self.start_test_kw(keyword) is not False:
                self.visit_test_kw(keyword)
                self.end_test_kw(keyword)

    def start_test_kw(self, test_kw):
        """Called when test keyword starts. Default implementation does
        nothing.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        :returns: Nothing.
        """
        if test_kw.name.count("Show Runtime Counters On All Duts"):
            self._lookup_kw_nr += 1
            self._show_run_lookup_nr = 0
            self._msg_type = "test-show-runtime"
            test_kw.messages.visit(self)

    def end_test_kw(self, test_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        :returns: Nothing.
        """
        pass

    def visit_setup_kw(self, setup_kw):
        """Implements traversing through the setup keyword and its child
        keywords.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        :returns: Nothing.
        """
        for keyword in setup_kw.keywords:
            if self.start_setup_kw(keyword) is not False:
                self.visit_setup_kw(keyword)
                self.end_setup_kw(keyword)

    def start_setup_kw(self, setup_kw):
        """Called when setup keyword starts. Default implementation does
        nothing.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        :returns: Nothing.
        """
        if setup_kw.name.count("Show Vpp Version On All Duts") \
                and not self._version:
            self._msg_type = "setup-version"
            setup_kw.messages.visit(self)

    def end_setup_kw(self, setup_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        :returns: Nothing.
        """
        pass

    def visit_teardown_kw(self, teardown_kw):
        """Implements traversing through the teardown keyword and its child
        keywords.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        :returns: Nothing.
        """
        for keyword in teardown_kw.keywords:
            if self.start_teardown_kw(keyword) is not False:
                self.visit_teardown_kw(keyword)
                self.end_teardown_kw(keyword)

    def start_teardown_kw(self, teardown_kw):
        """Called when teardown keyword starts. Default implementation does
        nothing.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        :returns: Nothing.
        """
        if teardown_kw.name.count("Show Vat History On All Duts"):
            self._vat_history_lookup_nr = 0
            self._msg_type = "teardown-vat-history"
            teardown_kw.messages.visit(self)

    def end_teardown_kw(self, teardown_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        :returns: Nothing.
        """
        pass

    def visit_message(self, msg):
        """Implements visiting the message.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """
        if self.start_message(msg) is not False:
            self.end_message(msg)

    def start_message(self, msg):
        """Called when message starts. Get required information from messages:
        - VPP version.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """
        if self._msg_type:
            self.parse_msg[self._msg_type](msg)

    def end_message(self, msg):
        """Called when message ends. Default implementation does nothing.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """
        pass


class InputData(object):
    """Input data

    The data is extracted from output.xml files generated by Jenkins jobs and
    stored in pandas' DataFrames.

    The data structure:
    - job name
      - build number
        - metadata
        - suites
        - tests
          - ID: test data (as described in ExecutionChecker documentation)
    """

    def __init__(self, spec):
        """Initialisation.

        :param spec: Specification.
        :type spec: Specification
        """

        # Specification
        self._cfg = spec

        # Data store
        self._input_data = pd.Series()

    @property
    def data(self):
        """Getter - Input data.

        :returns: Input data.
        :rtype: pandas.Series
        """
        return self._input_data
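
    # Illustrative access pattern (job name and build number are made up):
    #   input_data.data["csit-vpp-perf-1801-all"]["100"]["tests"]
    # returns the tests of the given job and build as a pandas.Series.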

    def metadata(self, job, build):
        """Getter - metadata.

        :param job: Job which metadata we want.
        :param build: Build which metadata we want.
        :type job: str
        :type build: str
        :returns: Metadata.
        :rtype: pandas.Series
        """
        return self.data[job][build]["metadata"]

    def suites(self, job, build):
        """Getter - suites.

        :param job: Job which suites we want.
        :param build: Build which suites we want.
        :type job: str
        :type build: str
        :returns: Suites.
        :rtype: pandas.Series
        """
        return self.data[job][str(build)]["suites"]

    def tests(self, job, build):
        """Getter - tests.

        :param job: Job which tests we want.
        :param build: Build which tests we want.
        :type job: str
        :type build: str
        :returns: Tests.
        :rtype: pandas.Series
        """
        return self.data[job][build]["tests"]

    @staticmethod
    def _parse_tests(job, build, log):
        """Process data from robot output.xml file and return JSON structured
        data.

        :param job: The name of job which build output data will be processed.
        :param build: The build which output data will be processed.
        :param log: List of log messages.
        :type job: str
        :type build: dict
        :type log: list of tuples (severity, msg)
        :returns: JSON data structure.
        :rtype: dict
        """

        metadata = {
            "job": job,
            "build": build
        }

        with open(build["file-name"], 'r') as data_file:
            try:
                result = ExecutionResult(data_file)
            except errors.DataError as err:
                log.append(("ERROR", "Error occurred while parsing output.xml:"
                            " {0}".format(err)))
                return None
        checker = ExecutionChecker(metadata)
        result.visit(checker)

        return checker.data

    def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
        """Download and parse the input data file.

        :param pid: PID of the process executing this method.
        :param data_queue: Shared memory between processes. Queue which keeps
            the result data. This data is then read by the main process and
            used in further processing.
        :param job: Name of the Jenkins job which generated the processed
            input file.
        :param build: Information about the Jenkins build which generated the
            processed input file.
        :param repeat: Repeat the download specified number of times if not
            successful.
        :type pid: int
        :type data_queue: multiprocessing.Manager().Queue()
        :type job: str
        :type build: dict
        :type repeat: int
        """

        logs = list()

        logging.info("  Processing the job/build: {0}: {1}".
                     format(job, build["build"]))
        logs.append(("INFO", "  Processing the job/build: {0}: {1}".
                     format(job, build["build"])))

        state = "failed"
        success = False
        data = None
        do_repeat = repeat
        while do_repeat:
            success = download_and_unzip_data_file(self._cfg, job, build, pid,
                                                   logs)
            if success:
                break
            do_repeat -= 1
        if not success:
            logs.append(("ERROR", "It is not possible to download the input "
                         "data file from the job '{job}', build "
                         "'{build}', or it is damaged. Skipped.".
                         format(job=job, build=build["build"])))
        if success:
            logs.append(("INFO", "  Processing data from the build '{0}' ...".
                         format(build["build"])))
            data = InputData._parse_tests(job, build, logs)
            if data is None:
                logs.append(("ERROR", "Input data file from the job '{job}', "
                             "build '{build}' is damaged. Skipped.".
                             format(job=job, build=build["build"])))
            else:
                state = "processed"
            try:
                remove(build["file-name"])
            except OSError as err:
                logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
                             format(build["file-name"], err)))
        logs.append(("INFO", "  Done."))

        result = {
            "data": data,
            "state": state,
            "job": job,
            "build": build,
            "logs": logs
        }
        data_queue.put(result)

    def download_and_parse_data(self, repeat=1):
        """Download the input data files, parse input data from input files
        and store in pandas' Series.

        :param repeat: Repeat the download specified number of times if not
            successful.
        :type repeat: int
        """

        logging.info("Downloading and parsing input files ...")

        work_queue = multiprocessing.JoinableQueue()
        manager = multiprocessing.Manager()
        data_queue = manager.Queue()
        cpus = multiprocessing.cpu_count()

        workers = list()
        for cpu in range(cpus):
            worker = Worker(work_queue,
                            data_queue,
                            self._download_and_parse_build)
            worker.daemon = True
            worker.start()
            workers.append(worker)
            os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
                      format(cpu, worker.pid))

        for job, builds in self._cfg.builds.items():
            for build in builds:
                work_queue.put((job, build, repeat))

        work_queue.join()

        logging.info("Done.")

        while not data_queue.empty():
            result = data_queue.get()

            job = result["job"]
            build_nr = result["build"]["build"]

            if result["data"]:
                data = result["data"]
                build_data = pd.Series({
                    "metadata": pd.Series(data["metadata"].values(),
                                          index=data["metadata"].keys()),
                    "suites": pd.Series(data["suites"].values(),
                                        index=data["suites"].keys()),
                    "tests": pd.Series(data["tests"].values(),
                                       index=data["tests"].keys())})

                if self._input_data.get(job, None) is None:
                    self._input_data[job] = pd.Series()
                self._input_data[job][str(build_nr)] = build_data

                self._cfg.set_input_file_name(job, build_nr,
                                              result["build"]["file-name"])

            self._cfg.set_input_state(job, build_nr, result["state"])

            for item in result["logs"]:
                if item[0] == "INFO":
                    logging.info(item[1])
                elif item[0] == "ERROR":
                    logging.error(item[1])
                elif item[0] == "DEBUG":
                    logging.debug(item[1])
                elif item[0] == "CRITICAL":
                    logging.critical(item[1])
                elif item[0] == "WARNING":
                    logging.warning(item[1])

        del data_queue

        # Terminate all workers
        for worker in workers:
            worker.terminate()
            worker.join()

        logging.info("Done.")

    @staticmethod
    def _end_of_tag(tag_filter, start=0, closer="'"):
        """Return the index of character in the string which is the end of
        tag.

        :param tag_filter: The string where the end of tag is being searched.
        :param start: The index where the searching is started.
        :param closer: The character which is the tag closer.
        :type tag_filter: str
        :type start: int
        :type closer: str
        :returns: The index of the tag closer.
        :rtype: int
        """

        try:
            idx_opener = tag_filter.index(closer, start)
            return tag_filter.index(closer, idx_opener + 1)
        except ValueError:
            return None
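
    # Example: for tag_filter = "'64B' and '1T1C'", _end_of_tag(tag_filter)
    # returns 4, the index of the apostrophe closing the tag '64B'.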

    @staticmethod
    def _condition(tag_filter):
        """Create a conditional statement from the given tag filter.

        :param tag_filter: Filter based on tags from the element specification.
        :type tag_filter: str
        :returns: Conditional statement which can be evaluated.
        :rtype: str
        """

        index = 0
        while True:
            index = InputData._end_of_tag(tag_filter, index)
            if index is None:
                return tag_filter
            index += 1
            tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
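
    # Example: _condition("'64B' and '1T1C'") returns the string
    # "'64B' in tags and '1T1C' in tags", which filter_data() then
    # evaluates with eval() against each test's list of tags.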

    def filter_data(self, element, params=None, data_set="tests",
                    continue_on_error=False):
        """Filter required data from the given jobs and builds.

        The output data structure is:

        - job 1
          - build 1
            - test (suite) 1 ID:
              - param 1
              ...
              - param n
            ...
          ...
        - job n

        :param element: Element which will use the filtered data.
        :param params: Parameters which will be included in the output. If
            None, all parameters are included.
        :param data_set: The set of data to be filtered: tests, suites,
            metadata.
        :param continue_on_error: Continue if there is error while reading the
            data. The item will be empty then.
        :type element: pandas.Series
        :type params: list
        :type data_set: str
        :type continue_on_error: bool
        :returns: Filtered data.
        :rtype: pandas.Series
        """

        logging.info("    Creating the data set for the {0} '{1}'.".
                     format(element.get("type", ""), element.get("title", "")))

        try:
            if element["filter"] in ("all", "template"):
                cond = "True"
            else:
                cond = InputData._condition(element["filter"])
            logging.debug("    Filter: {0}".format(cond))
        except KeyError:
            logging.error("    No filter defined.")
            return None

        if params is None:
            params = element.get("parameters", None)

        data = pd.Series()
        try:
            for job, builds in element["data"].items():
                data[job] = pd.Series()
                for build in builds:
                    data[job][str(build)] = pd.Series()
                    try:
                        data_iter = self.data[job][str(build)][data_set].\
                            iteritems()
                    except KeyError:
                        if continue_on_error:
                            continue
                        else:
                            return None
                    for test_ID, test_data in data_iter:
                        if eval(cond, {"tags": test_data.get("tags", "")}):
                            data[job][str(build)][test_ID] = pd.Series()
                            if params is None:
                                for param, val in test_data.items():
                                    data[job][str(build)][test_ID][param] = val
                            else:
                                for param in params:
                                    try:
                                        data[job][str(build)][test_ID][param] =\
                                            test_data[param]
                                    except KeyError:
                                        data[job][str(build)][test_ID][param] =\
                                            "No Data"
            return data

        except (KeyError, IndexError, ValueError) as err:
            logging.error("    Missing mandatory parameter in the element "
                          "specification: {0}".format(err))
            return None
        except AttributeError:
            return None
        except SyntaxError:
            logging.error("    The filter '{0}' is not correct. Check if all "
                          "tags are enclosed by apostrophes.".format(cond))
            return None
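
    # Illustrative use (the element specification fields are assumed):
    #   element = {"type": "table", "title": "Example",
    #              "filter": "'64B' and '1T1C'",
    #              "data": {"csit-vpp-perf-1801-all": [100]}}
    #   filtered = input_data.filter_data(element, params=["throughput"])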

    @staticmethod
    def merge_data(data):
        """Merge data from more jobs and builds to a simple data structure.

        The output data structure is:

        - test (suite) 1 ID:
          - param 1
          - param 2
          ...
          - param n
        ...
        - test (suite) n ID:
        ...

        :param data: Data to merge.
        :type data: pandas.Series
        :returns: Merged data.
        :rtype: pandas.Series
        """

        logging.info("    Merging data ...")

        merged_data = pd.Series()
        for _, builds in data.iteritems():
            for _, item in builds.iteritems():
                for ID, item_data in item.iteritems():
                    merged_data[ID] = item_data

        return merged_data
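
    # Typical flow: filter the data per job and build first, then flatten
    # across jobs and builds:
    #   merged = InputData.merge_data(input_data.filter_data(element))
    # If the same test ID occurs in more builds, the last one processed wins.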


class Worker(multiprocessing.Process):
    """Worker class used to download and process input files in separate
    parallel processes.
    """

    def __init__(self, work_queue, data_queue, func):
        """Initialisation.

        :param work_queue: Queue with items to process.
        :param data_queue: Shared memory between processes. Queue which keeps
            the result data. This data is then read by the main process and
            used in further processing.
        :param func: Function which is executed by the worker.
        :type work_queue: multiprocessing.JoinableQueue
        :type data_queue: multiprocessing.Manager().Queue()
        :type func: Callable object
        """
        super(Worker, self).__init__()
        self._work_queue = work_queue
        self._data_queue = data_queue
        self._func = func

    def run(self):
        """Method representing the process's activity.
        """

        while True:
            try:
                self.process(self._work_queue.get())
            finally:
                self._work_queue.task_done()

    def process(self, item_to_process):
        """Method executed by the runner.

        :param item_to_process: Data to be processed by the function.
        :type item_to_process: tuple
        """
        self._func(self.pid, self._data_queue, *item_to_process)
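

# A minimal usage sketch. The Specification import and file name are
# assumptions; any object providing "builds", "set_input_file_name" and
# "set_input_state" as used above would do:
#
#   from specification_parser import Specification  # assumed import
#   spec = Specification("specification.yaml")
#   spec.read_specification()
#   input_data = InputData(spec)
#   input_data.download_and_parse_data(repeat=2)
#   tests = input_data.tests("csit-vpp-perf-1801-all", "100")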