X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Ftools%2Fpresentation%2Finput_data_parser.py;h=cf132377748870a9609efe89ba4c947729351a53;hb=39b4a07718ecab94ea331362edb62dfcf678bd09;hp=9428b2cdc9695fe66b8a171d43a242657b017ccb;hpb=372eab0eac428149d547b2d6eb2ce43cd0d750f6;p=csit.git diff --git a/resources/tools/presentation/input_data_parser.py b/resources/tools/presentation/input_data_parser.py index 9428b2cdc9..cf13237774 100644 --- a/resources/tools/presentation/input_data_parser.py +++ b/resources/tools/presentation/input_data_parser.py @@ -31,6 +31,7 @@ from string import replace from os import remove from input_data_files import download_and_unzip_data_file +from utils import Worker class ExecutionChecker(ResultVisitor): @@ -175,7 +176,10 @@ class ExecutionChecker(ResultVisitor): REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s' r'[\D\d]*') - REGEX_VERSION = re.compile(r"(return STDOUT Version:\s*)(.*)") + REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)") + + REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)" + r"(RTE Version: 'DPDK )(.*)(')") REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$') @@ -196,6 +200,9 @@ class ExecutionChecker(ResultVisitor): # VPP version self._version = None + # Timestamp + self._timestamp = None + # Number of VAT History messages found: # 0 - no message # 1 - VAT History of DUT1 @@ -226,7 +233,9 @@ class ExecutionChecker(ResultVisitor): # Dictionary defining the methods used to parse different types of # messages self.parse_msg = { - "setup-version": self._get_version, + "timestamp": self._get_timestamp, + "vpp-version": self._get_vpp_version, + "dpdk-version": self._get_dpdk_version, "teardown-vat-history": self._get_vat_history, "test-show-runtime": self._get_show_run } @@ -240,7 +249,7 @@ class ExecutionChecker(ResultVisitor): """ return self._data - def _get_version(self, msg): + def _get_vpp_version(self, msg): """Called when extraction of VPP version is required. :param msg: Message to process. @@ -249,12 +258,41 @@ class ExecutionChecker(ResultVisitor): """ if msg.message.count("return STDOUT Version:"): - self._version = str(re.search(self.REGEX_VERSION, msg.message). + self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message). group(2)) self._data["metadata"]["version"] = self._version - self._data["metadata"]["generated"] = msg.timestamp self._msg_type = None + def _get_dpdk_version(self, msg): + """Called when extraction of DPDK version is required. + + :param msg: Message to process. + :type msg: Message + :returns: Nothing. + """ + + if msg.message.count("return STDOUT testpmd"): + try: + self._version = str(re.search( + self.REGEX_VERSION_DPDK, msg.message). group(4)) + self._data["metadata"]["version"] = self._version + except IndexError: + pass + finally: + self._msg_type = None + + def _get_timestamp(self, msg): + """Called when extraction of timestamp is required. + + :param msg: Message to process. + :type msg: Message + :returns: Nothing. + """ + + self._timestamp = msg.timestamp[:14] + self._data["metadata"]["generated"] = self._timestamp + self._msg_type = None + def _get_vat_history(self, msg): """Called when extraction of VAT command history is required. @@ -426,6 +464,7 @@ class ExecutionChecker(ResultVisitor): test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1) test_result["msg"] = test.message.replace('\n', ' |br| '). \ replace('\r', '').replace('"', "'") + test_result["status"] = test.status if test.status == "PASS" and ("NDRPDRDISC" in tags or "TCP" in tags or "MRR" in tags): @@ -469,6 +508,7 @@ class ExecutionChecker(ResultVisitor): test_result["result"] = dict() test_result["result"]["value"] = int(groups.group(2)) test_result["result"]["unit"] = groups.group(1) + elif test_type in ("MRR", ): groups = re.search(self.REGEX_MRR, test.message) test_result["result"] = dict() @@ -478,8 +518,6 @@ class ExecutionChecker(ResultVisitor): test_result["result"]["throughput"] = int( test_result["result"]["rx"] / test_result["result"]["duration"]) - else: - test_result["status"] = test.status self._test_ID = test.longname.lower() self._data["tests"][self._test_ID] = test_result @@ -556,7 +594,11 @@ class ExecutionChecker(ResultVisitor): self._lookup_kw_nr += 1 self._show_run_lookup_nr = 0 self._msg_type = "test-show-runtime" - test_kw.messages.visit(self) + elif test_kw.name.count("Start The L2fwd Test") and not self._version: + self._msg_type = "dpdk-version" + else: + return + test_kw.messages.visit(self) def end_test_kw(self, test_kw): """Called when keyword ends. Default implementation does nothing. @@ -590,8 +632,14 @@ class ExecutionChecker(ResultVisitor): """ if setup_kw.name.count("Show Vpp Version On All Duts") \ and not self._version: - self._msg_type = "setup-version" - setup_kw.messages.visit(self) + self._msg_type = "vpp-version" + + elif setup_kw.name.count("Setup performance global Variables") \ + and not self._timestamp: + self._msg_type = "timestamp" + else: + return + setup_kw.messages.visit(self) def end_setup_kw(self, setup_kw): """Called when keyword ends. Default implementation does nothing. @@ -863,12 +911,10 @@ class InputData(object): logging.info("Downloading and parsing input files ...") work_queue = multiprocessing.JoinableQueue() - manager = multiprocessing.Manager() - data_queue = manager.Queue() - cpus = multiprocessing.cpu_count() + workers = list() for cpu in range(cpus): worker = Worker(work_queue, @@ -1008,9 +1054,6 @@ class InputData(object): :rtype pandas.Series """ - logging.info(" Creating the data set for the {0} '{1}'.". - format(element.get("type", ""), element.get("title", ""))) - try: if element["filter"] in ("all", "template"): cond = "True" @@ -1095,44 +1138,3 @@ class InputData(object): merged_data[ID] = item_data return merged_data - - -class Worker(multiprocessing.Process): - """Worker class used to download and process input files in separate - parallel processes. - """ - - def __init__(self, work_queue, data_queue, func): - """Initialization. - - :param work_queue: Queue with items to process. - :param data_queue: Shared memory between processes. Queue which keeps - the result data. This data is then read by the main process and used - in further processing. - :param func: Function which is executed by the worker. - :type work_queue: multiprocessing.JoinableQueue - :type data_queue: multiprocessing.Manager().Queue() - :type func: Callable object - """ - super(Worker, self).__init__() - self._work_queue = work_queue - self._data_queue = data_queue - self._func = func - - def run(self): - """Method representing the process's activity. - """ - - while True: - try: - self.process(self._work_queue.get()) - finally: - self._work_queue.task_done() - - def process(self, item_to_process): - """Method executed by the runner. - - :param item_to_process: Data to be processed by the function. - :type item_to_process: tuple - """ - self._func(self.pid, self._data_queue, *item_to_process)