Revert "CSIT-1110: Update code after jumpavg 0.1.2 release"
[csit.git] / resources / tools / presentation / input_data_parser.py
index 9428b2c..0bb2b6c 100644 (file)
@@ -31,6 +31,7 @@ from string import replace
 from os import remove
 
 from input_data_files import download_and_unzip_data_file
+from utils import Worker
 
 
 class ExecutionChecker(ResultVisitor):
@@ -175,7 +176,10 @@ class ExecutionChecker(ResultVisitor):
     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
                                  r'[\D\d]*')
 
-    REGEX_VERSION = re.compile(r"(return STDOUT Version:\s*)(.*)")
+    REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
+
+    REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
+                                    r"(RTE Version: 'DPDK )(.*)(')")
 
     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
 
@@ -196,6 +200,9 @@ class ExecutionChecker(ResultVisitor):
         # VPP version
         self._version = None
 
+        # Timestamp
+        self._timestamp = None
+
         # Number of VAT History messages found:
         # 0 - no message
         # 1 - VAT History of DUT1
@@ -226,7 +233,9 @@ class ExecutionChecker(ResultVisitor):
         # Dictionary defining the methods used to parse different types of
         # messages
         self.parse_msg = {
-            "setup-version": self._get_version,
+            "timestamp": self._get_timestamp,
+            "vpp-version": self._get_vpp_version,
+            "dpdk-version": self._get_dpdk_version,
             "teardown-vat-history": self._get_vat_history,
             "test-show-runtime": self._get_show_run
         }
@@ -240,7 +249,7 @@ class ExecutionChecker(ResultVisitor):
         """
         return self._data
 
-    def _get_version(self, msg):
+    def _get_vpp_version(self, msg):
         """Called when extraction of VPP version is required.
 
         :param msg: Message to process.
@@ -249,12 +258,41 @@ class ExecutionChecker(ResultVisitor):
         """
 
         if msg.message.count("return STDOUT Version:"):
-            self._version = str(re.search(self.REGEX_VERSION, msg.message).
+            self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
                                 group(2))
             self._data["metadata"]["version"] = self._version
-            self._data["metadata"]["generated"] = msg.timestamp
             self._msg_type = None
 
+    def _get_dpdk_version(self, msg):
+        """Called when extraction of DPDK version is required.
+
+        :param msg: Message to process.
+        :type msg: Message
+        :returns: Nothing.
+        """
+
+        if msg.message.count("return STDOUT testpmd"):
+            try:
+                self._version = str(re.search(
+                    self.REGEX_VERSION_DPDK, msg.message). group(4))
+                self._data["metadata"]["version"] = self._version
+            except IndexError:
+                pass
+            finally:
+                self._msg_type = None
+
+    def _get_timestamp(self, msg):
+        """Called when extraction of timestamp is required.
+
+        :param msg: Message to process.
+        :type msg: Message
+        :returns: Nothing.
+        """
+
+        self._timestamp = msg.timestamp[:14]
+        self._data["metadata"]["generated"] = self._timestamp
+        self._msg_type = None
+
     def _get_vat_history(self, msg):
         """Called when extraction of VAT command history is required.
 
@@ -556,7 +594,11 @@ class ExecutionChecker(ResultVisitor):
             self._lookup_kw_nr += 1
             self._show_run_lookup_nr = 0
             self._msg_type = "test-show-runtime"
-            test_kw.messages.visit(self)
+        elif test_kw.name.count("Start The L2fwd Test") and not self._version:
+            self._msg_type = "dpdk-version"
+        else:
+            return
+        test_kw.messages.visit(self)
 
     def end_test_kw(self, test_kw):
         """Called when keyword ends. Default implementation does nothing.
@@ -590,8 +632,14 @@ class ExecutionChecker(ResultVisitor):
         """
         if setup_kw.name.count("Show Vpp Version On All Duts") \
                 and not self._version:
-            self._msg_type = "setup-version"
-            setup_kw.messages.visit(self)
+            self._msg_type = "vpp-version"
+
+        elif setup_kw.name.count("Setup performance global Variables") \
+                and not self._timestamp:
+            self._msg_type = "timestamp"
+        else:
+            return
+        setup_kw.messages.visit(self)
 
     def end_setup_kw(self, setup_kw):
         """Called when keyword ends. Default implementation does nothing.
@@ -863,12 +911,10 @@ class InputData(object):
         logging.info("Downloading and parsing input files ...")
 
         work_queue = multiprocessing.JoinableQueue()
-
         manager = multiprocessing.Manager()
-
         data_queue = manager.Queue()
-
         cpus = multiprocessing.cpu_count()
+
         workers = list()
         for cpu in range(cpus):
             worker = Worker(work_queue,
@@ -1008,9 +1054,6 @@ class InputData(object):
         :rtype pandas.Series
         """
 
-        logging.info("    Creating the data set for the {0} '{1}'.".
-                     format(element.get("type", ""), element.get("title", "")))
-
         try:
             if element["filter"] in ("all", "template"):
                 cond = "True"
@@ -1095,44 +1138,3 @@ class InputData(object):
                     merged_data[ID] = item_data
 
         return merged_data
-
-
-class Worker(multiprocessing.Process):
-    """Worker class used to download and process input files in separate
-    parallel processes.
-    """
-
-    def __init__(self, work_queue, data_queue, func):
-        """Initialization.
-
-        :param work_queue: Queue with items to process.
-        :param data_queue: Shared memory between processes. Queue which keeps
-            the result data. This data is then read by the main process and used
-            in further processing.
-        :param func: Function which is executed by the worker.
-        :type work_queue: multiprocessing.JoinableQueue
-        :type data_queue: multiprocessing.Manager().Queue()
-        :type func: Callable object
-        """
-        super(Worker, self).__init__()
-        self._work_queue = work_queue
-        self._data_queue = data_queue
-        self._func = func
-
-    def run(self):
-        """Method representing the process's activity.
-        """
-
-        while True:
-            try:
-                self.process(self._work_queue.get())
-            finally:
-                self._work_queue.task_done()
-
-    def process(self, item_to_process):
-        """Method executed by the runner.
-
-        :param item_to_process: Data to be processed by the function.
-        :type item_to_process: tuple
-        """
-        self._func(self.pid, self._data_queue, *item_to_process)