1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Data pre-processing
16 - extract data from output.xml files generated by Jenkins jobs and store in pandas' Series,
18 - provide access to the data.
19 - filter the data using tags,
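A minimal usage sketch (illustrative only; "spec" is a Specification object
and "element" is one element of the specification, both prepared elsewhere
by the presentation tooling):

    data = InputData(spec)
    data.download_and_parse_data(repeat=2)
    filtered = data.filter_data(element, data_set="tests")
    merged = InputData.merge_data(filtered)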
22 import multiprocessing
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
33 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
35 from input_data_files import download_and_unzip_data_file
36 from utils import Worker
39 class ExecutionChecker(ResultVisitor):
40 """Class to traverse through the test suite structure.
42 The functionality implemented in this class generates a json structure:
48 "generated": "Timestamp",
49 "version": "SUT version",
50 "job": "Jenkins job name",
51 "build": "Information about the build"
54 "Suite long name 1": {
56 "doc": "Suite 1 documentation",
57 "parent": "Suite 1 parent",
58 "level": "Level of the suite in the suite hierarchy"
60 "Suite long name N": {
62 "doc": "Suite N documentation",
63 "parent": "Suite 2 parent",
64 "level": "Level of the suite in the suite hierarchy"
71 "parent": "Name of the parent of the test",
72 "doc": "Test documentation",
73 "msg": "Test message",
74 "vat-history": "DUT1 and DUT2 VAT History",
75 "show-run": "Show Run",
76 "tags": ["tag 1", "tag 2", "tag n"],
78 "status": "PASS" | "FAIL",
120 "parent": "Name of the parent of the test",
121 "doc": "Test documentation",
122 "msg": "Test message",
123 "tags": ["tag 1", "tag 2", "tag n"],
125 "status": "PASS" | "FAIL",
132 "parent": "Name of the parent of the test",
133 "doc": "Test documentation",
134 "msg": "Test message",
135 "tags": ["tag 1", "tag 2", "tag n"],
136 "type": "MRR" | "BMRR",
137 "status": "PASS" | "FAIL",
139 "receive-rate": AvgStdevMetadata,
143 # TODO: Remove when definitely no NDRPDRDISC tests are used:
147 "parent": "Name of the parent of the test",
148 "doc": "Test documentation",
149 "msg": "Test message",
150 "tags": ["tag 1", "tag 2", "tag n"],
151 "type": "PDR" | "NDR",
152 "status": "PASS" | "FAIL",
153 "throughput": { # Only type: "PDR" | "NDR"
155 "unit": "pps" | "bps" | "percentage"
157 "latency": { # Only type: "PDR" | "NDR"
164 "50": { # Only for NDR
169 "10": { # Only for NDR
181 "50": { # Only for NDR
186 "10": { # Only for NDR
193 "lossTolerance": "lossTolerance", # Only type: "PDR"
194 "vat-history": "DUT1 and DUT2 VAT History"
195 "show-run": "Show Run"
207 "metadata": { # Optional
208 "version": "VPP version",
209 "job": "Jenkins job name",
210 "build": "Information about the build"
214 "doc": "Suite 1 documentation",
215 "parent": "Suite 1 parent",
216 "level": "Level of the suite in the suite hierarchy"
219 "doc": "Suite N documentation",
220 "parent": "Suite 2 parent",
221 "level": "Level of the suite in the suite hierarchy"
227 "parent": "Name of the parent of the test",
228 "doc": "Test documentation"
229 "msg": "Test message"
230 "tags": ["tag 1", "tag 2", "tag n"],
231 "vat-history": "DUT1 and DUT2 VAT History"
232 "show-run": "Show Run"
233 "status": "PASS" | "FAIL"
241 .. note:: ID is the lowercase full path to the test.
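For illustration only, a hand-written and heavily simplified instance of the
generated structure for a single MRR test could look like this (all values
are made up):

    {
        "metadata": {
            "generated": "20180101 12:00",
            "version": "18.04-rc0",
            "job": "a-jenkins-job",
            "build": "1"
        },
        "suites": {
            "a.suite.long.name": {
                "name": "a suite",
                "doc": "Suite documentation",
                "parent": "Parent suite",
                "level": 4
            }
        },
        "tests": {
            "a.suite.long.name.a-test-name": {
                "name": "a-test-name",
                "parent": "a suite",
                "doc": "Test documentation",
                "msg": "Test message",
                "tags": ["tag 1", "tag 2"],
                "type": "MRR",
                "status": "PASS",
                "result": {"receive-rate": AvgStdevMetadata}
            }
        }
    }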
244 # TODO: Remove when definitely no NDRPDRDISC tests are used:
245 REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
247 REGEX_NDRPDR_RATE = re.compile(r'NDR_LOWER:\s(\d+.\d+).*\n.*\n'
248 r'NDR_UPPER:\s(\d+.\d+).*\n'
249 r'PDR_LOWER:\s(\d+.\d+).*\n.*\n'
250 r'PDR_UPPER:\s(\d+.\d+)')
252 # TODO: Remove when definitely no NDRPDRDISC tests are used:
253 REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
254 r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
255 r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
256 r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
257 r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
258 r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
259 r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
261 REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
262 r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
263 r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
265 REGEX_NDRPDR_LAT = re.compile(r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
266 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]')
268 REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
271 REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
273 REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
274 r"(RTE Version: 'DPDK )(.*)(')")
276 REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
278 REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
279 r'tx\s(\d*),\srx\s(\d*)')
281 REGEX_BMRR = re.compile(r'Maximum Receive Rate trial results'
282 r' in packets per second: \[(.*)\]')
284 REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
286 REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')
288 REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
290 REGEX_TC_NUMBER = re.compile(r'tc[0-9]{2}-')
292 def __init__(self, metadata, mapping, ignore):
295 :param metadata: Key-value pairs to be included in "metadata" part of
297 :param mapping: Mapping of the old names of test cases to the new
299 :param ignore: List of TCs to be ignored.
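Illustrative instantiation (hypothetical values; "result" is a Robot
ExecutionResult as used in InputData._parse_tests):

    checker = ExecutionChecker(
        metadata={"job": "a-jenkins-job", "build": "1"},
        mapping={"old.suite.name.tc01": "new.suite.name.tc01"},
        ignore=["some.suite.name.tc99"])
    result.visit(checker)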
305 # Type of message to parse out from the test messages
306 self._msg_type = None
312 self._timestamp = None
314 # Mapping of TCs long names
315 self._mapping = mapping
318 self._ignore = ignore
320 # Number of VAT History messages found:
322 # 1 - VAT History of DUT1
323 # 2 - VAT History of DUT2
324 self._lookup_kw_nr = 0
325 self._vat_history_lookup_nr = 0
327 # Number of Show Running messages found
329 # 1 - Show run message found
330 self._show_run_lookup_nr = 0
332 # Test ID of currently processed test - the lowercase full path to the
336 # The main data structure
338 "metadata": OrderedDict(),
339 "suites": OrderedDict(),
340 "tests": OrderedDict()
343 # Save the provided metadata
344 for key, val in metadata.items():
345 self._data["metadata"][key] = val
347 # Dictionary defining the methods used to parse different types of
350 "timestamp": self._get_timestamp,
351 "vpp-version": self._get_vpp_version,
352 "dpdk-version": self._get_dpdk_version,
353 "teardown-vat-history": self._get_vat_history,
354 "test-show-runtime": self._get_show_run
359 """Getter - Data parsed from the XML file.
361 :returns: Data parsed from the XML file.
366 def _get_vpp_version(self, msg):
367 """Called when extraction of VPP version is required.
369 :param msg: Message to process.
374 if msg.message.count("return STDOUT Version:"):
375 self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).group(2))
377 self._data["metadata"]["version"] = self._version
378 self._msg_type = None
380 def _get_dpdk_version(self, msg):
381 """Called when extraction of DPDK version is required.
383 :param msg: Message to process.
388 if msg.message.count("return STDOUT testpmd"):
390 self._version = str(re.search(
391 self.REGEX_VERSION_DPDK, msg.message).group(4))
392 self._data["metadata"]["version"] = self._version
396 self._msg_type = None
398 def _get_timestamp(self, msg):
399 """Called when extraction of timestamp is required.
401 :param msg: Message to process.
406 self._timestamp = msg.timestamp[:14]
407 self._data["metadata"]["generated"] = self._timestamp
408 self._msg_type = None
410 def _get_vat_history(self, msg):
411 """Called when extraction of VAT command history is required.
413 :param msg: Message to process.
417 if msg.message.count("VAT command history:"):
418 self._vat_history_lookup_nr += 1
419 if self._vat_history_lookup_nr == 1:
420 self._data["tests"][self._test_ID]["vat-history"] = str()
422 self._msg_type = None
423 text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
424 "VAT command history:", "", msg.message, count=1). \
425 replace("\n\n", "\n").replace('\n', ' |br| ').\
426 replace('\r', '').replace('"', "'")
428 self._data["tests"][self._test_ID]["vat-history"] += " |br| "
429 self._data["tests"][self._test_ID]["vat-history"] += \
430 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
432 def _get_show_run(self, msg):
433 """Called when extraction of VPP operational data (output of CLI command
434 Show Runtime) is required.
436 :param msg: Message to process.
440 if msg.message.count("return STDOUT Thread "):
441 self._show_run_lookup_nr += 1
442 if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
443 self._data["tests"][self._test_ID]["show-run"] = str()
444 if self._lookup_kw_nr > 1:
445 self._msg_type = None
446 if self._show_run_lookup_nr == 1:
447 text = msg.message.replace("vat# ", "").\
448 replace("return STDOUT ", "").replace("\n\n", "\n").\
449 replace('\n', ' |br| ').\
450 replace('\r', '').replace('"', "'")
452 self._data["tests"][self._test_ID]["show-run"] += " |br| "
453 self._data["tests"][self._test_ID]["show-run"] += \
454 "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
458 # TODO: Remove when definitely no NDRPDRDISC tests are used:
459 def _get_latency(self, msg, test_type):
460 """Get the latency data from the test message.
462 :param msg: Message to be parsed.
463 :param test_type: Type of the test - NDR or PDR.
466 :returns: Latencies parsed from the message.
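Illustrative message fragment (the exact wording comes from the Robot
keywords; the shape below is only inferred from REGEX_LAT_NDR):

    LAT_100%NDR: ['10/15/20', '11/16/21']
    LAT_50%NDR: ['5/8/12', '6/9/13']
    LAT_10%NDR: ['2/4/6', '3/5/7']

Each 'min/avg/max' triple becomes one item of the returned latencies,
direction1 and direction2 alternating.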
470 if test_type == "NDR":
471 groups = re.search(self.REGEX_LAT_NDR, msg)
472 groups_range = range(1, 7)
473 elif test_type == "PDR":
474 groups = re.search(self.REGEX_LAT_PDR, msg)
475 groups_range = range(1, 3)
480 for idx in groups_range:
481 try:
482 lat = [int(item) for item in str(groups.group(idx)).split('/')]
483 except (AttributeError, ValueError):
484 lat = [-1, -1, -1]
485 latencies.append(lat)
487 keys = ("min", "avg", "max")
495 latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
496 latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
497 if test_type == "NDR":
498 latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
499 latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
500 latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
501 latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
505 def _get_ndrpdr_throughput(self, msg):
506 """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER from the test
509 :param msg: The test message to be parsed.
511 :returns: Parsed data as a dict and the status (PASS/FAIL).
512 :rtype: tuple(dict, str)
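Illustrative message fragment (shape inferred from REGEX_NDRPDR_RATE only;
the real text is produced by the Robot keywords):

    NDR_LOWER: 12000000.0 pps
    <one more line>
    NDR_UPPER: 12100000.0 pps
    PDR_LOWER: 12200000.0 pps
    <one more line>
    PDR_UPPER: 12300000.0 pps

would be parsed into:

    {"NDR": {"LOWER": 12000000.0, "UPPER": 12100000.0},
     "PDR": {"LOWER": 12200000.0, "UPPER": 12300000.0}}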
516 "NDR": {"LOWER": -1.0, "UPPER": -1.0},
517 "PDR": {"LOWER": -1.0, "UPPER": -1.0}
520 groups = re.search(self.REGEX_NDRPDR_RATE, msg)
522 if groups is not None:
524 throughput["NDR"]["LOWER"] = float(groups.group(1))
525 throughput["NDR"]["UPPER"] = float(groups.group(2))
526 throughput["PDR"]["LOWER"] = float(groups.group(3))
527 throughput["PDR"]["UPPER"] = float(groups.group(4))
529 except (IndexError, ValueError):
532 return throughput, status
534 def _get_ndrpdr_latency(self, msg):
535 """Get LATENCY from the test message.
537 :param msg: The test message to be parsed.
539 :returns: Parsed data as a dict and the status (PASS/FAIL).
540 :rtype: tuple(dict, str)
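Illustrative message fragment (shape inferred from REGEX_NDRPDR_LAT only):

    LATENCY ... ['1/2/4', '1/3/5']
    <two more lines>
    LATENCY ... ['2/4/8', '2/5/9']

The first pair fills latency["NDR"]["direction1"] and ["direction2"], the
second pair fills latency["PDR"], each 'min/avg/max' triple mapped to the
keys min, avg and max.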
545 "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
546 "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
549 "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
550 "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
554 groups = re.search(self.REGEX_NDRPDR_LAT, msg)
556 if groups is not None:
557 keys = ("min", "avg", "max")
559 latency["NDR"]["direction1"] = dict(
560 zip(keys, [float(l) for l in groups.group(1).split('/')]))
561 latency["NDR"]["direction2"] = dict(
562 zip(keys, [float(l) for l in groups.group(2).split('/')]))
563 latency["PDR"]["direction1"] = dict(
564 zip(keys, [float(l) for l in groups.group(3).split('/')]))
565 latency["PDR"]["direction2"] = dict(
566 zip(keys, [float(l) for l in groups.group(4).split('/')]))
568 except (IndexError, ValueError):
571 return latency, status
573 def visit_suite(self, suite):
574 """Implements traversing through the suite and its direct children.
576 :param suite: Suite to process.
580 if self.start_suite(suite) is not False:
581 suite.suites.visit(self)
582 suite.tests.visit(self)
583 self.end_suite(suite)
585 def start_suite(self, suite):
586 """Called when suite starts.
588 :param suite: Suite to process.
594 parent_name = suite.parent.name
595 except AttributeError:
598 doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
599 replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
600 doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
602 self._data["suites"][suite.longname.lower().replace('"', "'").
603 replace(" ", "_")] = {
604 "name": suite.name.lower(),
606 "parent": parent_name,
607 "level": len(suite.longname.split("."))
610 suite.keywords.visit(self)
612 def end_suite(self, suite):
613 """Called when suite ends.
615 :param suite: Suite to process.
621 def visit_test(self, test):
622 """Implements traversing through the test.
624 :param test: Test to process.
628 if self.start_test(test) is not False:
629 test.keywords.visit(self)
632 def start_test(self, test):
633 """Called when test starts.
635 :param test: Test to process.
640 longname_orig = test.longname.lower()
642 # Check the ignore list
643 if longname_orig in self._ignore:
646 tags = [str(tag) for tag in test.tags]
649 # Change the TC long name and name if defined in the mapping table
650 longname = self._mapping.get(longname_orig, None)
651 if longname is not None:
652 name = longname.split('.')[-1]
653 logging.debug("{0}\n{1}\n{2}\n{3}".format(
654 self._data["metadata"], longname_orig, longname, name))
656 longname = longname_orig
657 name = test.name.lower()
659 # Remove TC number from the TC long name (backward compatibility):
660 self._test_ID = re.sub(self.REGEX_TC_NUMBER, "", longname)
661 # Remove TC number from the TC name (not needed):
662 test_result["name"] = re.sub(self.REGEX_TC_NUMBER, "", name)
664 test_result["parent"] = test.parent.name.lower()
665 test_result["tags"] = tags
666 doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
667 replace('\r', '').replace('[', ' |br| [')
668 test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
669 test_result["msg"] = test.message.replace('\n', ' |br| '). \
670 replace('\r', '').replace('"', "'")
671 test_result["type"] = "FUNC"
672 test_result["status"] = test.status
674 if "PERFTEST" in tags:
675 # Replace info about cores (e.g. -1c-) with the info about threads
676 # and cores (e.g. -1t1c-) in the long test case names and in the
677 # test case names if necessary.
678 groups = re.search(self.REGEX_TC_NAME_OLD, self._test_ID)
681 for tag in test_result["tags"]:
682 groups = re.search(self.REGEX_TC_TAG, tag)
688 self._test_ID = re.sub(self.REGEX_TC_NAME_NEW,
689 "-{0}-".format(tag_tc.lower()),
692 test_result["name"] = re.sub(self.REGEX_TC_NAME_NEW,
693 "-{0}-".format(tag_tc.lower()),
697 test_result["status"] = "FAIL"
698 self._data["tests"][self._test_ID] = test_result
699 logging.error("The test '{0}' has no multi-threading tag or more "
700 "than one.".format(self._test_ID))
703 if test.status == "PASS" and ("NDRPDRDISC" in tags or
708 # TODO: Remove when definitely no NDRPDRDISC tests are used:
709 if "NDRDISC" in tags:
710 test_result["type"] = "NDR"
711 # TODO: Remove when definitely no NDRPDRDISC tests are used:
712 elif "PDRDISC" in tags:
713 test_result["type"] = "PDR"
714 elif "NDRPDR" in tags:
715 test_result["type"] = "NDRPDR"
717 test_result["type"] = "TCP"
719 test_result["type"] = "MRR"
720 elif "FRMOBL" in tags or "BMRR" in tags:
721 test_result["type"] = "BMRR"
723 test_result["status"] = "FAIL"
724 self._data["tests"][self._test_ID] = test_result
727 # TODO: Remove when definitely no NDRPDRDISC tests are used:
728 if test_result["type"] in ("NDR", "PDR"):
729 try:
730 rate_value = str(re.search(
731 self.REGEX_RATE, test.message).group(1))
732 except AttributeError:
733 rate_value = "-1"
734 try:
735 rate_unit = str(re.search(
736 self.REGEX_RATE, test.message).group(2))
737 except AttributeError:
738 rate_unit = "-1"
740 test_result["throughput"] = dict()
741 test_result["throughput"]["value"] = \
742 int(rate_value.split('.')[0])
743 test_result["throughput"]["unit"] = rate_unit
744 test_result["latency"] = \
745 self._get_latency(test.message, test_result["type"])
746 if test_result["type"] == "PDR":
747 test_result["lossTolerance"] = str(re.search(
748 self.REGEX_TOLERANCE, test.message).group(1))
750 elif test_result["type"] in ("NDRPDR", ):
751 test_result["throughput"], test_result["status"] = \
752 self._get_ndrpdr_throughput(test.message)
753 test_result["latency"], test_result["status"] = \
754 self._get_ndrpdr_latency(test.message)
756 elif test_result["type"] in ("TCP", ):
757 groups = re.search(self.REGEX_TCP, test.message)
758 test_result["result"] = int(groups.group(2))
760 elif test_result["type"] in ("MRR", "BMRR"):
761 test_result["result"] = dict()
762 groups = re.search(self.REGEX_BMRR, test.message)
763 if groups is not None:
764 items_str = groups.group(1)
765 items_float = [float(item.strip()) for item
766 in items_str.split(",")]
767 test_result["result"]["receive-rate"] = \
768 AvgStdevMetadataFactory.from_data(items_float)
770 groups = re.search(self.REGEX_MRR, test.message)
771 test_result["result"]["receive-rate"] = \
772 AvgStdevMetadataFactory.from_data([
773 float(groups.group(3)) / float(groups.group(1)), ])
775 self._data["tests"][self._test_ID] = test_result
777 def end_test(self, test):
778 """Called when test ends.
780 :param test: Test to process.
786 def visit_keyword(self, keyword):
787 """Implements traversing through the keyword and its child keywords.
789 :param keyword: Keyword to process.
790 :type keyword: Keyword
793 if self.start_keyword(keyword) is not False:
794 self.end_keyword(keyword)
796 def start_keyword(self, keyword):
797 """Called when keyword starts. Default implementation does nothing.
799 :param keyword: Keyword to process.
800 :type keyword: Keyword
804 if keyword.type == "setup":
805 self.visit_setup_kw(keyword)
806 elif keyword.type == "teardown":
807 self._lookup_kw_nr = 0
808 self.visit_teardown_kw(keyword)
810 self._lookup_kw_nr = 0
811 self.visit_test_kw(keyword)
812 except AttributeError:
815 def end_keyword(self, keyword):
816 """Called when keyword ends. Default implementation does nothing.
818 :param keyword: Keyword to process.
819 :type keyword: Keyword
824 def visit_test_kw(self, test_kw):
825 """Implements traversing through the test keyword and its child
828 :param test_kw: Keyword to process.
829 :type test_kw: Keyword
832 for keyword in test_kw.keywords:
833 if self.start_test_kw(keyword) is not False:
834 self.visit_test_kw(keyword)
835 self.end_test_kw(keyword)
837 def start_test_kw(self, test_kw):
838 """Called when test keyword starts. Default implementation does
841 :param test_kw: Keyword to process.
842 :type test_kw: Keyword
845 if test_kw.name.count("Show Runtime Counters On All Duts"):
846 self._lookup_kw_nr += 1
847 self._show_run_lookup_nr = 0
848 self._msg_type = "test-show-runtime"
849 elif test_kw.name.count("Start The L2fwd Test") and not self._version:
850 self._msg_type = "dpdk-version"
853 test_kw.messages.visit(self)
855 def end_test_kw(self, test_kw):
856 """Called when keyword ends. Default implementation does nothing.
858 :param test_kw: Keyword to process.
859 :type test_kw: Keyword
864 def visit_setup_kw(self, setup_kw):
865 """Implements traversing through the teardown keyword and its child
868 :param setup_kw: Keyword to process.
869 :type setup_kw: Keyword
872 for keyword in setup_kw.keywords:
873 if self.start_setup_kw(keyword) is not False:
874 self.visit_setup_kw(keyword)
875 self.end_setup_kw(keyword)
877 def start_setup_kw(self, setup_kw):
878 """Called when teardown keyword starts. Default implementation does
881 :param setup_kw: Keyword to process.
882 :type setup_kw: Keyword
885 if setup_kw.name.count("Show Vpp Version On All Duts") \
886 and not self._version:
887 self._msg_type = "vpp-version"
889 elif setup_kw.name.count("Setup performance global Variables") \
890 and not self._timestamp:
891 self._msg_type = "timestamp"
894 setup_kw.messages.visit(self)
896 def end_setup_kw(self, setup_kw):
897 """Called when keyword ends. Default implementation does nothing.
899 :param setup_kw: Keyword to process.
900 :type setup_kw: Keyword
905 def visit_teardown_kw(self, teardown_kw):
906 """Implements traversing through the teardown keyword and its child
909 :param teardown_kw: Keyword to process.
910 :type teardown_kw: Keyword
913 for keyword in teardown_kw.keywords:
914 if self.start_teardown_kw(keyword) is not False:
915 self.visit_teardown_kw(keyword)
916 self.end_teardown_kw(keyword)
918 def start_teardown_kw(self, teardown_kw):
919 """Called when teardown keyword starts. Default implementation does
922 :param teardown_kw: Keyword to process.
923 :type teardown_kw: Keyword
927 if teardown_kw.name.count("Show Vat History On All Duts"):
928 self._vat_history_lookup_nr = 0
929 self._msg_type = "teardown-vat-history"
930 teardown_kw.messages.visit(self)
932 def end_teardown_kw(self, teardown_kw):
933 """Called when keyword ends. Default implementation does nothing.
935 :param teardown_kw: Keyword to process.
936 :type teardown_kw: Keyword
941 def visit_message(self, msg):
942 """Implements visiting the message.
944 :param msg: Message to process.
948 if self.start_message(msg) is not False:
949 self.end_message(msg)
951 def start_message(self, msg):
952 """Called when message starts. Get required information from messages:
955 :param msg: Message to process.
961 self.parse_msg[self._msg_type](msg)
963 def end_message(self, msg):
964 """Called when message ends. Default implementation does nothing.
966 :param msg: Message to process.
973 class InputData(object):
976 The data is extracted from output.xml files generated by Jenkins jobs and
977 stored in pandas' Series.
983 (as described in ExecutionChecker documentation)
985 (as described in ExecutionChecker documentation)
987 (as described in ExecutionChecker documentation)
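Access sketch (illustrative; "a-jenkins-job" and "1" are placeholders):

    in_data = InputData(spec)
    in_data.download_and_parse_data()
    tests = in_data.data["a-jenkins-job"]["1"]["tests"]
    # equivalent to: in_data.tests("a-jenkins-job", "1")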
990 def __init__(self, spec):
993 :param spec: Specification.
994 :type spec: Specification
1001 self._input_data = pd.Series()
1005 """Getter - Input data.
1007 :returns: Input data
1008 :rtype: pandas.Series
1010 return self._input_data
1012 def metadata(self, job, build):
1013 """Getter - metadata
1015 :param job: Job whose metadata we want.
1016 :param build: Build whose metadata we want.
1020 :rtype: pandas.Series
1023 return self.data[job][build]["metadata"]
1025 def suites(self, job, build):
1028 :param job: Job whose suites we want.
1029 :param build: Build whose suites we want.
1033 :rtype: pandas.Series
1036 return self.data[job][str(build)]["suites"]
1038 def tests(self, job, build):
1041 :param job: Job whose tests we want.
1042 :param build: Build whose tests we want.
1046 :rtype: pandas.Series
1049 return self.data[job][build]["tests"]
1051 def _parse_tests(self, job, build, log):
1052 """Process data from robot output.xml file and return JSON structured
1055 :param job: The name of the job whose build output data will be processed.
1056 :param build: The build whose output data will be processed.
1057 :param log: List of log messages.
1060 :type log: list of tuples (severity, msg)
1061 :returns: JSON data structure.
1070 with open(build["file-name"], 'r') as data_file:
1071 try:
1072 result = ExecutionResult(data_file)
1073 except errors.DataError as err:
1074 log.append(("ERROR", "Error occurred while parsing output.xml: "
1077 checker = ExecutionChecker(metadata, self._cfg.mapping,
1079 result.visit(checker)
1083 def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
1084 """Download and parse the input data file.
1086 :param pid: PID of the process executing this method.
1087 :param data_queue: Shared memory between processes. Queue which keeps
1088 the result data. This data is then read by the main process and used
1089 in further processing.
1090 :param job: Name of the Jenkins job which generated the processed input
1092 :param build: Information about the Jenkins build which generated the
1093 processed input file.
1094 :param repeat: Repeat the download specified number of times if not
1097 :type data_queue: multiprocessing.Manager().Queue()
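The item put to data_queue is a dict; its shape, as it is consumed by
download_and_parse_data, is roughly:

    {
        "job": job,
        "build": build,   # includes "build" number and "file-name"
        "data": data,     # structure returned by _parse_tests
        "state": state,   # state string passed to set_input_state
        "logs": logs      # list of (severity, message) tuples
    }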
1105 logging.info(" Processing the job/build: {0}: {1}".
1106 format(job, build["build"]))
1108 logs.append(("INFO", " Processing the job/build: {0}: {1}".
1109 format(job, build["build"])))
1116 success = download_and_unzip_data_file(self._cfg, job, build, pid,
1122 logs.append(("ERROR", "It is not possible to download the input "
1123 "data file from the job '{job}', build "
1124 "'{build}', or it is damaged. Skipped.".
1125 format(job=job, build=build["build"])))
1127 logs.append(("INFO", " Processing data from the build '{0}' ...".
1128 format(build["build"])))
1129 data = self._parse_tests(job, build, logs)
1131 logs.append(("ERROR", "Input data file from the job '{job}', "
1132 "build '{build}' is damaged. Skipped.".
1133 format(job=job, build=build["build"])))
1138 remove(build["file-name"])
1139 except OSError as err:
1140 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
1141 format(build["file-name"], err)))
1142 logs.append(("INFO", " Done."))
1151 data_queue.put(result)
1153 def download_and_parse_data(self, repeat=1):
1154 """Download the input data files, parse input data from input files and
1155 store in pandas' Series.
1157 :param repeat: Repeat the download specified number of times if not
1162 logging.info("Downloading and parsing input files ...")
1164 work_queue = multiprocessing.JoinableQueue()
1165 manager = multiprocessing.Manager()
1166 data_queue = manager.Queue()
1167 cpus = multiprocessing.cpu_count()
1170 for cpu in range(cpus):
1171 worker = Worker(work_queue,
1173 self._download_and_parse_build)
1174 worker.daemon = True
1176 workers.append(worker)
1177 os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
1178 format(cpu, worker.pid))
1180 for job, builds in self._cfg.builds.items():
1181 for build in builds:
1182 work_queue.put((job, build, repeat))
1186 logging.info("Done.")
1188 while not data_queue.empty():
1189 result = data_queue.get()
1192 build_nr = result["build"]["build"]
1195 data = result["data"]
1196 build_data = pd.Series({
1197 "metadata": pd.Series(data["metadata"].values(),
1198 index=data["metadata"].keys()),
1199 "suites": pd.Series(data["suites"].values(),
1200 index=data["suites"].keys()),
1201 "tests": pd.Series(data["tests"].values(),
1202 index=data["tests"].keys())})
1204 if self._input_data.get(job, None) is None:
1205 self._input_data[job] = pd.Series()
1206 self._input_data[job][str(build_nr)] = build_data
1208 self._cfg.set_input_file_name(job, build_nr,
1209 result["build"]["file-name"])
1211 self._cfg.set_input_state(job, build_nr, result["state"])
1213 for item in result["logs"]:
1214 if item[0] == "INFO":
1215 logging.info(item[1])
1216 elif item[0] == "ERROR":
1217 logging.error(item[1])
1218 elif item[0] == "DEBUG":
1219 logging.debug(item[1])
1220 elif item[0] == "CRITICAL":
1221 logging.critical(item[1])
1222 elif item[0] == "WARNING":
1223 logging.warning(item[1])
1227 # Terminate all workers
1228 for worker in workers:
1232 logging.info("Done.")
1235 def _end_of_tag(tag_filter, start=0, closer="'"):
1236 """Return the index of character in the string which is the end of tag.
1238 :param tag_filter: The string where the end of tag is being searched.
1239 :param start: The index where the searching is started.
1240 :param closer: The character which is the tag closer.
1241 :type tag_filter: str
1244 :returns: The index of the tag closer.
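Example: for tag_filter = "'64B' and '1T1C'" and start = 0, the opening
apostrophe is found at index 0 and the returned index is 4, the apostrophe
closing '64B'.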
1249 idx_opener = tag_filter.index(closer, start)
1250 return tag_filter.index(closer, idx_opener + 1)
1255 def _condition(tag_filter):
1256 """Create a conditional statement from the given tag filter.
1258 :param tag_filter: Filter based on tags from the element specification.
1259 :type tag_filter: str
1260 :returns: Conditional statement which can be evaluated.
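Example: the filter "'64B' and '1T1C'" becomes the statement
"'64B' in tags and '1T1C' in tags", which filter_data later evaluates with
the test's tags bound to the name "tags".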
1266 index = InputData._end_of_tag(tag_filter, index)
1270 tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1272 def filter_data(self, element, params=None, data_set="tests",
1273 continue_on_error=False):
1274 """Filter required data from the given jobs and builds.
1276 The output data structure is:
1280 - test (or suite) 1 ID:
1286 - test (or suite) n ID:
1293 :param element: Element which will use the filtered data.
1294 :param params: Parameters which will be included in the output. If None,
1295 all parameters are included.
1296 :param data_set: The set of data to be filtered: tests, suites,
1298 :param continue_on_error: Continue if there is an error while reading the
1299 data. The item will be empty then.
1300 :type element: pandas.Series
1303 :type continue_on_error: bool
1304 :returns: Filtered data.
1305 :rtype: pandas.Series
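Illustrative element (hypothetical values, written as a plain dict for
readability; real elements come from the specification YAML):

    element = {
        "filter": "'64B' and '1T1C'",
        "parameters": ["throughput", "type"],
        "data": {"a-jenkins-job": [1, 2]}
    }
    filtered = in_data.filter_data(element)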
1309 if element["filter"] in ("all", "template"):
1312 cond = InputData._condition(element["filter"])
1313 logging.debug(" Filter: {0}".format(cond))
1315 logging.error(" No filter defined.")
1319 params = element.get("parameters", None)
1321 params.append("type")
1325 for job, builds in element["data"].items():
1326 data[job] = pd.Series()
1327 for build in builds:
1328 data[job][str(build)] = pd.Series()
1330 data_iter = self.data[job][str(build)][data_set].\
1333 if continue_on_error:
1337 for test_ID, test_data in data_iter:
1338 if eval(cond, {"tags": test_data.get("tags", "")}):
1339 data[job][str(build)][test_ID] = pd.Series()
1341 for param, val in test_data.items():
1342 data[job][str(build)][test_ID][param] = val
1344 for param in params:
1346 data[job][str(build)][test_ID][param] =\
1349 data[job][str(build)][test_ID][param] =\
1353 except (KeyError, IndexError, ValueError) as err:
1354 logging.error(" Missing mandatory parameter in the element "
1355 "specification: {0}".format(err))
1357 except AttributeError:
1360 logging.error(" The filter '{0}' is not correct. Check if all "
1361 "tags are enclosed by apostrophes.".format(cond))
1365 def merge_data(data):
1366 """Merge data from more jobs and builds to a simple data structure.
1368 The output data structure is:
1370 - test (suite) 1 ID:
1376 - test (suite) n ID:
1379 :param data: Data to merge.
1380 :type data: pandas.Series
1381 :returns: Merged data.
1382 :rtype: pandas.Series
1385 logging.info(" Merging data ...")
1387 merged_data = pd.Series()
1388 for _, builds in data.iteritems():
1389 for _, item in builds.iteritems():
1390 for ID, item_data in item.iteritems():
1391 merged_data[ID] = item_data