# Copyright (c) 2017 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Data pre-processing - extract data from output.xml files generated by Jenkins jobs and store in pandas' Series, - provide access to the data. """ import re import pandas as pd import logging from robot.api import ExecutionResult, ResultVisitor from collections import OrderedDict from string import replace class ExecutionChecker(ResultVisitor): """Class to traverse through the test suite structure. The functionality implemented in this class generates a json structure: Performance tests: { "metadata": { # Optional "version": "VPP version", "job": "Jenkins job name", "build": "Information about the build" }, "suites": { "Suite name 1": { "doc": "Suite 1 documentation", "parent": "Suite 1 parent", "level": "Level of the suite in the suite hierarchy" } "Suite name N": { "doc": "Suite N documentation", "parent": "Suite 2 parent", "level": "Level of the suite in the suite hierarchy" } } "tests": { "ID": { "name": "Test name", "parent": "Name of the parent of the test", "doc": "Test documentation" "msg": "Test message" "tags": ["tag 1", "tag 2", "tag n"], "type": "PDR" | "NDR", "throughput": { "value": int, "unit": "pps" | "bps" | "percentage" }, "latency": { "direction1": { "100": { "min": int, "avg": int, "max": int }, "50": { # Only for NDR "min": int, "avg": int, "max": int }, "10": { # Only for NDR "min": int, "avg": int, "max": int } }, "direction2": { "100": { "min": int, "avg": int, "max": int }, "50": { # Only for NDR "min": int, "avg": int, "max": int }, "10": { # Only for NDR "min": int, "avg": int, "max": int } } }, "lossTolerance": "lossTolerance", # Only for PDR "vat-history": "DUT1 and DUT2 VAT History" }, "show-run": "Show Run" }, "ID" { # next test } } } Functional tests: { "metadata": { # Optional "version": "VPP version", "job": "Jenkins job name", "build": "Information about the build" }, "suites": { "Suite name 1": { "doc": "Suite 1 documentation", "parent": "Suite 1 parent", "level": "Level of the suite in the suite hierarchy" } "Suite name N": { "doc": "Suite N documentation", "parent": "Suite 2 parent", "level": "Level of the suite in the suite hierarchy" } } "tests": { "ID": { "name": "Test name", "parent": "Name of the parent of the test", "doc": "Test documentation" "msg": "Test message" "tags": ["tag 1", "tag 2", "tag n"], "vat-history": "DUT1 and DUT2 VAT History" "show-run": "Show Run" "status": "PASS" | "FAIL" }, "ID" { # next test } } } .. note:: ID is the lowercase full path to the test. """ REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)') REGEX_LAT_NDR = re.compile(r'^[\D\d]*' r'LAT_\d+%NDR:\s\[\'(-?\d+\/-?\d+/-?\d+)\',' r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n' r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\',' r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n' r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\',' r'\s\'(-?\d+/-?\d+/-?\d+)\'\]') REGEX_LAT_PDR = re.compile(r'^[\D\d]*' r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\',' r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*') REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s' r'[\D\d]*') REGEX_VERSION = re.compile(r"(stdout: 'vat# vat# Version:)(\s*)(.*)") REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$') def __init__(self, **metadata): """Initialisation. :param metadata: Key-value pairs to be included in "metadata" part of JSON structure. :type metadata: dict """ # Type of message to parse out from the test messages self._msg_type = None # VPP version self._version = None # Number of VAT History messages found: # 0 - no message # 1 - VAT History of DUT1 # 2 - VAT History of DUT2 self._lookup_kw_nr = 0 self._vat_history_lookup_nr = 0 # Number of Show Running messages found # 0 - no message # 1 - Show run message found self._show_run_lookup_nr = 0 # Test ID of currently processed test- the lowercase full path to the # test self._test_ID = None # The main data structure self._data = { "metadata": OrderedDict(), "suites": OrderedDict(), "tests": OrderedDict() } # Save the provided metadata for key, val in metadata.items(): self._data["metadata"][key] = val # Dictionary defining the methods used to parse different types of # messages self.parse_msg = { "setup-version": self._get_version, "teardown-vat-history": self._get_vat_history, "teardown-show-runtime": self._get_show_run } @property def data(self): """Getter - Data parsed from the XML file. :returns: Data parsed from the XML file. :rtype: dict """ return self._data def _get_version(self, msg): """Called when extraction of VPP version is required. :param msg: Message to process. :type msg: Message :returns: Nothing. """ if msg.message.count("stdout: 'vat# vat# Version:"): self._version = str(re.search(self.REGEX_VERSION, msg.message). group(3)) self._data["metadata"]["version"] = self._version self._msg_type = None logging.debug(" VPP version: {0}".format(self._version)) def _get_vat_history(self, msg): """Called when extraction of VAT command history is required. :param msg: Message to process. :type msg: Message :returns: Nothing. """ if msg.message.count("VAT command history:"): self._vat_history_lookup_nr += 1 if self._vat_history_lookup_nr == 1: self._data["tests"][self._test_ID]["vat-history"] = str() else: self._msg_type = None text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} " "VAT command history:", "", msg.message, count=1). \ replace("\n\n", "\n").replace('\n', ' |br| ').\ replace('\r', '').replace('"', "'") self._data["tests"][self._test_ID]["vat-history"] += " |br| " self._data["tests"][self._test_ID]["vat-history"] += \ "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text def _get_show_run(self, msg): """Called when extraction of VPP operational data (output of CLI command Show Runtime) is required. :param msg: Message to process. :type msg: Message :returns: Nothing. """ if msg.message.count("return STDOUT Thread "): self._show_run_lookup_nr += 1 if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1: self._data["tests"][self._test_ID]["show-run"] = str() if self._lookup_kw_nr > 1: self._msg_type = None if self._show_run_lookup_nr == 1: text = msg.message.replace("vat# ", "").\ replace("return STDOUT ", "").replace("\n\n", "\n").\ replace('\n', ' |br| ').\ replace('\r', '').replace('"', "'") try: self._data["tests"][self._test_ID]["show-run"] += " |br| " self._data["tests"][self._test_ID]["show-run"] += \ "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text except KeyError: pass def _get_latency(self, msg, test_type): """Get the latency data from the test message. :param msg: Message to be parsed. :param test_type: Type of the test - NDR or PDR. :type msg: str :type test_type: str :returns: Latencies parsed from the message. :rtype: dict """ if test_type == "NDR": groups = re.search(self.REGEX_LAT_NDR, msg) groups_range = range(1, 7) elif test_type == "PDR": groups = re.search(self.REGEX_LAT_PDR, msg) groups_range = range(1, 3) else: return {} latencies = list() for idx in groups_range: try: lat = [int(item) for item in str(groups.group(idx)).split('/')] except (AttributeError, ValueError): lat = [-1, -1, -1] latencies.append(lat) keys = ("min", "avg", "max") latency = { "direction1": { }, "direction2": { } } latency["direction1"]["100"] = dict(zip(keys, latencies[0])) latency["direction2"]["100"] = dict(zip(keys, latencies[1])) if test_type == "NDR": latency["direction1"]["50"] = dict(zip(keys, latencies[2])) latency["direction2"]["50"] = dict(zip(keys, latencies[3])) latency["direction1"]["10"] = dict(zip(keys, latencies[4])) latency["direction2"]["10"] = dict(zip(keys, latencies[5])) return latency def visit_suite(self, suite): """Implements traversing through the suite and its direct children. :param suite: Suite to process. :type suite: Suite :returns: Nothing. """ if self.start_suite(suite) is not False: suite.suites.visit(self) suite.tests.visit(self) self.end_suite(suite) def start_suite(self, suite): """Called when suite starts. :param suite: Suite to process. :type suite: Suite :returns: Nothing. """ try: parent_name = suite.parent.name except AttributeError: return doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\ replace('\r', '').replace('*[', ' |br| *[').replace("*", "**") doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1) self._data["suites"][suite.longname.lower().replace('"', "'"). replace(" ", "_")] = { "name": suite.name.lower(), "doc": doc_str, "parent": parent_name, "level": len(suite.longname.split(".")) } suite.keywords.visit(self) def end_suite(self, suite): """Called when suite ends. :param suite: Suite to process. :type suite: Suite :returns: Nothing. """ pass def visit_test(self, test): """Implements traversing through the test. :param test: Test to process. :type test: Test :returns: Nothing. """ if self.start_test(test) is not False: test.keywords.visit(self) self.end_test(test) def start_test(self, test): """Called when test starts. :param test: Test to process. :type test: Test :returns: Nothing. """ tags = [str(tag) for tag in test.tags] test_result = dict() test_result["name"] = test.name.lower() test_result["parent"] = test.parent.name.lower() test_result["tags"] = tags doc_str = test.doc.replace('"', "'").replace('\n', ' '). \ replace('\r', '').replace('[', ' |br| [') test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1) test_result["msg"] = test.message.replace('\n', ' |br| '). \ replace('\r', '').replace('"', "'") if test.status == "PASS" and ("NDRPDRDISC" in tags or "TCP" in tags): if "NDRDISC" in tags: test_type = "NDR" elif "PDRDISC" in tags: test_type = "PDR" elif "TCP" in tags: # Change to wrk? test_type = "TCP" else: return test_result["type"] = test_type if test_type in ("NDR", "PDR"): try: rate_value = str(re.search( self.REGEX_RATE, test.message).group(1)) except AttributeError: rate_value = "-1" try: rate_unit = str(re.search( self.REGEX_RATE, test.message).group(2)) except AttributeError: rate_unit = "-1" test_result["throughput"] = dict() test_result["throughput"]["value"] = \ int(rate_value.split('.')[0]) test_result["throughput"]["unit"] = rate_unit test_result["latency"] = \ self._get_latency(test.message, test_type) if test_type == "PDR": test_result["lossTolerance"] = str(re.search( self.REGEX_TOLERANCE, test.message).group(1)) elif test_type in ("TCP", ): groups = re.search(self.REGEX_TCP, test.message) test_result["result"] = dict() test_result["result"]["value"] = int(groups.group(2)) test_result["result"]["unit"] = groups.group(1) else: test_result["status"] = test.status self._test_ID = test.longname.lower() self._data["tests"][self._test_ID] = test_result def end_test(self, test): """Called when test ends. :param test: Test to process. :type test: Test :returns: Nothing. """ pass def visit_keyword(self, keyword): """Implements traversing through the keyword and its child keywords. :param keyword: Keyword to process. :type keyword: Keyword :returns: Nothing. """ if self.start_keyword(keyword) is not False: self.end_keyword(keyword) def start_keyword(self, keyword): """Called when keyword starts. Default implementation does nothing. :param keyword: Keyword to process. :type keyword: Keyword :returns: Nothing. """ try: if keyword.type == "setup": self.visit_setup_kw(keyword) elif keyword.type == "teardown": self._lookup_kw_nr = 0 self.visit_teardown_kw(keyword) except AttributeError: pass def end_keyword(self, keyword): """Called when keyword ends. Default implementation does nothing. :param keyword: Keyword to process. :type keyword: Keyword :returns: Nothing. """ pass def visit_setup_kw(self, setup_kw): """Implements traversing through the teardown keyword and its child keywords. :param setup_kw: Keyword to process. :type setup_kw: Keyword :returns: Nothing. """ for keyword in setup_kw.keywords: if self.start_setup_kw(keyword) is not False: self.visit_setup_kw(keyword) self.end_setup_kw(keyword) def start_setup_kw(self, setup_kw): """Called when teardown keyword starts. Default implementation does nothing. :param setup_kw: Keyword to process. :type setup_kw: Keyword :returns: Nothing. """ if setup_kw.name.count("Vpp Show Version Verbose") \ and not self._version: self._msg_type = "setup-version" setup_kw.messages.visit(self) def end_setup_kw(self, setup_kw): """Called when keyword ends. Default implementation does nothing. :param setup_kw: Keyword to process. :type setup_kw: Keyword :returns: Nothing. """ pass def visit_teardown_kw(self, teardown_kw): """Implements traversing through the teardown keyword and its child keywords. :param teardown_kw: Keyword to process. :type teardown_kw: Keyword :returns: Nothing. """ for keyword in teardown_kw.keywords: if self.start_teardown_kw(keyword) is not False: self.visit_teardown_kw(keyword) self.end_teardown_kw(keyword) def start_teardown_kw(self, teardown_kw): """Called when teardown keyword starts. Default implementation does nothing. :param teardown_kw: Keyword to process. :type teardown_kw: Keyword :returns: Nothing. """ if teardown_kw.name.count("Show Vat History On All Duts"): self._vat_history_lookup_nr = 0 self._msg_type = "teardown-vat-history" elif teardown_kw.name.count("Show Statistics On All Duts"): self._lookup_kw_nr += 1 self._show_run_lookup_nr = 0 self._msg_type = "teardown-show-runtime" if self._msg_type: teardown_kw.messages.visit(self) def end_teardown_kw(self, teardown_kw): """Called when keyword ends. Default implementation does nothing. :param teardown_kw: Keyword to process. :type teardown_kw: Keyword :returns: Nothing. """ pass def visit_message(self, msg): """Implements visiting the message. :param msg: Message to process. :type msg: Message :returns: Nothing. """ if self.start_message(msg) is not False: self.end_message(msg) def start_message(self, msg): """Called when message starts. Get required information from messages: - VPP version. :param msg: Message to process. :type msg: Message :returns: Nothing. """ if self._msg_type: self.parse_msg[self._msg_type](msg) def end_message(self, msg): """Called when message ends. Default implementation does nothing. :param msg: Message to process. :type msg: Message :returns: Nothing. """ pass class InputData(object): """Input data The data is extracted from output.xml files generated by Jenkins jobs and stored in pandas' DataFrames. The data structure: - job name - build number - metadata - job - build - vpp version - suites - tests - ID: test data (as described in ExecutionChecker documentation) """ def __init__(self, spec): """Initialization. :param spec: Specification. :type spec: Specification """ # Specification: self._cfg = spec # Data store: self._input_data = None @property def data(self): """Getter - Input data. :returns: Input data :rtype: pandas.Series """ return self._input_data def metadata(self, job, build): """Getter - metadata :param job: Job which metadata we want. :param build: Build which metadata we want. :type job: str :type build: str :returns: Metadata :rtype: pandas.Series """ return self.data[job][build]["metadata"] def suites(self, job, build): """Getter - suites :param job: Job which suites we want. :param build: Build which suites we want. :type job: str :type build: str :returns: Suites. :rtype: pandas.Series """ return self.data[job][str(build)]["suites"] def tests(self, job, build): """Getter - tests :param job: Job which tests we want. :param build: Build which tests we want. :type job: str :type build: str :returns: Tests. :rtype: pandas.Series """ return self.data[job][build]["tests"] @staticmethod def _parse_tests(job, build): """Process data from robot output.xml file and return JSON structured data. :param job: The name of job which build output data will be processed. :param build: The build which output data will be processed. :type job: str :type build: dict :returns: JSON data structure. :rtype: dict """ with open(build["file-name"], 'r') as data_file: result = ExecutionResult(data_file) checker = ExecutionChecker(job=job, build=build) result.visit(checker) return checker.data def read_data(self): """Parse input data from input files and store in pandas' Series. """ logging.info("Parsing input files ...") job_data = dict() for job, builds in self._cfg.builds.items(): logging.info(" Extracting data from the job '{0}' ...'". format(job)) builds_data = dict() for build in builds: if build["status"] == "failed" \ or build["status"] == "not found": continue logging.info(" Extracting data from the build '{0}'". format(build["build"])) logging.info(" Processing the file '{0}'". format(build["file-name"])) data = InputData._parse_tests(job, build) build_data = pd.Series({ "metadata": pd.Series(data["metadata"].values(), index=data["metadata"].keys()), "suites": pd.Series(data["suites"].values(), index=data["suites"].keys()), "tests": pd.Series(data["tests"].values(), index=data["tests"].keys()), }) builds_data[str(build["build"])] = build_data logging.info(" Done.") job_data[job] = pd.Series(builds_data.values(), index=builds_data.keys()) logging.info(" Done.") self._input_data = pd.Series(job_data.values(), index=job_data.keys()) logging.info("Done.") @staticmethod def _end_of_tag(tag_filter, start=0, closer="'"): """Return the index of character in the string which is the end of tag. :param tag_filter: The string where the end of tag is being searched. :param start: The index where the searching is stated. :param closer: The character which is the tag closer. :type tag_filter: str :type start: int :type closer: str :returns: The index of the tag closer. :rtype: int """ try: idx_opener = tag_filter.index(closer, start) return tag_filter.index(closer, idx_opener + 1) except ValueError: return None @staticmethod def _condition(tag_filter): """Create a conditional statement from the given tag filter. :param tag_filter: Filter based on tags from the element specification. :type tag_filter: str :returns: Conditional statement which can be evaluated. :rtype: str """ index = 0 while True: index = InputData._end_of_tag(tag_filter, index) if index is None: return tag_filter index += 1 tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:] def filter_data(self, element, params=None, data_set="tests"): """Filter required data from the given jobs and builds. The output data structure is: - job 1 - build 1 - test (suite) 1 ID: - param 1 - param 2 ... - param n ... - test (suite) n ID: ... ... - build n ... - job n :param element: Element which will use the filtered data. :param params: Parameters which will be included in the output. If None, all parameters are included. :param data_set: The set of data to be filtered: tests, suites, metadata. :type element: pandas.Series :type params: list :type data_set: str :returns: Filtered data. :rtype pandas.Series """ logging.info(" Creating the data set for the {0} '{1}'.". format(element["type"], element.get("title", ""))) try: if element["filter"] in ("all", "template"): cond = "True" else: cond = InputData._condition(element["filter"]) logging.debug(" Filter: {0}".format(cond)) except KeyError: logging.error(" No filter defined.") return None if params is None: params = element.get("parameters", None) data = pd.Series() try: for job, builds in element["data"].items(): data[job] = pd.Series() for build in builds: data[job][str(build)] = pd.Series() for test_ID, test_data in \ self.data[job][str(build)][data_set].iteritems(): if eval(cond, {"tags": test_data.get("tags", "")}): data[job][str(build)][test_ID] = pd.Series() if params is None: for param, val in test_data.items(): data[job][str(build)][test_ID][param] = val else: for param in params: try: data[job][str(build)][test_ID][param] =\ test_data[param] except KeyError: data[job][str(build)][test_ID][param] =\ "No Data" return data except (KeyError, IndexError, ValueError) as err: logging.error(" Missing mandatory parameter in the element " "specification.", err) return None except AttributeError: return None except SyntaxError: logging.error(" The filter '{0}' is not correct. Check if all " "tags are enclosed by apostrophes.".format(cond)) return None @staticmethod def merge_data(data): """Merge data from more jobs and builds to a simple data structure. The output data structure is: - test (suite) 1 ID: - param 1 - param 2 ... - param n ... - test (suite) n ID: ... :param data: Data to merge. :type data: pandas.Series :returns: Merged data. :rtype: pandas.Series """ logging.info(" Merging data ...") merged_data = pd.Series() for _, builds in data.iteritems(): for _, item in builds.iteritems(): for ID, item_data in item.iteritems(): merged_data[ID] = item_data return merged_data