# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Data pre-processing

- extract data from output.xml files generated by Jenkins jobs and store in
  pandas' Series,
- provide access to the data.
- filter the data using tags,
"""
import multiprocessing
import os
import re
import pandas as pd
import logging

from robot.api import ExecutionResult, ResultVisitor
from robot import errors
from collections import OrderedDict
from string import replace
from os import remove

from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory

from input_data_files import download_and_unzip_data_file
from utils import Worker


class ExecutionChecker(ResultVisitor):
    """Class to traverse through the test suite structure.

    The functionality implemented in this class generates a json structure:

    Performance tests:

    {
        "metadata": {
            "generated": "Timestamp",
            "version": "SUT version",
            "job": "Jenkins job name",
            "build": "Information about the build"
        },
        "suites": {
            "Suite long name 1": {
                "doc": "Suite 1 documentation",
                "parent": "Suite 1 parent",
                "level": "Level of the suite in the suite hierarchy"
            },
            "Suite long name N": {
                "doc": "Suite N documentation",
                "parent": "Suite N parent",
                "level": "Level of the suite in the suite hierarchy"
            }
        },
        "tests": {
            "ID": {
                "name": "Test name",
                "parent": "Name of the parent of the test",
                "doc": "Test documentation",
                "msg": "Test message",
                "tags": ["tag 1", "tag 2", "tag n"],
                "type": "PDR" | "NDR" | "TCP" | "MRR" | "BMRR",
                "throughput": {  # Only type: "PDR" | "NDR"
                    "value": int,
                    "unit": "pps" | "bps" | "percentage"
                },
                "latency": {  # Only type: "PDR" | "NDR"
                    "direction1": {
                        "100": {"min": int, "avg": int, "max": int},
                        "50": {...},  # Only for NDR
                        "10": {...}   # Only for NDR
                    },
                    "direction2": {
                        "100": {"min": int, "avg": int, "max": int},
                        "50": {...},  # Only for NDR
                        "10": {...}   # Only for NDR
                    }
                },
                "result": {  # Only type: "TCP"
                    "value": int,
                    "unit": "cps" | "rps"
                },
                "result": {  # Only type: "MRR" | "BMRR"
                    "receive-rate": AvgStdevMetadata
                },
                "lossTolerance": "lossTolerance",  # Only type: "PDR"
                "vat-history": "DUT1 and DUT2 VAT History",
                "show-run": "Show Run"
            },
            ...
        }
    }

    Functional tests:

    {
        "metadata": {  # Optional
            "version": "VPP version",
            "job": "Jenkins job name",
            "build": "Information about the build"
        },
        "suites": {
            "Suite long name 1": {
                "doc": "Suite 1 documentation",
                "parent": "Suite 1 parent",
                "level": "Level of the suite in the suite hierarchy"
            },
            "Suite long name N": {
                "doc": "Suite N documentation",
                "parent": "Suite N parent",
                "level": "Level of the suite in the suite hierarchy"
            }
        },
        "tests": {
            "ID": {
                "name": "Test name",
                "parent": "Name of the parent of the test",
                "doc": "Test documentation",
                "msg": "Test message",
                "tags": ["tag 1", "tag 2", "tag n"],
                "vat-history": "DUT1 and DUT2 VAT History",
                "show-run": "Show Run",
                "status": "PASS" | "FAIL"
            },
            ...
        }
    }

    .. note:: ID is the lowercase full path to the test.
    """

    REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')

    REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')

    REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
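
    # Illustrative (assumed) shape of the latency lines matched above, e.g.:
    #   LAT_90%NDR: ['1/2/3', '4/5/6']
    # i.e. a min/avg/max triplet for each of the two traffic directions.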

    REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
                                 r'[\D\d]*')

    REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")

    REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
                                    r"(RTE Version: 'DPDK )(.*)(')")

    REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')

    REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
                           r'tx\s(\d*),\srx\s(\d*)')

    REGEX_BMRR = re.compile(r'Maximum Receive Rate trial results'
                            r' in packets per second: \[(.*)\]')
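
    # Illustrative (assumed) shapes of the rate lines matched above, e.g.:
    #   MaxReceivedRate_Results [pkts/10sec]: tx 12345678, rx 12340000
    #   Maximum Receive Rate trial results in packets per second:
    #   [12345678.0, 12345900.5, 12346000.0]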

    REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')

    REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')

    REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
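
    # For example (illustrative names only): a new-style test name containing
    # "-1c-" combined with a single "1T1C" tag is rewritten by start_test()
    # into the old-style "-1t1c-" form, keeping thread/core info in the name.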

    def __init__(self, metadata):
        """Initialisation.

        :param metadata: Key-value pairs to be included in "metadata" part of
            JSON structure.
        :type metadata: dict
        """

        # Type of message to parse out from the test messages
        self._msg_type = None

        # VPP version
        self._version = None

        # Timestamp
        self._timestamp = None

        # Number of VAT History messages found:
        # 0 - no message
        # 1 - VAT History of DUT1
        # 2 - VAT History of DUT2
        self._lookup_kw_nr = 0
        self._vat_history_lookup_nr = 0

        # Number of Show Running messages found
        # 0 - no message
        # 1 - Show run message found
        self._show_run_lookup_nr = 0

        # Test ID of currently processed test - the lowercase full path to the
        # test
        self._test_ID = None

        # The main data structure
        self._data = {
            "metadata": OrderedDict(),
            "suites": OrderedDict(),
            "tests": OrderedDict()
        }

        # Save the provided metadata
        for key, val in metadata.items():
            self._data["metadata"][key] = val
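
        # The message types below are the values assigned to self._msg_type by
        # the start_*_kw() hooks; start_message() uses this mapping to dispatch
        # each Robot message to the matching extraction method.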
        # Dictionary defining the methods used to parse different types of
        # messages
        self.parse_msg = {
            "timestamp": self._get_timestamp,
            "vpp-version": self._get_vpp_version,
            "dpdk-version": self._get_dpdk_version,
            "teardown-vat-history": self._get_vat_history,
            "test-show-runtime": self._get_show_run
        }

    @property
    def data(self):
        """Getter - Data parsed from the XML file.

        :returns: Data parsed from the XML file.
        :rtype: dict
        """
        return self._data

    def _get_vpp_version(self, msg):
        """Called when extraction of VPP version is required.

        :param msg: Message to process.
        :type msg: Message
        """

        if msg.message.count("return STDOUT Version:"):
            self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
                                group(2))
            self._data["metadata"]["version"] = self._version
            self._msg_type = None

    def _get_dpdk_version(self, msg):
        """Called when extraction of DPDK version is required.

        :param msg: Message to process.
        :type msg: Message
        """

        if msg.message.count("return STDOUT testpmd"):
            try:
                self._version = str(re.search(
                    self.REGEX_VERSION_DPDK, msg.message).group(4))
                self._data["metadata"]["version"] = self._version
            except IndexError:
                pass
            finally:
                self._msg_type = None

    def _get_timestamp(self, msg):
        """Called when extraction of timestamp is required.

        :param msg: Message to process.
        :type msg: Message
        """
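
        # Robot message timestamps look like "20180101 12:34:56.789" (assumed
        # format); the [:14] slice keeps only the "YYYYMMDD HH:MM" part.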
        self._timestamp = msg.timestamp[:14]
        self._data["metadata"]["generated"] = self._timestamp
        self._msg_type = None

    def _get_vat_history(self, msg):
        """Called when extraction of VAT command history is required.

        :param msg: Message to process.
        :type msg: Message
        """

        if msg.message.count("VAT command history:"):
            self._vat_history_lookup_nr += 1
            if self._vat_history_lookup_nr == 1:
                self._data["tests"][self._test_ID]["vat-history"] = str()
            else:
                self._msg_type = None
            text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
                          "VAT command history:", "", msg.message, count=1). \
                replace("\n\n", "\n").replace('\n', ' |br| ').\
                replace('\r', '').replace('"', "'")

            self._data["tests"][self._test_ID]["vat-history"] += " |br| "
            self._data["tests"][self._test_ID]["vat-history"] += \
                "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text

    def _get_show_run(self, msg):
        """Called when extraction of VPP operational data (output of CLI command
        Show Runtime) is required.

        :param msg: Message to process.
        :type msg: Message
        """

        if msg.message.count("return STDOUT Thread "):
            self._show_run_lookup_nr += 1
            if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
                self._data["tests"][self._test_ID]["show-run"] = str()
            if self._lookup_kw_nr > 1:
                self._msg_type = None
            if self._show_run_lookup_nr == 1:
                text = msg.message.replace("vat# ", "").\
                    replace("return STDOUT ", "").replace("\n\n", "\n").\
                    replace('\n', ' |br| ').\
                    replace('\r', '').replace('"', "'")

                self._data["tests"][self._test_ID]["show-run"] += " |br| "
                self._data["tests"][self._test_ID]["show-run"] += \
                    "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text

    def _get_latency(self, msg, test_type):
        """Get the latency data from the test message.

        :param msg: Message to be parsed.
        :param test_type: Type of the test - NDR or PDR.
        :type msg: str
        :type test_type: str
        :returns: Latencies parsed from the message.
        :rtype: dict
        """

        if test_type == "NDR":
            groups = re.search(self.REGEX_LAT_NDR, msg)
            groups_range = range(1, 7)
        elif test_type == "PDR":
            groups = re.search(self.REGEX_LAT_PDR, msg)
            groups_range = range(1, 3)
        else:
            return {}

        latencies = list()
        for idx in groups_range:
            try:
                lat = [int(item) for item in str(groups.group(idx)).split('/')]
            except (AttributeError, ValueError):
                lat = [-1, -1, -1]
            latencies.append(lat)

        keys = ("min", "avg", "max")
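
        # The structure assembled below (and returned) has the shape:
        #   {"direction1": {"100": {"min": int, "avg": int, "max": int}, ...},
        #    "direction2": {...}}
        # with the "50" and "10" keys filled only for NDR tests.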
        latency = {
            "direction1": {
            },
            "direction2": {
            }
        }

        latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
        latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
        if test_type == "NDR":
            latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
            latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
            latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
            latency["direction2"]["10"] = dict(zip(keys, latencies[5]))

        return latency

    def visit_suite(self, suite):
        """Implements traversing through the suite and its direct children.

        :param suite: Suite to process.
        :type suite: Suite
        """

        if self.start_suite(suite) is not False:
            suite.suites.visit(self)
            suite.tests.visit(self)
            self.end_suite(suite)

    def start_suite(self, suite):
        """Called when suite starts.

        :param suite: Suite to process.
        :type suite: Suite
        """

        try:
            parent_name = suite.parent.name
        except AttributeError:
            parent_name = ""

        doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
            replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
        doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)

        self._data["suites"][suite.longname.lower().replace('"', "'").
            replace(" ", "_")] = {
                "name": suite.name.lower(),
                "doc": doc_str,
                "parent": parent_name,
                "level": len(suite.longname.split("."))
            }

        suite.keywords.visit(self)

    def end_suite(self, suite):
        """Called when suite ends.

        :param suite: Suite to process.
        :type suite: Suite
        """
        pass

    def visit_test(self, test):
        """Implements traversing through the test.

        :param test: Test to process.
        :type test: Test
        """

        if self.start_test(test) is not False:
            test.keywords.visit(self)
            self.end_test(test)

    def start_test(self, test):
        """Called when test starts.

        :param test: Test to process.
        :type test: Test
        """

        tags = [str(tag) for tag in test.tags]
        test_result = dict()
        test_result["name"] = test.name.lower()
        test_result["parent"] = test.parent.name.lower()
        test_result["tags"] = tags
        doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
            replace('\r', '').replace('[', ' |br| [')
        test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
        test_result["msg"] = test.message.replace('\n', ' |br| '). \
            replace('\r', '').replace('"', "'")
        test_result["status"] = test.status
        self._test_ID = test.longname.lower()
        if test.status == "PASS" and ("NDRPDRDISC" in tags or
                                      "TCP" in tags or
                                      "MRR" in tags or
                                      "BMRR" in tags):
            if "NDRDISC" in tags:
                test_type = "NDR"
            elif "PDRDISC" in tags:
                test_type = "PDR"
            elif "TCP" in tags:
                test_type = "TCP"
            elif "MRR" in tags:
                test_type = "MRR"
            elif "FRMOBL" in tags or "BMRR" in tags:
                test_type = "BMRR"
            else:
                test_result["status"] = "FAIL"
                self._data["tests"][self._test_ID] = test_result
                return

            test_result["type"] = test_type

            # Replace info about cores (e.g. -1c-) with the info about threads
            # and cores (e.g. -1t1c-) in the long test case names and in the
            # test case names if necessary.
            groups = re.search(self.REGEX_TC_NAME_OLD, self._test_ID)
            if not groups:
                tag_count = 0
                for tag in test_result["tags"]:
                    groups = re.search(self.REGEX_TC_TAG, tag)
                    if groups:
                        tag_count += 1
                        tag_tc = tag

                if tag_count == 1:
                    self._test_ID = re.sub(self.REGEX_TC_NAME_NEW,
                                           "-{0}-".format(tag_tc.lower()),
                                           self._test_ID)
                    test_result["name"] = re.sub(self.REGEX_TC_NAME_NEW,
                                                 "-{0}-".format(tag_tc.lower()),
                                                 test_result["name"])
                else:
                    test_result["status"] = "FAIL"
                    self._data["tests"][self._test_ID] = test_result
                    logging.error("The test '{0}' has no or more than one "
                                  "multi-threading tags.".format(self._test_ID))
                    return

            if test_type in ("NDR", "PDR"):
                try:
                    rate_value = str(re.search(
                        self.REGEX_RATE, test.message).group(1))
                except AttributeError:
                    rate_value = "-1"
                try:
                    rate_unit = str(re.search(
                        self.REGEX_RATE, test.message).group(2))
                except AttributeError:
                    rate_unit = "-1"

                test_result["throughput"] = dict()
                test_result["throughput"]["value"] = \
                    int(rate_value.split('.')[0])
                test_result["throughput"]["unit"] = rate_unit
                test_result["latency"] = \
                    self._get_latency(test.message, test_type)
                if test_type == "PDR":
                    test_result["lossTolerance"] = str(re.search(
                        self.REGEX_TOLERANCE, test.message).group(1))

            elif test_type in ("TCP", ):
                groups = re.search(self.REGEX_TCP, test.message)
                test_result["result"] = dict()
                test_result["result"]["value"] = int(groups.group(2))
                test_result["result"]["unit"] = groups.group(1)

            elif test_type in ("MRR", "BMRR"):
                test_result["result"] = dict()
                groups = re.search(self.REGEX_BMRR, test.message)
                if groups is not None:
                    items_str = groups.group(1)
                    items_float = [float(item.strip()) for item
                                   in items_str.split(",")]
                    test_result["result"]["receive-rate"] = \
                        AvgStdevMetadataFactory.from_data(items_float)
                else:
                    groups = re.search(self.REGEX_MRR, test.message)
                    test_result["result"]["receive-rate"] = \
                        AvgStdevMetadataFactory.from_data([
                            float(groups.group(3)) / float(groups.group(1)), ])

        self._data["tests"][self._test_ID] = test_result

    def end_test(self, test):
        """Called when test ends.

        :param test: Test to process.
        :type test: Test
        """
        pass

    def visit_keyword(self, keyword):
        """Implements traversing through the keyword and its child keywords.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        """
        if self.start_keyword(keyword) is not False:
            self.end_keyword(keyword)

    def start_keyword(self, keyword):
        """Called when keyword starts. Default implementation does nothing.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        """
        try:
            if keyword.type == "setup":
                self.visit_setup_kw(keyword)
            elif keyword.type == "teardown":
                self._lookup_kw_nr = 0
                self.visit_teardown_kw(keyword)
            else:
                self._lookup_kw_nr = 0
                self.visit_test_kw(keyword)
        except AttributeError:
            pass

    def end_keyword(self, keyword):
        """Called when keyword ends. Default implementation does nothing.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        """
        pass

    def visit_test_kw(self, test_kw):
        """Implements traversing through the test keyword and its child
        keywords.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        """
        for keyword in test_kw.keywords:
            if self.start_test_kw(keyword) is not False:
                self.visit_test_kw(keyword)
                self.end_test_kw(keyword)

    def start_test_kw(self, test_kw):
        """Called when test keyword starts. Default implementation does
        nothing.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        """
        if test_kw.name.count("Show Runtime Counters On All Duts"):
            self._lookup_kw_nr += 1
            self._show_run_lookup_nr = 0
            self._msg_type = "test-show-runtime"
        elif test_kw.name.count("Start The L2fwd Test") and not self._version:
            self._msg_type = "dpdk-version"
        else:
            return
        test_kw.messages.visit(self)

    def end_test_kw(self, test_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        """
        pass

    def visit_setup_kw(self, setup_kw):
        """Implements traversing through the setup keyword and its child
        keywords.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        """
        for keyword in setup_kw.keywords:
            if self.start_setup_kw(keyword) is not False:
                self.visit_setup_kw(keyword)
                self.end_setup_kw(keyword)

    def start_setup_kw(self, setup_kw):
        """Called when setup keyword starts. Default implementation does
        nothing.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        """
        if setup_kw.name.count("Show Vpp Version On All Duts") \
                and not self._version:
            self._msg_type = "vpp-version"

        elif setup_kw.name.count("Setup performance global Variables") \
                and not self._timestamp:
            self._msg_type = "timestamp"
        else:
            return
        setup_kw.messages.visit(self)

    def end_setup_kw(self, setup_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        """
        pass

    def visit_teardown_kw(self, teardown_kw):
        """Implements traversing through the teardown keyword and its child
        keywords.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        """
        for keyword in teardown_kw.keywords:
            if self.start_teardown_kw(keyword) is not False:
                self.visit_teardown_kw(keyword)
                self.end_teardown_kw(keyword)

    def start_teardown_kw(self, teardown_kw):
        """Called when teardown keyword starts. Default implementation does
        nothing.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        """

        if teardown_kw.name.count("Show Vat History On All Duts"):
            self._vat_history_lookup_nr = 0
            self._msg_type = "teardown-vat-history"
            teardown_kw.messages.visit(self)

    def end_teardown_kw(self, teardown_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        """
        pass

    def visit_message(self, msg):
        """Implements visiting the message.

        :param msg: Message to process.
        :type msg: Message
        """
        if self.start_message(msg) is not False:
            self.end_message(msg)

    def start_message(self, msg):
        """Called when message starts. Get required information from messages:
        timestamp, VPP/DPDK version, VAT command history, show runtime output.

        :param msg: Message to process.
        :type msg: Message
        """

        if self._msg_type:
            self.parse_msg[self._msg_type](msg)

    def end_message(self, msg):
        """Called when message ends. Default implementation does nothing.

        :param msg: Message to process.
        :type msg: Message
        """
        pass


class InputData(object):
    """Input data

    The data is extracted from output.xml files generated by Jenkins jobs and
    stored in pandas' DataFrames.

    The data structure:
    - job name
      - build number
        - metadata
          (as described in ExecutionChecker documentation)
        - suites
          (as described in ExecutionChecker documentation)
        - tests
          (as described in ExecutionChecker documentation)
    """

    def __init__(self, spec):
        """Initialisation.

        :param spec: Specification.
        :type spec: Specification
        """

        # Specification:
        self._cfg = spec

        # Data store:
        self._input_data = pd.Series()

    @property
    def data(self):
        """Getter - Input data.

        :returns: Input data.
        :rtype: pandas.Series
        """
        return self._input_data

    def metadata(self, job, build):
        """Getter - metadata.

        :param job: Job which metadata we want.
        :param build: Build which metadata we want.
        :type job: str
        :type build: str
        :returns: Metadata.
        :rtype: pandas.Series
        """
        return self.data[job][build]["metadata"]

    def suites(self, job, build):
        """Getter - suites.

        :param job: Job which suites we want.
        :param build: Build which suites we want.
        :type job: str
        :type build: str
        :returns: Suites.
        :rtype: pandas.Series
        """
        return self.data[job][str(build)]["suites"]

    def tests(self, job, build):
        """Getter - tests.

        :param job: Job which tests we want.
        :param build: Build which tests we want.
        :type job: str
        :type build: str
        :returns: Tests.
        :rtype: pandas.Series
        """
        return self.data[job][build]["tests"]

    @staticmethod
    def _parse_tests(job, build, log):
        """Process data from robot output.xml file and return JSON structured
        data.

        :param job: The name of job which build output data will be processed.
        :param build: The build which output data will be processed.
        :param log: List of log messages.
        :type job: str
        :type build: dict
        :type log: list of tuples (severity, msg)
        :returns: JSON data structure.
        :rtype: dict
        """

        metadata = {
            "job": job,
            "build": build
        }

        with open(build["file-name"], 'r') as data_file:
            try:
                result = ExecutionResult(data_file)
            except errors.DataError as err:
                log.append(("ERROR", "Error occurred while parsing output.xml: "
                                     "{0}".format(err)))
                return None
        checker = ExecutionChecker(metadata)
        result.visit(checker)

        return checker.data

    def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
        """Download and parse the input data file.

        :param pid: PID of the process executing this method.
        :param data_queue: Shared memory between processes. Queue which keeps
            the result data. This data is then read by the main process and used
            in further processing.
        :param job: Name of the Jenkins job which generated the processed input
            file.
        :param build: Information about the Jenkins build which generated the
            processed input file.
        :param repeat: Repeat the download specified number of times if not
            successful.
        :type pid: int
        :type data_queue: multiprocessing.Manager().Queue()
        :type job: str
        :type build: dict
        :type repeat: int
        """

        logs = list()

        logging.info(" Processing the job/build: {0}: {1}".
                     format(job, build["build"]))
        logs.append(("INFO", " Processing the job/build: {0}: {1}".
                     format(job, build["build"])))

        state = "failed"
        success = False
        data = None
        do_repeat = repeat
        while do_repeat:
            success = download_and_unzip_data_file(self._cfg, job, build, pid,
                                                   logs)
            if success:
                break
            do_repeat -= 1
        if not success:
            logs.append(("ERROR", "It is not possible to download the input "
                                  "data file from the job '{job}', build "
                                  "'{build}', or it is damaged. Skipped.".
                         format(job=job, build=build["build"])))
        if success:
            logs.append(("INFO", " Processing data from the build '{0}' ...".
                         format(build["build"])))
            data = InputData._parse_tests(job, build, logs)
            if data is None:
                logs.append(("ERROR", "Input data file from the job '{job}', "
                                      "build '{build}' is damaged. Skipped.".
                             format(job=job, build=build["build"])))
            else:
                state = "processed"

            try:
                remove(build["file-name"])
            except OSError as err:
                logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
                             format(build["file-name"], err)))
        logs.append(("INFO", " Done."))

        result = {
            "data": data,
            "state": state,
            "job": job,
            "build": build,
            "logs": logs
        }
        data_queue.put(result)
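
        # Each result dict placed on the queue is consumed by
        # download_and_parse_data() in the main process: "data" carries the
        # parsed JSON structure, "state" is "processed" or "failed", and the
        # buffered "logs" are replayed through the logging module there.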

    def download_and_parse_data(self, repeat=1):
        """Download the input data files, parse input data from input files and
        store in pandas' Series.

        :param repeat: Repeat the download specified number of times if not
            successful.
        :type repeat: int
        """

        logging.info("Downloading and parsing input files ...")

        work_queue = multiprocessing.JoinableQueue()
        manager = multiprocessing.Manager()
        data_queue = manager.Queue()
        cpus = multiprocessing.cpu_count()

        workers = list()
        for cpu in range(cpus):
            worker = Worker(work_queue,
                            data_queue,
                            self._download_and_parse_build)
            worker.daemon = True
            worker.start()
            workers.append(worker)
            os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
                      format(cpu, worker.pid))

        for job, builds in self._cfg.builds.items():
            for build in builds:
                work_queue.put((job, build, repeat))

        work_queue.join()

        logging.info("Done.")

        while not data_queue.empty():
            result = data_queue.get()

            job = result["job"]
            build_nr = result["build"]["build"]

            if result["data"]:
                data = result["data"]
                build_data = pd.Series({
                    "metadata": pd.Series(data["metadata"].values(),
                                          index=data["metadata"].keys()),
                    "suites": pd.Series(data["suites"].values(),
                                        index=data["suites"].keys()),
                    "tests": pd.Series(data["tests"].values(),
                                       index=data["tests"].keys())})

                if self._input_data.get(job, None) is None:
                    self._input_data[job] = pd.Series()
                self._input_data[job][str(build_nr)] = build_data

                self._cfg.set_input_file_name(job, build_nr,
                                              result["build"]["file-name"])

            self._cfg.set_input_state(job, build_nr, result["state"])

            for item in result["logs"]:
                if item[0] == "INFO":
                    logging.info(item[1])
                elif item[0] == "ERROR":
                    logging.error(item[1])
                elif item[0] == "DEBUG":
                    logging.debug(item[1])
                elif item[0] == "CRITICAL":
                    logging.critical(item[1])
                elif item[0] == "WARNING":
                    logging.warning(item[1])

        del data_queue

        # Terminate all workers
        for worker in workers:
            worker.terminate()
            worker.join()

        logging.info("Done.")

    @staticmethod
    def _end_of_tag(tag_filter, start=0, closer="'"):
        """Return the index of character in the string which is the end of tag.

        :param tag_filter: The string where the end of tag is being searched.
        :param start: The index where the searching is started.
        :param closer: The character which is the tag closer.
        :type tag_filter: str
        :type start: int
        :type closer: str
        :returns: The index of the tag closer.
        :rtype: int
        """

        try:
            idx_opener = tag_filter.index(closer, start)
            return tag_filter.index(closer, idx_opener + 1)
        except ValueError:
            return None
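
    # Example (illustrative): for tag_filter "'ndrdisc' and '1t1c'" and
    # start=0, the call returns the index of the apostrophe that closes
    # 'ndrdisc'.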

    @staticmethod
    def _condition(tag_filter):
        """Create a conditional statement from the given tag filter.

        :param tag_filter: Filter based on tags from the element specification.
        :type tag_filter: str
        :returns: Conditional statement which can be evaluated.
        :rtype: str
        """

        index = 0
        while True:
            index = InputData._end_of_tag(tag_filter, index)
            if index is None:
                return tag_filter
            index += 1
            tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
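
    # Example (illustrative): the filter "'NDRDISC' and '1T1C'" becomes the
    # evaluable condition "'NDRDISC' in tags and '1T1C' in tags".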

    def filter_data(self, element, params=None, data_set="tests",
                    continue_on_error=False):
        """Filter required data from the given jobs and builds.

        The output data structure is:

        - job 1
          - build 1
            - test (or suite) 1 ID:
              - param 1
              ...
              - param n
            ...
            - test (or suite) n ID:
              ...
          - build n
        - job n

        :param element: Element which will use the filtered data.
        :param params: Parameters which will be included in the output. If None,
            all parameters are included.
        :param data_set: The set of data to be filtered: tests, suites,
            metadata.
        :param continue_on_error: Continue if there is error while reading the
            data. The item will be empty then.
        :type element: pandas.Series
        :type params: list
        :type data_set: str
        :type continue_on_error: bool
        :returns: Filtered data.
        :rtype: pandas.Series
        """

        try:
            if element["filter"] in ("all", "template"):
                cond = "True"
            else:
                cond = InputData._condition(element["filter"])
            logging.debug(" Filter: {0}".format(cond))
        except KeyError:
            logging.error(" No filter defined.")
            return None

        if params is None:
            params = element.get("parameters", None)

        data = pd.Series()
        try:
            for job, builds in element["data"].items():
                data[job] = pd.Series()
                for build in builds:
                    data[job][str(build)] = pd.Series()
                    try:
                        data_iter = self.data[job][str(build)][data_set].\
                            iteritems()
                    except KeyError:
                        if continue_on_error:
                            continue
                        else:
                            return None
                    for test_ID, test_data in data_iter:
                        if eval(cond, {"tags": test_data.get("tags", "")}):
                            data[job][str(build)][test_ID] = pd.Series()
                            if params is None:
                                for param, val in test_data.items():
                                    data[job][str(build)][test_ID][param] = val
                            else:
                                for param in params:
                                    try:
                                        data[job][str(build)][test_ID][param] =\
                                            test_data[param]
                                    except KeyError:
                                        data[job][str(build)][test_ID][param] =\
                                            "No Data"
            return data

        except (KeyError, IndexError, ValueError) as err:
            logging.error(" Missing mandatory parameter in the element "
                          "specification: {0}".format(err))
            return None

        except AttributeError:
            return None

        except SyntaxError:
            logging.error(" The filter '{0}' is not correct. Check if all "
                          "tags are enclosed by apostrophes.".format(cond))
            return None

    @staticmethod
    def merge_data(data):
        """Merge data from more jobs and builds to a simple data structure.

        The output data structure is:

        - test (suite) 1 ID:
          - param 1
          - param 2
          ...
          - param n
        ...
        - test (suite) n ID:
          ...

        :param data: Data to merge.
        :type data: pandas.Series
        :returns: Merged data.
        :rtype: pandas.Series
        """

        logging.info(" Merging data ...")

        merged_data = pd.Series()
        for _, builds in data.iteritems():
            for _, item in builds.iteritems():
                for ID, item_data in item.iteritems():
                    merged_data[ID] = item_data

        return merged_data
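

# Illustrative usage sketch (assumes a Specification object `spec` and an
# element specification `element` with "filter" and "data" keys, both prepared
# by the surrounding presentation tooling; the names are illustrative only):
#
#     input_data = InputData(spec)
#     input_data.download_and_parse_data(repeat=2)
#     filtered = input_data.filter_data(element, data_set="tests")
#     merged = InputData.merge_data(filtered)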