1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Data pre-processing
16 - extract data from output.xml files generated by Jenkins jobs and store in
18 - provide access to the data.
19 - filter the data using tags,
22 import multiprocessing
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
33 from os.path import join
34 from datetime import datetime as dt
35 from datetime import timedelta
36 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
38 from input_data_files import download_and_unzip_data_file
39 from utils import Worker
42 # Separator used in file names
46 class ExecutionChecker(ResultVisitor):
47 """Class to traverse through the test suite structure.
49 The functionality implemented in this class generates a json structure:
55 "generated": "Timestamp",
56 "version": "SUT version",
57 "job": "Jenkins job name",
58 "build": "Information about the build"
61 "Suite long name 1": {
63 "doc": "Suite 1 documentation",
64 "parent": "Suite 1 parent",
65 "level": "Level of the suite in the suite hierarchy"
67 "Suite long name N": {
69 "doc": "Suite N documentation",
70 "parent": "Suite 2 parent",
71 "level": "Level of the suite in the suite hierarchy"
78 "parent": "Name of the parent of the test",
79 "doc": "Test documentation",
80 "msg": "Test message",
81 "vat-history": "DUT1 and DUT2 VAT History",
82 "show-run": "Show Run",
83 "tags": ["tag 1", "tag 2", "tag n"],
85 "status": "PASS" | "FAIL",
127 "parent": "Name of the parent of the test",
128 "doc": "Test documentation",
129 "msg": "Test message",
130 "tags": ["tag 1", "tag 2", "tag n"],
132 "status": "PASS" | "FAIL",
139 "parent": "Name of the parent of the test",
140 "doc": "Test documentation",
141 "msg": "Test message",
142 "tags": ["tag 1", "tag 2", "tag n"],
143 "type": "MRR" | "BMRR",
144 "status": "PASS" | "FAIL",
146 "receive-rate": AvgStdevMetadata,
150 # TODO: Remove when definitely no NDRPDRDISC tests are used:
154 "parent": "Name of the parent of the test",
155 "doc": "Test documentation",
156 "msg": "Test message",
157 "tags": ["tag 1", "tag 2", "tag n"],
158 "type": "PDR" | "NDR",
159 "status": "PASS" | "FAIL",
160 "throughput": { # Only type: "PDR" | "NDR"
162 "unit": "pps" | "bps" | "percentage"
164 "latency": { # Only type: "PDR" | "NDR"
171 "50": { # Only for NDR
176 "10": { # Only for NDR
188 "50": { # Only for NDR
193 "10": { # Only for NDR
200 "lossTolerance": "lossTolerance", # Only type: "PDR"
201 "vat-history": "DUT1 and DUT2 VAT History"
202 "show-run": "Show Run"
214 "metadata": { # Optional
215 "version": "VPP version",
216 "job": "Jenkins job name",
217 "build": "Information about the build"
221 "doc": "Suite 1 documentation",
222 "parent": "Suite 1 parent",
223 "level": "Level of the suite in the suite hierarchy"
226 "doc": "Suite N documentation",
227 "parent": "Suite 2 parent",
228 "level": "Level of the suite in the suite hierarchy"
234 "parent": "Name of the parent of the test",
235 "doc": "Test documentation"
236 "msg": "Test message"
237 "tags": ["tag 1", "tag 2", "tag n"],
238 "vat-history": "DUT1 and DUT2 VAT History"
239 "show-run": "Show Run"
240 "status": "PASS" | "FAIL"
248 .. note:: ID is the lowercase full path to the test.
251 # TODO: Remove when definitely no NDRPDRDISC tests are used:
252 REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
254 REGEX_NDRPDR_RATE = re.compile(r'NDR_LOWER:\s(\d+.\d+).*\n.*\n'
255 r'NDR_UPPER:\s(\d+.\d+).*\n'
256 r'PDR_LOWER:\s(\d+.\d+).*\n.*\n'
257 r'PDR_UPPER:\s(\d+.\d+)')
259 # TODO: Remove when definitely no NDRPDRDISC tests are used:
260 REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
261 r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
262 r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
263 r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
264 r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
265 r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
266 r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
268 REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
269 r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
270 r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
272 REGEX_NDRPDR_LAT = re.compile(r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
273 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]')
275 REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
278 REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
280 REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
281 r"(RTE Version: 'DPDK )(.*)(')")
283 REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
285 REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
286 r'tx\s(\d*),\srx\s(\d*)')
288 REGEX_BMRR = re.compile(r'Maximum Receive Rate trial results'
289 r' in packets per second: \[(.*)\]')
291 REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
293 REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')
295 REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
297 REGEX_TC_NUMBER = re.compile(r'tc[0-9]{2}-')
299 def __init__(self, metadata, mapping, ignore):
302 :param metadata: Key-value pairs to be included in "metadata" part of
304 :param mapping: Mapping of the old names of test cases to the new
306 :param ignore: List of TCs to be ignored.
312 # Type of message to parse out from the test messages
313 self._msg_type = None
319 self._timestamp = None
321 # Mapping of TCs long names
322 self._mapping = mapping
325 self._ignore = ignore
327 # Number of VAT History messages found:
329 # 1 - VAT History of DUT1
330 # 2 - VAT History of DUT2
331 self._lookup_kw_nr = 0
332 self._vat_history_lookup_nr = 0
334 # Number of Show Running messages found
336 # 1 - Show run message found
337 self._show_run_lookup_nr = 0
339 # Test ID of currently processed test- the lowercase full path to the
343 # The main data structure
345 "metadata": OrderedDict(),
346 "suites": OrderedDict(),
347 "tests": OrderedDict()
350 # Save the provided metadata
351 for key, val in metadata.items():
352 self._data["metadata"][key] = val
354 # Dictionary defining the methods used to parse different types of
357 "timestamp": self._get_timestamp,
358 "vpp-version": self._get_vpp_version,
359 "dpdk-version": self._get_dpdk_version,
360 "teardown-vat-history": self._get_vat_history,
361 "test-show-runtime": self._get_show_run
366 """Getter - Data parsed from the XML file.
368 :returns: Data parsed from the XML file.
373 def _get_vpp_version(self, msg):
374 """Called when extraction of VPP version is required.
376 :param msg: Message to process.
381 if msg.message.count("return STDOUT Version:"):
382 self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
384 self._data["metadata"]["version"] = self._version
385 self._msg_type = None
387 def _get_dpdk_version(self, msg):
388 """Called when extraction of DPDK version is required.
390 :param msg: Message to process.
395 if msg.message.count("return STDOUT testpmd"):
397 self._version = str(re.search(
398 self.REGEX_VERSION_DPDK, msg.message). group(4))
399 self._data["metadata"]["version"] = self._version
403 self._msg_type = None
405 def _get_timestamp(self, msg):
406 """Called when extraction of timestamp is required.
408 :param msg: Message to process.
413 self._timestamp = msg.timestamp[:14]
414 self._data["metadata"]["generated"] = self._timestamp
415 self._msg_type = None
417 def _get_vat_history(self, msg):
418 """Called when extraction of VAT command history is required.
420 :param msg: Message to process.
424 if msg.message.count("VAT command history:"):
425 self._vat_history_lookup_nr += 1
426 if self._vat_history_lookup_nr == 1:
427 self._data["tests"][self._test_ID]["vat-history"] = str()
429 self._msg_type = None
430 text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
431 "VAT command history:", "", msg.message, count=1). \
432 replace("\n\n", "\n").replace('\n', ' |br| ').\
433 replace('\r', '').replace('"', "'")
435 self._data["tests"][self._test_ID]["vat-history"] += " |br| "
436 self._data["tests"][self._test_ID]["vat-history"] += \
437 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
439 def _get_show_run(self, msg):
440 """Called when extraction of VPP operational data (output of CLI command
441 Show Runtime) is required.
443 :param msg: Message to process.
447 if msg.message.count("return STDOUT Thread "):
448 self._show_run_lookup_nr += 1
449 if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
450 self._data["tests"][self._test_ID]["show-run"] = str()
451 if self._lookup_kw_nr > 1:
452 self._msg_type = None
453 if self._show_run_lookup_nr == 1:
454 text = msg.message.replace("vat# ", "").\
455 replace("return STDOUT ", "").replace("\n\n", "\n").\
456 replace('\n', ' |br| ').\
457 replace('\r', '').replace('"', "'")
459 self._data["tests"][self._test_ID]["show-run"] += " |br| "
460 self._data["tests"][self._test_ID]["show-run"] += \
461 "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
465 # TODO: Remove when definitely no NDRPDRDISC tests are used:
466 def _get_latency(self, msg, test_type):
467 """Get the latency data from the test message.
469 :param msg: Message to be parsed.
470 :param test_type: Type of the test - NDR or PDR.
473 :returns: Latencies parsed from the message.
477 if test_type == "NDR":
478 groups = re.search(self.REGEX_LAT_NDR, msg)
479 groups_range = range(1, 7)
480 elif test_type == "PDR":
481 groups = re.search(self.REGEX_LAT_PDR, msg)
482 groups_range = range(1, 3)
487 for idx in groups_range:
489 lat = [int(item) for item in str(groups.group(idx)).split('/')]
490 except (AttributeError, ValueError):
492 latencies.append(lat)
494 keys = ("min", "avg", "max")
502 latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
503 latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
504 if test_type == "NDR":
505 latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
506 latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
507 latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
508 latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
512 def _get_ndrpdr_throughput(self, msg):
513 """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER from the test
516 :param msg: The test message to be parsed.
518 :returns: Parsed data as a dict and the status (PASS/FAIL).
519 :rtype: tuple(dict, str)
523 "NDR": {"LOWER": -1.0, "UPPER": -1.0},
524 "PDR": {"LOWER": -1.0, "UPPER": -1.0}
527 groups = re.search(self.REGEX_NDRPDR_RATE, msg)
529 if groups is not None:
531 throughput["NDR"]["LOWER"] = float(groups.group(1))
532 throughput["NDR"]["UPPER"] = float(groups.group(2))
533 throughput["PDR"]["LOWER"] = float(groups.group(3))
534 throughput["PDR"]["UPPER"] = float(groups.group(4))
536 except (IndexError, ValueError):
539 return throughput, status
541 def _get_ndrpdr_latency(self, msg):
542 """Get LATENCY from the test message.
544 :param msg: The test message to be parsed.
546 :returns: Parsed data as a dict and the status (PASS/FAIL).
547 :rtype: tuple(dict, str)
552 "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
553 "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
556 "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
557 "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
561 groups = re.search(self.REGEX_NDRPDR_LAT, msg)
563 if groups is not None:
564 keys = ("min", "avg", "max")
566 latency["NDR"]["direction1"] = dict(
567 zip(keys, [float(l) for l in groups.group(1).split('/')]))
568 latency["NDR"]["direction2"] = dict(
569 zip(keys, [float(l) for l in groups.group(2).split('/')]))
570 latency["PDR"]["direction1"] = dict(
571 zip(keys, [float(l) for l in groups.group(3).split('/')]))
572 latency["PDR"]["direction2"] = dict(
573 zip(keys, [float(l) for l in groups.group(4).split('/')]))
575 except (IndexError, ValueError):
578 return latency, status
580 def visit_suite(self, suite):
581 """Implements traversing through the suite and its direct children.
583 :param suite: Suite to process.
587 if self.start_suite(suite) is not False:
588 suite.suites.visit(self)
589 suite.tests.visit(self)
590 self.end_suite(suite)
592 def start_suite(self, suite):
593 """Called when suite starts.
595 :param suite: Suite to process.
601 parent_name = suite.parent.name
602 except AttributeError:
605 doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
606 replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
607 doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
609 self._data["suites"][suite.longname.lower().replace('"', "'").
610 replace(" ", "_")] = {
611 "name": suite.name.lower(),
613 "parent": parent_name,
614 "level": len(suite.longname.split("."))
617 suite.keywords.visit(self)
619 def end_suite(self, suite):
620 """Called when suite ends.
622 :param suite: Suite to process.
628 def visit_test(self, test):
629 """Implements traversing through the test.
631 :param test: Test to process.
635 if self.start_test(test) is not False:
636 test.keywords.visit(self)
639 def start_test(self, test):
640 """Called when test starts.
642 :param test: Test to process.
647 longname_orig = test.longname.lower()
649 # Check the ignore list
650 if longname_orig in self._ignore:
653 tags = [str(tag) for tag in test.tags]
656 # Change the TC long name and name if defined in the mapping table
657 longname = self._mapping.get(longname_orig, None)
658 if longname is not None:
659 name = longname.split('.')[-1]
660 logging.debug("{0}\n{1}\n{2}\n{3}".format(
661 self._data["metadata"], longname_orig, longname, name))
663 longname = longname_orig
664 name = test.name.lower()
666 # Remove TC number from the TC long name (backward compatibility):
667 self._test_ID = re.sub(self.REGEX_TC_NUMBER, "", longname)
668 # Remove TC number from the TC name (not needed):
669 test_result["name"] = re.sub(self.REGEX_TC_NUMBER, "", name)
671 test_result["parent"] = test.parent.name.lower()
672 test_result["tags"] = tags
673 doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
674 replace('\r', '').replace('[', ' |br| [')
675 test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
676 test_result["msg"] = test.message.replace('\n', ' |br| '). \
677 replace('\r', '').replace('"', "'")
678 test_result["type"] = "FUNC"
679 test_result["status"] = test.status
681 if "PERFTEST" in tags:
682 # Replace info about cores (e.g. -1c-) with the info about threads
683 # and cores (e.g. -1t1c-) in the long test case names and in the
684 # test case names if necessary.
685 groups = re.search(self.REGEX_TC_NAME_OLD, self._test_ID)
688 for tag in test_result["tags"]:
689 groups = re.search(self.REGEX_TC_TAG, tag)
695 self._test_ID = re.sub(self.REGEX_TC_NAME_NEW,
696 "-{0}-".format(tag_tc.lower()),
699 test_result["name"] = re.sub(self.REGEX_TC_NAME_NEW,
700 "-{0}-".format(tag_tc.lower()),
704 test_result["status"] = "FAIL"
705 self._data["tests"][self._test_ID] = test_result
706 logging.debug("The test '{0}' has no or more than one "
707 "multi-threading tags.".format(self._test_ID))
708 logging.debug("Tags: {0}".format(test_result["tags"]))
711 if test.status == "PASS" and ("NDRPDRDISC" in tags or
716 # TODO: Remove when definitely no NDRPDRDISC tests are used:
717 if "NDRDISC" in tags:
718 test_result["type"] = "NDR"
719 # TODO: Remove when definitely no NDRPDRDISC tests are used:
720 elif "PDRDISC" in tags:
721 test_result["type"] = "PDR"
722 elif "NDRPDR" in tags:
723 test_result["type"] = "NDRPDR"
725 test_result["type"] = "TCP"
727 test_result["type"] = "MRR"
728 elif "FRMOBL" in tags or "BMRR" in tags:
729 test_result["type"] = "BMRR"
731 test_result["status"] = "FAIL"
732 self._data["tests"][self._test_ID] = test_result
735 # TODO: Remove when definitely no NDRPDRDISC tests are used:
736 if test_result["type"] in ("NDR", "PDR"):
738 rate_value = str(re.search(
739 self.REGEX_RATE, test.message).group(1))
740 except AttributeError:
743 rate_unit = str(re.search(
744 self.REGEX_RATE, test.message).group(2))
745 except AttributeError:
748 test_result["throughput"] = dict()
749 test_result["throughput"]["value"] = \
750 int(rate_value.split('.')[0])
751 test_result["throughput"]["unit"] = rate_unit
752 test_result["latency"] = \
753 self._get_latency(test.message, test_result["type"])
754 if test_result["type"] == "PDR":
755 test_result["lossTolerance"] = str(re.search(
756 self.REGEX_TOLERANCE, test.message).group(1))
758 elif test_result["type"] in ("NDRPDR", ):
759 test_result["throughput"], test_result["status"] = \
760 self._get_ndrpdr_throughput(test.message)
761 test_result["latency"], test_result["status"] = \
762 self._get_ndrpdr_latency(test.message)
764 elif test_result["type"] in ("TCP", ):
765 groups = re.search(self.REGEX_TCP, test.message)
766 test_result["result"] = int(groups.group(2))
768 elif test_result["type"] in ("MRR", "BMRR"):
769 test_result["result"] = dict()
770 groups = re.search(self.REGEX_BMRR, test.message)
771 if groups is not None:
772 items_str = groups.group(1)
773 items_float = [float(item.strip()) for item
774 in items_str.split(",")]
775 metadata = AvgStdevMetadataFactory.from_data(items_float)
776 # Next two lines have been introduced in CSIT-1179,
777 # to be removed in CSIT-1180.
780 test_result["result"]["receive-rate"] = metadata
782 groups = re.search(self.REGEX_MRR, test.message)
783 test_result["result"]["receive-rate"] = \
784 AvgStdevMetadataFactory.from_data([
785 float(groups.group(3)) / float(groups.group(1)), ])
787 self._data["tests"][self._test_ID] = test_result
789 def end_test(self, test):
790 """Called when test ends.
792 :param test: Test to process.
798 def visit_keyword(self, keyword):
799 """Implements traversing through the keyword and its child keywords.
801 :param keyword: Keyword to process.
802 :type keyword: Keyword
805 if self.start_keyword(keyword) is not False:
806 self.end_keyword(keyword)
808 def start_keyword(self, keyword):
809 """Called when keyword starts. Default implementation does nothing.
811 :param keyword: Keyword to process.
812 :type keyword: Keyword
816 if keyword.type == "setup":
817 self.visit_setup_kw(keyword)
818 elif keyword.type == "teardown":
819 self._lookup_kw_nr = 0
820 self.visit_teardown_kw(keyword)
822 self._lookup_kw_nr = 0
823 self.visit_test_kw(keyword)
824 except AttributeError:
827 def end_keyword(self, keyword):
828 """Called when keyword ends. Default implementation does nothing.
830 :param keyword: Keyword to process.
831 :type keyword: Keyword
836 def visit_test_kw(self, test_kw):
837 """Implements traversing through the test keyword and its child
840 :param test_kw: Keyword to process.
841 :type test_kw: Keyword
844 for keyword in test_kw.keywords:
845 if self.start_test_kw(keyword) is not False:
846 self.visit_test_kw(keyword)
847 self.end_test_kw(keyword)
849 def start_test_kw(self, test_kw):
850 """Called when test keyword starts. Default implementation does
853 :param test_kw: Keyword to process.
854 :type test_kw: Keyword
857 if test_kw.name.count("Show Runtime Counters On All Duts"):
858 self._lookup_kw_nr += 1
859 self._show_run_lookup_nr = 0
860 self._msg_type = "test-show-runtime"
861 elif test_kw.name.count("Start The L2fwd Test") and not self._version:
862 self._msg_type = "dpdk-version"
865 test_kw.messages.visit(self)
867 def end_test_kw(self, test_kw):
868 """Called when keyword ends. Default implementation does nothing.
870 :param test_kw: Keyword to process.
871 :type test_kw: Keyword
876 def visit_setup_kw(self, setup_kw):
877 """Implements traversing through the teardown keyword and its child
880 :param setup_kw: Keyword to process.
881 :type setup_kw: Keyword
884 for keyword in setup_kw.keywords:
885 if self.start_setup_kw(keyword) is not False:
886 self.visit_setup_kw(keyword)
887 self.end_setup_kw(keyword)
889 def start_setup_kw(self, setup_kw):
890 """Called when teardown keyword starts. Default implementation does
893 :param setup_kw: Keyword to process.
894 :type setup_kw: Keyword
897 if setup_kw.name.count("Show Vpp Version On All Duts") \
898 and not self._version:
899 self._msg_type = "vpp-version"
901 elif setup_kw.name.count("Setup performance global Variables") \
902 and not self._timestamp:
903 self._msg_type = "timestamp"
906 setup_kw.messages.visit(self)
908 def end_setup_kw(self, setup_kw):
909 """Called when keyword ends. Default implementation does nothing.
911 :param setup_kw: Keyword to process.
912 :type setup_kw: Keyword
917 def visit_teardown_kw(self, teardown_kw):
918 """Implements traversing through the teardown keyword and its child
921 :param teardown_kw: Keyword to process.
922 :type teardown_kw: Keyword
925 for keyword in teardown_kw.keywords:
926 if self.start_teardown_kw(keyword) is not False:
927 self.visit_teardown_kw(keyword)
928 self.end_teardown_kw(keyword)
930 def start_teardown_kw(self, teardown_kw):
931 """Called when teardown keyword starts. Default implementation does
934 :param teardown_kw: Keyword to process.
935 :type teardown_kw: Keyword
939 if teardown_kw.name.count("Show Vat History On All Duts"):
940 self._vat_history_lookup_nr = 0
941 self._msg_type = "teardown-vat-history"
942 teardown_kw.messages.visit(self)
944 def end_teardown_kw(self, teardown_kw):
945 """Called when keyword ends. Default implementation does nothing.
947 :param teardown_kw: Keyword to process.
948 :type teardown_kw: Keyword
953 def visit_message(self, msg):
954 """Implements visiting the message.
956 :param msg: Message to process.
960 if self.start_message(msg) is not False:
961 self.end_message(msg)
963 def start_message(self, msg):
964 """Called when message starts. Get required information from messages:
967 :param msg: Message to process.
973 self.parse_msg[self._msg_type](msg)
975 def end_message(self, msg):
976 """Called when message ends. Default implementation does nothing.
978 :param msg: Message to process.
985 class InputData(object):
988 The data is extracted from output.xml files generated by Jenkins jobs and
989 stored in pandas' DataFrames.
995 (as described in ExecutionChecker documentation)
997 (as described in ExecutionChecker documentation)
999 (as described in ExecutionChecker documentation)
1002 def __init__(self, spec):
1005 :param spec: Specification.
1006 :type spec: Specification
1013 self._input_data = pd.Series()
1017 """Getter - Input data.
1019 :returns: Input data
1020 :rtype: pandas.Series
1022 return self._input_data
1024 def metadata(self, job, build):
1025 """Getter - metadata
1027 :param job: Job which metadata we want.
1028 :param build: Build which metadata we want.
1032 :rtype: pandas.Series
1035 return self.data[job][build]["metadata"]
1037 def suites(self, job, build):
1040 :param job: Job which suites we want.
1041 :param build: Build which suites we want.
1045 :rtype: pandas.Series
1048 return self.data[job][str(build)]["suites"]
1050 def tests(self, job, build):
1053 :param job: Job which tests we want.
1054 :param build: Build which tests we want.
1058 :rtype: pandas.Series
1061 return self.data[job][build]["tests"]
1063 def _parse_tests(self, job, build, log):
1064 """Process data from robot output.xml file and return JSON structured
1067 :param job: The name of job which build output data will be processed.
1068 :param build: The build which output data will be processed.
1069 :param log: List of log messages.
1072 :type log: list of tuples (severity, msg)
1073 :returns: JSON data structure.
1082 with open(build["file-name"], 'r') as data_file:
1084 result = ExecutionResult(data_file)
1085 except errors.DataError as err:
1086 log.append(("ERROR", "Error occurred while parsing output.xml: "
1089 checker = ExecutionChecker(metadata, self._cfg.mapping,
1091 result.visit(checker)
1095 def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
1096 """Download and parse the input data file.
1098 :param pid: PID of the process executing this method.
1099 :param data_queue: Shared memory between processes. Queue which keeps
1100 the result data. This data is then read by the main process and used
1101 in further processing.
1102 :param job: Name of the Jenkins job which generated the processed input
1104 :param build: Information about the Jenkins build which generated the
1105 processed input file.
1106 :param repeat: Repeat the download specified number of times if not
1109 :type data_queue: multiprocessing.Manager().Queue()
1117 logging.info(" Processing the job/build: {0}: {1}".
1118 format(job, build["build"]))
1120 logs.append(("INFO", " Processing the job/build: {0}: {1}".
1121 format(job, build["build"])))
1128 success = download_and_unzip_data_file(self._cfg, job, build, pid,
1134 logs.append(("ERROR", "It is not possible to download the input "
1135 "data file from the job '{job}', build "
1136 "'{build}', or it is damaged. Skipped.".
1137 format(job=job, build=build["build"])))
1139 logs.append(("INFO", " Processing data from the build '{0}' ...".
1140 format(build["build"])))
1141 data = self._parse_tests(job, build, logs)
1143 logs.append(("ERROR", "Input data file from the job '{job}', "
1144 "build '{build}' is damaged. Skipped.".
1145 format(job=job, build=build["build"])))
1150 remove(build["file-name"])
1151 except OSError as err:
1152 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
1153 format(build["file-name"], repr(err))))
1155 # If the time-period is defined in the specification file, remove all
1156 # files which are outside the time period.
1157 timeperiod = self._cfg.input.get("time-period", None)
1158 if timeperiod and data:
1160 timeperiod = timedelta(int(timeperiod))
1161 metadata = data.get("metadata", None)
1163 generated = metadata.get("generated", None)
1165 generated = dt.strptime(generated, "%Y%m%d %H:%M")
1166 if (now - generated) > timeperiod:
1167 # Remove the data and the file:
1172 " The build {job}/{build} is outdated, will be "
1173 "removed".format(job=job, build=build["build"])))
1174 file_name = self._cfg.input["file-name"]
1176 self._cfg.environment["paths"]["DIR[WORKING,DATA]"],
1177 "{job}{sep}{build}{sep}{name}".
1180 build=build["build"],
1184 logs.append(("INFO",
1185 " The file {name} has been removed".
1186 format(name=full_name)))
1187 except OSError as err:
1188 logs.append(("ERROR",
1189 "Cannot remove the file '{0}': {1}".
1190 format(full_name, repr(err))))
1192 logs.append(("INFO", " Done."))
1201 data_queue.put(result)
1203 def download_and_parse_data(self, repeat=1):
1204 """Download the input data files, parse input data from input files and
1205 store in pandas' Series.
1207 :param repeat: Repeat the download specified number of times if not
1212 logging.info("Downloading and parsing input files ...")
1214 work_queue = multiprocessing.JoinableQueue()
1215 manager = multiprocessing.Manager()
1216 data_queue = manager.Queue()
1217 cpus = multiprocessing.cpu_count()
1220 for cpu in range(cpus):
1221 worker = Worker(work_queue,
1223 self._download_and_parse_build)
1224 worker.daemon = True
1226 workers.append(worker)
1227 os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
1228 format(cpu, worker.pid))
1230 for job, builds in self._cfg.builds.items():
1231 for build in builds:
1232 work_queue.put((job, build, repeat))
1236 logging.info("Done.")
1238 while not data_queue.empty():
1239 result = data_queue.get()
1242 build_nr = result["build"]["build"]
1245 data = result["data"]
1246 build_data = pd.Series({
1247 "metadata": pd.Series(data["metadata"].values(),
1248 index=data["metadata"].keys()),
1249 "suites": pd.Series(data["suites"].values(),
1250 index=data["suites"].keys()),
1251 "tests": pd.Series(data["tests"].values(),
1252 index=data["tests"].keys())})
1254 if self._input_data.get(job, None) is None:
1255 self._input_data[job] = pd.Series()
1256 self._input_data[job][str(build_nr)] = build_data
1258 self._cfg.set_input_file_name(job, build_nr,
1259 result["build"]["file-name"])
1261 self._cfg.set_input_state(job, build_nr, result["state"])
1263 for item in result["logs"]:
1264 if item[0] == "INFO":
1265 logging.info(item[1])
1266 elif item[0] == "ERROR":
1267 logging.error(item[1])
1268 elif item[0] == "DEBUG":
1269 logging.debug(item[1])
1270 elif item[0] == "CRITICAL":
1271 logging.critical(item[1])
1272 elif item[0] == "WARNING":
1273 logging.warning(item[1])
1277 # Terminate all workers
1278 for worker in workers:
1282 logging.info("Done.")
1285 def _end_of_tag(tag_filter, start=0, closer="'"):
1286 """Return the index of character in the string which is the end of tag.
1288 :param tag_filter: The string where the end of tag is being searched.
1289 :param start: The index where the searching is stated.
1290 :param closer: The character which is the tag closer.
1291 :type tag_filter: str
1294 :returns: The index of the tag closer.
1299 idx_opener = tag_filter.index(closer, start)
1300 return tag_filter.index(closer, idx_opener + 1)
1305 def _condition(tag_filter):
1306 """Create a conditional statement from the given tag filter.
1308 :param tag_filter: Filter based on tags from the element specification.
1309 :type tag_filter: str
1310 :returns: Conditional statement which can be evaluated.
1316 index = InputData._end_of_tag(tag_filter, index)
1320 tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1322 def filter_data(self, element, params=None, data_set="tests",
1323 continue_on_error=False):
1324 """Filter required data from the given jobs and builds.
1326 The output data structure is:
1330 - test (or suite) 1 ID:
1336 - test (or suite) n ID:
1343 :param element: Element which will use the filtered data.
1344 :param params: Parameters which will be included in the output. If None,
1345 all parameters are included.
1346 :param data_set: The set of data to be filtered: tests, suites,
1348 :param continue_on_error: Continue if there is error while reading the
1349 data. The Item will be empty then
1350 :type element: pandas.Series
1353 :type continue_on_error: bool
1354 :returns: Filtered data.
1355 :rtype pandas.Series
1359 if element["filter"] in ("all", "template"):
1362 cond = InputData._condition(element["filter"])
1363 logging.debug(" Filter: {0}".format(cond))
1365 logging.error(" No filter defined.")
1369 params = element.get("parameters", None)
1371 params.append("type")
1375 for job, builds in element["data"].items():
1376 data[job] = pd.Series()
1377 for build in builds:
1378 data[job][str(build)] = pd.Series()
1380 data_iter = self.data[job][str(build)][data_set].\
1383 if continue_on_error:
1387 for test_ID, test_data in data_iter:
1388 if eval(cond, {"tags": test_data.get("tags", "")}):
1389 data[job][str(build)][test_ID] = pd.Series()
1391 for param, val in test_data.items():
1392 data[job][str(build)][test_ID][param] = val
1394 for param in params:
1396 data[job][str(build)][test_ID][param] =\
1399 data[job][str(build)][test_ID][param] =\
1403 except (KeyError, IndexError, ValueError) as err:
1404 logging.error(" Missing mandatory parameter in the element "
1405 "specification: {0}".format(err))
1407 except AttributeError:
1410 logging.error(" The filter '{0}' is not correct. Check if all "
1411 "tags are enclosed by apostrophes.".format(cond))
1415 def merge_data(data):
1416 """Merge data from more jobs and builds to a simple data structure.
1418 The output data structure is:
1420 - test (suite) 1 ID:
1426 - test (suite) n ID:
1429 :param data: Data to merge.
1430 :type data: pandas.Series
1431 :returns: Merged data.
1432 :rtype: pandas.Series
1435 logging.info(" Merging data ...")
1437 merged_data = pd.Series()
1438 for _, builds in data.iteritems():
1439 for _, item in builds.iteritems():
1440 for ID, item_data in item.iteritems():
1441 merged_data[ID] = item_data