1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Data pre-processing
16 - extract data from output.xml files generated by Jenkins jobs and store in
18 - provide access to the data.
21 import multiprocessing
27 from robot.api import ExecutionResult, ResultVisitor
28 from robot import errors
29 from collections import OrderedDict
30 from string import replace
33 from input_data_files import download_and_unzip_data_file
34 from utils import Worker
class ExecutionChecker(ResultVisitor):
    # NOTE(review): the class docstring below is an elided fragment of a much
    # longer description of the generated json structure; many lines,
    # including its closing quotes, are missing from this capture.
    """Class to traverse through the test suite structure.
    The functionality implemented in this class generates a json structure:
    "metadata": { # Optional
    "version": "VPP version",
    "job": "Jenkins job name",
    "build": "Information about the build"
    "doc": "Suite 1 documentation",
    "parent": "Suite 1 parent",
    "level": "Level of the suite in the suite hierarchy"
    "doc": "Suite N documentation",
    "parent": "Suite 2 parent",
    "level": "Level of the suite in the suite hierarchy"
    "parent": "Name of the parent of the test",
    "doc": "Test documentation"
    "tags": ["tag 1", "tag 2", "tag n"],
    "type": "PDR" | "NDR",
    "unit": "pps" | "bps" | "percentage"
    "50": { # Only for NDR
    "10": { # Only for NDR
    "50": { # Only for NDR
    "10": { # Only for NDR
    "lossTolerance": "lossTolerance", # Only for PDR
    "vat-history": "DUT1 and DUT2 VAT History"
    "show-run": "Show Run"
    "metadata": { # Optional
    "version": "VPP version",
    "job": "Jenkins job name",
    "build": "Information about the build"
    "doc": "Suite 1 documentation",
    "parent": "Suite 1 parent",
    "level": "Level of the suite in the suite hierarchy"
    "doc": "Suite N documentation",
    "parent": "Suite 2 parent",
    "level": "Level of the suite in the suite hierarchy"
    "parent": "Name of the parent of the test",
    "doc": "Test documentation"
    "msg": "Test message"
    "tags": ["tag 1", "tag 2", "tag n"],
    "vat-history": "DUT1 and DUT2 VAT History"
    "show-run": "Show Run"
    "status": "PASS" | "FAIL"
    .. note:: ID is the lowercase full path to the test.

    # Precompiled patterns used to pull measured values out of test messages.
    # Matches "FINAL_RATE: <float> <unit>" anywhere in the message.
    REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
    # Three "LAT_<pct>%NDR: ['min/avg/max', 'min/avg/max']" lines (100/50/10%).
    REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+\/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
    # Single "LAT_<pct>%PDR" line with the two direction triples.
    REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
    # NOTE(review): this statement is truncated in this capture -- the
    # continuation line(s) and closing parenthesis are missing.
    REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
    # Captures the text following "return STDOUT Version:" (group 2).
    REGEX_VERSION = re.compile(r"(return STDOUT Version:\s*)(.*)")
    # TCP test result: "Total <rps|cps|throughput>: <int>".
    REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
    # MRR result: duration in seconds plus tx/rx packet counts.
    REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
                           r'tx\s(\d*),\srx\s(\d*)')
    def __init__(self, metadata):
        """Initialisation.

        :param metadata: Key-value pairs to be included in "metadata" part of
            the generated JSON structure.
        :type metadata: dict
        """
        # NOTE(review): several attribute initialisations are elided from
        # this capture (e.g. the VPP version holder and the test ID); only
        # the visible ones are reproduced below.

        # Type of message to parse out from the test messages
        self._msg_type = None

        # Number of VAT History messages found:
        # 1 - VAT History of DUT1
        # 2 - VAT History of DUT2
        self._lookup_kw_nr = 0
        self._vat_history_lookup_nr = 0

        # Number of Show Running messages found
        # 1 - Show run message found
        self._show_run_lookup_nr = 0

        # Test ID of currently processed test- the lowercase full path to the
        # test.

        # The main data structure: metadata, suites and tests keyed by ID.
        "metadata": OrderedDict(),
        "suites": OrderedDict(),
        "tests": OrderedDict()
        # NOTE(review): the "self._data = OrderedDict(..." opener and closing
        # bracket around the three lines above are elided in this capture.

        # Save the provided metadata
        for key, val in metadata.items():
            self._data["metadata"][key] = val

        # Dictionary defining the methods used to parse different types of
        # messages (keyed by the _msg_type dispatch token).
        "setup-version": self._get_version,
        "teardown-vat-history": self._get_vat_history,
        "test-show-runtime": self._get_show_run
        # NOTE(review): the "self.parse_msg = {" opener and closing brace are
        # elided in this capture.
237 """Getter - Data parsed from the XML file.
239 :returns: Data parsed from the XML file.
    def _get_version(self, msg):
        """Called when extraction of VPP version is required.

        :param msg: Message to process.
        :type msg: Message
        """
        if msg.message.count("return STDOUT Version:"):
            # NOTE(review): this statement is truncated in this capture; the
            # call chain presumably finishes with .group(2)) to take the
            # version text matched by REGEX_VERSION -- confirm upstream.
            self._version = str(re.search(self.REGEX_VERSION, msg.message).
            self._data["metadata"]["version"] = self._version
            self._data["metadata"]["generated"] = msg.timestamp
            # Version found; stop dispatching further messages to this parser.
            self._msg_type = None
    def _get_vat_history(self, msg):
        """Called when extraction of VAT command history is required.

        :param msg: Message to process.
        :type msg: Message
        """
        if msg.message.count("VAT command history:"):
            self._vat_history_lookup_nr += 1
            # First DUT: start the accumulator for this test.
            if self._vat_history_lookup_nr == 1:
                self._data["tests"][self._test_ID]["vat-history"] = str()
            self._msg_type = None
            # Strip the leading "<ip> VAT command history:" banner and convert
            # newlines to the |br| markup used by the report generator.
            # NOTE(review): the dots in the IP part of the pattern are
            # unescaped, so they match any character; harmless in practice
            # but worth tightening to \. -- confirm intent.
            text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
                          "VAT command history:", "", msg.message, count=1). \
                replace("\n\n", "\n").replace('\n', ' |br| ').\
                replace('\r', '').replace('"', "'")
            self._data["tests"][self._test_ID]["vat-history"] += " |br| "
            self._data["tests"][self._test_ID]["vat-history"] += \
                "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
    def _get_show_run(self, msg):
        """Called when extraction of VPP operational data (output of CLI command
        Show Runtime) is required.

        :param msg: Message to process.
        :type msg: Message
        """
        if msg.message.count("return STDOUT Thread "):
            self._show_run_lookup_nr += 1
            # First "Show Runtime" keyword, first message: start accumulator.
            if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
                self._data["tests"][self._test_ID]["show-run"] = str()
            # After the second keyword occurrence, stop parsing more messages.
            if self._lookup_kw_nr > 1:
                self._msg_type = None
            if self._show_run_lookup_nr == 1:
                # Strip prompts/markers and convert to |br| report markup.
                text = msg.message.replace("vat# ", "").\
                    replace("return STDOUT ", "").replace("\n\n", "\n").\
                    replace('\n', ' |br| ').\
                    replace('\r', '').replace('"', "'")
                self._data["tests"][self._test_ID]["show-run"] += " |br| "
                self._data["tests"][self._test_ID]["show-run"] += \
                    "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
    def _get_latency(self, msg, test_type):
        """Get the latency data from the test message.

        :param msg: Message to be parsed.
        :param test_type: Type of the test - NDR or PDR.
        :type msg: str
        :type test_type: str
        :returns: Latencies parsed from the message.
        :rtype: dict
        """
        # NDR messages carry three latency lines (6 triples), PDR one (2).
        if test_type == "NDR":
            groups = re.search(self.REGEX_LAT_NDR, msg)
            groups_range = range(1, 7)
        elif test_type == "PDR":
            groups = re.search(self.REGEX_LAT_PDR, msg)
            groups_range = range(1, 3)
        # NOTE(review): the initialisation of `latencies`, the try: opening
        # the loop body and parts of the exception handling are elided from
        # this capture.
        for idx in groups_range:
            # Each captured group is a "min/avg/max" slash-separated triple.
            lat = [int(item) for item in str(groups.group(idx)).split('/')]
            except (AttributeError, ValueError):
            latencies.append(lat)
        keys = ("min", "avg", "max")
        # NOTE(review): the construction of the `latency` result dict
        # (direction1/direction2 skeleton) is elided before these lines.
        latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
        latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
        if test_type == "NDR":
            latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
            latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
            latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
            latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
    def visit_suite(self, suite):
        """Implements traversing through the suite and its direct children.

        :param suite: Suite to process.
        :type suite: Suite
        """
        # start_suite may veto traversal by returning False.
        if self.start_suite(suite) is not False:
            suite.suites.visit(self)
            suite.tests.visit(self)
            self.end_suite(suite)
    def start_suite(self, suite):
        """Called when suite starts.

        :param suite: Suite to process.
        :type suite: Suite
        """
        # NOTE(review): the try: opening this lookup is elided in this
        # capture; a root suite has no parent, hence the AttributeError path.
        parent_name = suite.parent.name
        except AttributeError:
        # Normalise the suite documentation into the |br| report markup.
        doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
            replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
        # NOTE(review): `replace` here is the Python 2 string.replace
        # function imported at module top; it does not exist on Python 3.
        doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
        # Key suites by their normalised long name.
        self._data["suites"][suite.longname.lower().replace('"', "'").
                             replace(" ", "_")] = {
            "name": suite.name.lower(),
            "parent": parent_name,
            "level": len(suite.longname.split("."))
        # NOTE(review): the "doc" entry and the closing brace of this dict
        # literal are elided from this capture.
        suite.keywords.visit(self)
    def end_suite(self, suite):
        """Called when suite ends.

        :param suite: Suite to process.
        :type suite: Suite
        """

    def visit_test(self, test):
        """Implements traversing through the test.

        :param test: Test to process.
        :type test: Test
        """
        if self.start_test(test) is not False:
            test.keywords.visit(self)
            # NOTE(review): a call to self.end_test(test) is presumably
            # elided here (mirrors visit_suite) -- confirm upstream.
    def start_test(self, test):
        """Called when test starts.

        Parses throughput/latency (NDR/PDR), TCP or MRR results out of the
        test message and stores them under the test ID.

        :param test: Test to process.
        :type test: Test
        """
        tags = [str(tag) for tag in test.tags]
        # NOTE(review): the initialisation of `test_result` is elided from
        # this capture.
        test_result["name"] = test.name.lower()
        test_result["parent"] = test.parent.name.lower()
        test_result["tags"] = tags
        doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
            replace('\r', '').replace('[', ' |br| [')
        # `replace` is Python 2 string.replace (module-level import).
        test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
        test_result["msg"] = test.message.replace('\n', ' |br| '). \
            replace('\r', '').replace('"', "'")
        # NOTE(review): the condition below and the test_type selection
        # branches are truncated in this capture; the continuation of the
        # `if`, the branch bodies and the else path are missing.
        if test.status == "PASS" and ("NDRPDRDISC" in tags or
            if "NDRDISC" in tags:
            elif "PDRDISC" in tags:
        test_result["type"] = test_type
        if test_type in ("NDR", "PDR"):
            # NOTE(review): the try: lines and except bodies around the two
            # regex extractions are elided here.
            rate_value = str(re.search(
                self.REGEX_RATE, test.message).group(1))
            except AttributeError:
            rate_unit = str(re.search(
                self.REGEX_RATE, test.message).group(2))
            except AttributeError:
            test_result["throughput"] = dict()
            # Keep only the integer part of the measured rate.
            test_result["throughput"]["value"] = \
                int(rate_value.split('.')[0])
            test_result["throughput"]["unit"] = rate_unit
            test_result["latency"] = \
                self._get_latency(test.message, test_type)
            if test_type == "PDR":
                test_result["lossTolerance"] = str(re.search(
                    self.REGEX_TOLERANCE, test.message).group(1))
        elif test_type in ("TCP", ):
            groups = re.search(self.REGEX_TCP, test.message)
            test_result["result"] = dict()
            test_result["result"]["value"] = int(groups.group(2))
            test_result["result"]["unit"] = groups.group(1)
        elif test_type in ("MRR", ):
            groups = re.search(self.REGEX_MRR, test.message)
            test_result["result"] = dict()
            test_result["result"]["duration"] = int(groups.group(1))
            test_result["result"]["tx"] = int(groups.group(2))
            test_result["result"]["rx"] = int(groups.group(3))
            # Throughput derived as received packets over trial duration.
            test_result["result"]["throughput"] = int(
                test_result["result"]["rx"] /
                test_result["result"]["duration"])
        test_result["status"] = test.status
        # ID is the lowercase full path to the test.
        self._test_ID = test.longname.lower()
        self._data["tests"][self._test_ID] = test_result

    def end_test(self, test):
        """Called when test ends.

        :param test: Test to process.
        :type test: Test
        """
    def visit_keyword(self, keyword):
        """Implements traversing through the keyword and its child keywords.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        """
        # start_keyword may veto processing by returning False.
        if self.start_keyword(keyword) is not False:
            self.end_keyword(keyword)
    def start_keyword(self, keyword):
        """Called when keyword starts. Default implementation does nothing.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        """
        # NOTE(review): the try: opening this dispatch is elided in this
        # capture; keyword.type may be absent, hence the AttributeError path.
        if keyword.type == "setup":
            self.visit_setup_kw(keyword)
        elif keyword.type == "teardown":
            self._lookup_kw_nr = 0
            self.visit_teardown_kw(keyword)
        # NOTE(review): an else: line is elided before the two lines below.
            self._lookup_kw_nr = 0
            self.visit_test_kw(keyword)
        except AttributeError:

    def end_keyword(self, keyword):
        """Called when keyword ends. Default implementation does nothing.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        """
    def visit_test_kw(self, test_kw):
        """Implements traversing through the test keyword and its child
        keywords (recursively).

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        """
        for keyword in test_kw.keywords:
            if self.start_test_kw(keyword) is not False:
                self.visit_test_kw(keyword)
                self.end_test_kw(keyword)
    def start_test_kw(self, test_kw):
        """Called when test keyword starts. Default implementation does
        nothing.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        """
        # Count "Show Runtime" keywords seen in this test and switch the
        # message parser to the show-runtime handler.
        if test_kw.name.count("Show Runtime Counters On All Duts"):
            self._lookup_kw_nr += 1
            self._show_run_lookup_nr = 0
            self._msg_type = "test-show-runtime"
            test_kw.messages.visit(self)

    def end_test_kw(self, test_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        """
    def visit_setup_kw(self, setup_kw):
        """Implements traversing through the setup keyword and its child
        keywords (recursively).

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        """
        for keyword in setup_kw.keywords:
            if self.start_setup_kw(keyword) is not False:
                self.visit_setup_kw(keyword)
                self.end_setup_kw(keyword)
    def start_setup_kw(self, setup_kw):
        """Called when setup keyword starts. Default implementation does
        nothing.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        """
        # Parse the VPP version only once (guarded by self._version).
        if setup_kw.name.count("Show Vpp Version On All Duts") \
                and not self._version:
            self._msg_type = "setup-version"
            setup_kw.messages.visit(self)

    def end_setup_kw(self, setup_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        """
    def visit_teardown_kw(self, teardown_kw):
        """Implements traversing through the teardown keyword and its child
        keywords (recursively).

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        """
        for keyword in teardown_kw.keywords:
            if self.start_teardown_kw(keyword) is not False:
                self.visit_teardown_kw(keyword)
                self.end_teardown_kw(keyword)
    def start_teardown_kw(self, teardown_kw):
        """Called when teardown keyword starts. Default implementation does
        nothing.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        """
        # Reset the per-test VAT history counter and switch the message
        # parser to the VAT-history handler.
        if teardown_kw.name.count("Show Vat History On All Duts"):
            self._vat_history_lookup_nr = 0
            self._msg_type = "teardown-vat-history"
            teardown_kw.messages.visit(self)

    def end_teardown_kw(self, teardown_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        """
    def visit_message(self, msg):
        """Implements visiting the message.

        :param msg: Message to process.
        :type msg: Message
        """
        if self.start_message(msg) is not False:
            self.end_message(msg)
    def start_message(self, msg):
        """Called when message starts. Get required information from messages:
        dispatches the message to the parser selected by self._msg_type.

        :param msg: Message to process.
        :type msg: Message
        """
        # NOTE(review): a guard such as `if self._msg_type:` is elided before
        # this dispatch in this capture -- confirm upstream.
        self.parse_msg[self._msg_type](msg)

    def end_message(self, msg):
        """Called when message ends. Default implementation does nothing.

        :param msg: Message to process.
        :type msg: Message
        """
class InputData(object):
    # NOTE(review): the opening of the original class docstring is elided;
    # the fragment below is wrapped so the class stays documented.
    """Input data.

    The data is extracted from output.xml files generated by Jenkins jobs and
    stored in pandas' DataFrames.

    - ID: test data (as described in ExecutionChecker documentation)
    """
    def __init__(self, spec):
        """Initialisation.

        :param spec: Specification.
        :type spec: Specification
        """
        # NOTE(review): other attribute initialisations (e.g. the
        # configuration held in self._cfg, used elsewhere) are elided from
        # this capture.

        # Empty series keyed by job name; filled by download_and_parse_data.
        self._input_data = pd.Series()

        # NOTE(review): the @property/def lines of the `data` getter are
        # elided; only its docstring fragment and return survive below.
        """Getter - Input data.

        :rtype: pandas.Series
        """
        return self._input_data
714 def metadata(self, job, build):
717 :param job: Job which metadata we want.
718 :param build: Build which metadata we want.
722 :rtype: pandas.Series
725 return self.data[job][build]["metadata"]
727 def suites(self, job, build):
730 :param job: Job which suites we want.
731 :param build: Build which suites we want.
735 :rtype: pandas.Series
738 return self.data[job][str(build)]["suites"]
740 def tests(self, job, build):
743 :param job: Job which tests we want.
744 :param build: Build which tests we want.
748 :rtype: pandas.Series
751 return self.data[job][build]["tests"]
    def _parse_tests(job, build, log):
        """Process data from robot output.xml file and return JSON structured
        data.

        :param job: The name of job which build output data will be processed.
        :param build: The build which output data will be processed.
        :param log: List of log messages.
        :type job: str
        :type build: dict
        :type log: list of tuples (severity, msg)
        :returns: JSON data structure.
        :rtype: dict
        """
        # NOTE(review): the @staticmethod decorator, the metadata dict setup,
        # the try: around ExecutionResult and the final `return checker.data`
        # are elided from this capture.
        with open(build["file-name"], 'r') as data_file:
            result = ExecutionResult(data_file)
            except errors.DataError as err:
            log.append(("ERROR", "Error occurred while parsing output.xml: "
        checker = ExecutionChecker(metadata)
        result.visit(checker)
    def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
        """Download and parse the input data file.

        :param pid: PID of the process executing this method.
        :param data_queue: Shared memory between processes. Queue which keeps
            the result data. This data is then read by the main process and used
            in further processing.
        :param job: Name of the Jenkins job which generated the processed input
            file.
        :param build: Information about the Jenkins build which generated the
            processed input file.
        :param repeat: Repeat the download specified number of times if not
            successful.
        :type pid: int
        :type data_queue: multiprocessing.Manager().Queue()
        :type job: str
        :type build: dict
        :type repeat: int
        """
        # NOTE(review): this capture elides the initialisation of `logs`,
        # `state`/`data`, the success branches, and the construction of
        # `result` put on the queue at the end.
        logging.info(" Processing the job/build: {0}: {1}".
                     format(job, build["build"]))
        logs.append(("INFO", " Processing the job/build: {0}: {1}".
                     format(job, build["build"])))
        success = download_and_unzip_data_file(self._cfg, job, build, pid,
            logs.append(("ERROR", "It is not possible to download the input "
                         "data file from the job '{job}', build "
                         "'{build}', or it is damaged. Skipped.".
                         format(job=job, build=build["build"])))
            logs.append(("INFO", " Processing data from the build '{0}' ...".
                         format(build["build"])))
            data = InputData._parse_tests(job, build, logs)
            logs.append(("ERROR", "Input data file from the job '{job}', "
                         "build '{build}' is damaged. Skipped.".
                         format(job=job, build=build["build"])))
            # Best effort: remove the downloaded file once parsed.
            remove(build["file-name"])
            except OSError as err:
            logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
                         format(build["file-name"], err)))
        logs.append(("INFO", " Done."))
        data_queue.put(result)
    def download_and_parse_data(self, repeat=1):
        """Download the input data files, parse input data from input files and
        store in pandas' Series.

        :param repeat: Repeat the download specified number of times if not
            successful.
        :type repeat: int
        """
        # NOTE(review): this capture elides several lines (workers list
        # initialisation, worker.start(), the inner `for build in builds:`
        # header, queue join, worker terminate/join).
        logging.info("Downloading and parsing input files ...")
        work_queue = multiprocessing.JoinableQueue()
        manager = multiprocessing.Manager()
        data_queue = manager.Queue()
        cpus = multiprocessing.cpu_count()
        # One worker process per CPU, each consuming (job, build, repeat)
        # work items and producing results on data_queue.
        for cpu in range(cpus):
            worker = Worker(work_queue,
                            self._download_and_parse_build)
            workers.append(worker)
            # Pin the worker to one CPU. NOTE(review): os.system with an
            # interpolated shell string; cpu and worker.pid are internal
            # ints so injection risk is nil, but subprocess would be cleaner.
            os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
                      format(cpu, worker.pid))
        for job, builds in self._cfg.builds.items():
                work_queue.put((job, build, repeat))
        logging.info("Done.")
        # Drain the results produced by the workers into self._input_data.
        while not data_queue.empty():
            result = data_queue.get()
            build_nr = result["build"]["build"]
            data = result["data"]
            build_data = pd.Series({
                "metadata": pd.Series(data["metadata"].values(),
                                      index=data["metadata"].keys()),
                "suites": pd.Series(data["suites"].values(),
                                    index=data["suites"].keys()),
                "tests": pd.Series(data["tests"].values(),
                                   index=data["tests"].keys())})
            if self._input_data.get(job, None) is None:
                self._input_data[job] = pd.Series()
            # Builds are keyed by the string form of the build number.
            self._input_data[job][str(build_nr)] = build_data
            self._cfg.set_input_file_name(job, build_nr,
                                          result["build"]["file-name"])
            self._cfg.set_input_state(job, build_nr, result["state"])
            # Replay worker log records through the main process logger.
            for item in result["logs"]:
                if item[0] == "INFO":
                    logging.info(item[1])
                elif item[0] == "ERROR":
                    logging.error(item[1])
                elif item[0] == "DEBUG":
                    logging.debug(item[1])
                elif item[0] == "CRITICAL":
                    logging.critical(item[1])
                elif item[0] == "WARNING":
                    logging.warning(item[1])
        # Terminate all workers
        for worker in workers:
        logging.info("Done.")
    def _end_of_tag(tag_filter, start=0, closer="'"):
        """Return the index of character in the string which is the end of tag.

        :param tag_filter: The string where the end of tag is being searched.
        :param start: The index where the searching is stated.
        :param closer: The character which is the tag closer.
        :type tag_filter: str
        :type start: int
        :type closer: str
        :returns: The index of the tag closer.
        :rtype: int
        """
        # NOTE(review): the @staticmethod decorator and a try/except
        # ValueError (returning None when no further tag exists) appear to be
        # elided from this capture -- confirm upstream.
        # Find the opener first, then the matching closer after it.
        idx_opener = tag_filter.index(closer, start)
        return tag_filter.index(closer, idx_opener + 1)
    def _condition(tag_filter):
        """Create a conditional statement from the given tag filter.

        :param tag_filter: Filter based on tags from the element specification.
        :type tag_filter: str
        :returns: Conditional statement which can be evaluated.
        :rtype: str
        """
        # NOTE(review): the loop driving `index` over quoted tags and the
        # final return are elided from this capture; each quoted tag gets
        # " in tags" appended so the result can be eval()-ed against a tag
        # set.
            index = InputData._end_of_tag(tag_filter, index)
            tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
    def filter_data(self, element, params=None, data_set="tests",
                    continue_on_error=False):
        """Filter required data from the given jobs and builds.

        :param element: Element which will use the filtered data.
        :param params: Parameters which will be included in the output. If None,
            all parameters are included.
        :param data_set: The set of data to be filtered: tests, suites,
            metadata.
        :param continue_on_error: Continue if there is error while reading the
            data. The Item will be empty then
        :type element: pandas.Series
        :type params: list
        :type data_set: str
        :type continue_on_error: bool
        :returns: Filtered data.
        :rtype pandas.Series
        """
        # NOTE(review): this capture elides the surrounding try:, the
        # "all"/"template" branch body, several else branches and the
        # return statements; only the visible skeleton is reproduced.
        if element["filter"] in ("all", "template"):
        cond = InputData._condition(element["filter"])
        logging.debug(" Filter: {0}".format(cond))
        logging.error(" No filter defined.")
        params = element.get("parameters", None)
        for job, builds in element["data"].items():
            data[job] = pd.Series()
            for build in builds:
                data[job][str(build)] = pd.Series()
                data_iter = self.data[job][str(build)][data_set].\
                if continue_on_error:
                for test_ID, test_data in data_iter:
                    # NOTE(review): eval() on the filter string built from
                    # the element specification -- input is trusted config,
                    # not external data, but keep it that way.
                    if eval(cond, {"tags": test_data.get("tags", "")}):
                        data[job][str(build)][test_ID] = pd.Series()
                        for param, val in test_data.items():
                            data[job][str(build)][test_ID][param] = val
                        for param in params:
                            data[job][str(build)][test_ID][param] =\
                            data[job][str(build)][test_ID][param] =\
        except (KeyError, IndexError, ValueError) as err:
            logging.error("   Missing mandatory parameter in the element "
                          "specification: {0}".format(err))
        except AttributeError:
            logging.error("   The filter '{0}' is not correct. Check if all "
                          "tags are enclosed by apostrophes.".format(cond))
1065 def merge_data(data):
1066 """Merge data from more jobs and builds to a simple data structure.
1068 The output data structure is:
1070 - test (suite) 1 ID:
1076 - test (suite) n ID:
1079 :param data: Data to merge.
1080 :type data: pandas.Series
1081 :returns: Merged data.
1082 :rtype: pandas.Series
1085 logging.info(" Merging data ...")
1087 merged_data = pd.Series()
1088 for _, builds in data.iteritems():
1089 for _, item in builds.iteritems():
1090 for ID, item_data in item.iteritems():
1091 merged_data[ID] = item_data