CSIT-1101: Optimize input data processing (Gerrit change 62/12662/10)
author Tibor Frank <tifrank@cisco.com>
Mon, 21 May 2018 06:50:42 +0000 (08:50 +0200)
committer Tibor Frank <tifrank@cisco.com>
Thu, 24 May 2018 11:47:06 +0000 (11:47 +0000)
 - Add multiprocessing

Change-Id: Iaeb9dcf0f4523de379ecb7f775b941d9deb080b2
Signed-off-by: Tibor Frank <tifrank@cisco.com>
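
The change replaces PAL's serial download-and-parse loop with a pool of worker
processes fed from a JoinableQueue, returning results through a managed queue.
A minimal, self-contained sketch of that pattern (process_item and the integer
work items are placeholders, not PAL code)::

    import multiprocessing

    def process_item(item):
        # Placeholder for the real work (download and parse one build).
        return item * item

    def worker(work_queue, data_queue):
        # Workers loop forever; daemonised workers die with the parent.
        while True:
            try:
                data_queue.put(process_item(work_queue.get()))
            finally:
                work_queue.task_done()

    if __name__ == "__main__":
        work_queue = multiprocessing.JoinableQueue()
        manager = multiprocessing.Manager()
        data_queue = manager.Queue()
        for _ in range(multiprocessing.cpu_count()):
            proc = multiprocessing.Process(target=worker,
                                           args=(work_queue, data_queue))
            proc.daemon = True
            proc.start()
        for item in range(10):
            work_queue.put(item)
        work_queue.join()  # blocks until task_done() is called for every item
        while not data_queue.empty():
            print(data_queue.get())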
docs/report/index.rst
docs/report/test_configuration/vpp_functional_configuration/index.rst
resources/tools/presentation/input_data_files.py
resources/tools/presentation/input_data_parser.py
resources/tools/presentation/pal.py

docs/report/index.rst
index 93946ba..696fdcc 100644
@@ -13,4 +13,4 @@ CSIT 18.04
     detailed_test_results/index
     test_configuration/index
     test_operational_data/index
-    csit_framework_documentation/index
\ No newline at end of file
+    csit_framework_documentation/index
docs/report/test_configuration/vpp_functional_configuration/index.rst
index aab94bd..36f72eb 100644
@@ -1,5 +1,5 @@
-VPP Functional Test Configs
-===========================
+VPP Functional Test Configs - Ubuntu
+====================================
 
 .. note::
 
resources/tools/presentation/input_data_files.py
index 9e0cfa6..cde6d1a 100644
@@ -16,11 +16,9 @@ Download all data.
 """
 
 import re
 """
 
 import re
-import logging
 
-from os import rename
+from os import rename, mkdir
 from os.path import join
-from shutil import move
 from zipfile import ZipFile, is_zipfile, BadZipfile
 from httplib import responses
 from requests import get, codes, RequestException, Timeout, TooManyRedirects, \
@@ -38,31 +36,33 @@ SEPARATOR = "__"
 REGEX_RELEASE = re.compile(r'(\D*)(\d{4}|master)(\D*)')
 
 
-def _download_file(url, file_name):
+def _download_file(url, file_name, log):
     """Download a file with input data.
 
     :param url: URL to the file to download.
     :param file_name: Name of file to download.
     """Download a file with input data.
 
     :param url: URL to the file to download.
     :param file_name: Name of file to download.
+    :param log: List of log messages.
     :type url: str
     :type file_name: str
+    :type log: list of tuples (severity, msg)
     :returns: True if the download was successful, otherwise False.
     :rtype: bool
     """
 
     success = False
     try:
-        logging.info("      Connecting to '{0}' ...".format(url))
+        log.append(("INFO", "    Connecting to '{0}' ...".format(url)))
 
         response = get(url, stream=True)
         code = response.status_code
 
-        logging.info("      {0}: {1}".format(code, responses[code]))
+        log.append(("INFO", "    {0}: {1}".format(code, responses[code])))
 
         if code != codes["OK"]:
             return False
 
-        logging.info("      Downloading the file '{0}' to '{1}' ...".
-                     format(url, file_name))
+        log.append(("INFO", "    Downloading the file '{0}' to '{1}' ...".
+                    format(url, file_name)))
 
         file_handle = open(file_name, "wb")
         for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
@@ -71,38 +71,38 @@ def _download_file(url, file_name):
         file_handle.close()
         success = True
     except ConnectionError as err:
-        logging.error("Not possible to connect to '{0}'.".format(url))
-        logging.debug(str(err))
+        log.append(("ERROR", "Not possible to connect to '{0}'.".format(url)))
+        log.append(("DEBUG", str(err)))
     except HTTPError as err:
-        logging.error("Invalid HTTP response from '{0}'.".format(url))
-        logging.debug(str(err))
+        log.append(("ERROR", "Invalid HTTP response from '{0}'.".format(url)))
+        log.append(("DEBUG", str(err)))
     except TooManyRedirects as err:
-        logging.error("Request exceeded the configured number "
-                      "of maximum re-directions.")
-        logging.debug(str(err))
+        log.append(("ERROR", "Request exceeded the configured number "
+                             "of maximum re-directions."))
+        log.append(("DEBUG", str(err)))
     except Timeout as err:
-        logging.error("Request timed out.")
-        logging.debug(str(err))
+        log.append(("ERROR", "Request timed out."))
+        log.append(("DEBUG", str(err)))
     except RequestException as err:
-        logging.error("Unexpected HTTP request exception.")
-        logging.debug(str(err))
+        log.append(("ERROR", "Unexpected HTTP request exception."))
+        log.append(("DEBUG", str(err)))
     except (IOError, ValueError, KeyError) as err:
-        logging.error("Download failed.")
-        logging.debug(str(err))
+        log.append(("ERROR", "Download failed."))
+        log.append(("DEBUG", str(err)))
 
-    logging.info("      Download finished.")
+    log.append(("INFO", "    Download finished."))
     return success
 
 
-def _unzip_file(spec, job, build):
+def _unzip_file(spec, build, pid, log):
     """Unzip downloaded source file.
 
    :param spec: Specification read from the specification file.
-    :param job: Name of the Jenkins job.
     :param build: Information about the build.
+    :param pid: PID of the process executing this method.
+    :param log: List of log messages.
     :type spec: Specification
-    :type job: str
     :type build: dict
+    :type pid: int
+    :type log: list of tuples (severity, msg)
    :returns: True if the unzipping was successful, otherwise False.
    :rtype: bool
    """
@@ -110,44 +110,48 @@ def _unzip_file(spec, job, build):
     data_file = spec.input["extract"]
     file_name = build["file-name"]
     directory = spec.environment["paths"]["DIR[WORKING,DATA]"]
+    tmp_dir = join(directory, str(pid))
+    try:
+        mkdir(tmp_dir)
+    except OSError:
+        pass
     new_name = "{0}{1}{2}".format(file_name.rsplit('.')[-2],
                                   SEPARATOR,
                                   data_file.split("/")[-1])
-    logging.info("      Unzipping: '{0}' from '{1}'.".
-                 format(data_file, file_name))
+
+    log.append(("INFO", "    Unzipping: '{0}' from '{1}'.".
+                format(data_file, file_name)))
     try:
         with ZipFile(file_name, 'r') as zip_file:
-            zip_file.extract(data_file, directory)
-        logging.info("      Moving {0} to {1} ...".
-                     format(join(directory, data_file), directory))
-        move(join(directory, data_file), directory)
-        logging.info("      Renaming the file '{0}' to '{1}'".
-                     format(join(directory, data_file.split("/")[-1]),
-                            new_name))
-        rename(join(directory, data_file.split("/")[-1]),
-               new_name)
-        spec.set_input_file_name(job, build["build"],
-                                 new_name)
+            zip_file.extract(data_file, tmp_dir)
+        log.append(("INFO", "    Renaming the file '{0}' to '{1}'".
+                    format(join(tmp_dir, data_file), new_name)))
+        rename(join(tmp_dir, data_file), new_name)
+        build["file-name"] = new_name
         return True
     except (BadZipfile, RuntimeError) as err:
-        logging.error("Failed to unzip the file '{0}': {1}.".
-                      format(file_name, str(err)))
+        log.append(("ERROR", "Failed to unzip the file '{0}': {1}.".
+                    format(file_name, str(err))))
         return False
     except OSError as err:
-        logging.error("Failed to rename the file '{0}': {1}.".
-                      format(data_file, str(err)))
+        log.append(("ERROR", "Failed to rename the file '{0}': {1}.".
+                    format(data_file, str(err))))
         return False
 
 
-def download_and_unzip_data_file(spec, job, build):
+def download_and_unzip_data_file(spec, job, build, pid, log):
     """Download and unzip a source file.
 
    :param spec: Specification read from the specification file.
    :param job: Name of the Jenkins job.
    :param build: Information about the build.
+    :param pid: PID of the process executing this method.
+    :param log: List of log messages.
     :type spec: Specification
     :type job: str
     :type build: dict
+    :type pid: int
+    :type log: list of tuples (severity, msg)
     :returns: True if the download was successful, otherwise False.
     :rtype: bool
     """
@@ -158,7 +162,7 @@ def download_and_unzip_data_file(spec, job, build):
         elif spec.input["file-name"].endswith(".gz"):
             url = spec.environment["urls"]["URL[NEXUS,LOG]"]
         else:
-            logging.error("Not supported file format.")
+            log.append(("ERROR", "Unsupported file format."))
             return False
     elif job.startswith("hc2vpp-"):
         url = spec.environment["urls"]["URL[JENKINS,HC]"]
@@ -174,27 +178,35 @@ def download_and_unzip_data_file(spec, job, build):
                     format(job=job, sep=SEPARATOR, build=build["build"],
                            name=file_name))
     # Download the file from the defined source (Jenkins, logs.fd.io):
-    success = _download_file(url, new_name)
+    success = _download_file(url, new_name, log)
+
+    if success and new_name.endswith(".zip"):
+        if not is_zipfile(new_name):
+            success = False
 
     # If not successful, download from docs.fd.io:
     if not success:
-        logging.info("      Trying to download from https://docs.fd.io:")
+        log.append(("INFO", "    Trying to download from https://docs.fd.io:"))
         release = re.search(REGEX_RELEASE, job).group(2)
-        nexus_file_name = "{job}{sep}{build}{sep}{name}". \
-            format(job=job, sep=SEPARATOR, build=build["build"], name=file_name)
-        try:
-            release = "rls{0}".format(int(release))
-        except ValueError:
-            pass
-        url = "{url}/{release}/{dir}/{file}". \
-            format(url=spec.environment["urls"]["URL[NEXUS]"],
-                   release=release,
-                   dir=spec.environment["urls"]["DIR[NEXUS]"],
-                   file=nexus_file_name)
-        success = _download_file(url, new_name)
+        for rls in (release, "master"):
+            nexus_file_name = "{job}{sep}{build}{sep}{name}". \
+                format(job=job, sep=SEPARATOR, build=build["build"],
+                       name=file_name)
+            try:
+                rls = "rls{0}".format(int(rls))
+            except ValueError:
+                pass
+            url = "{url}/{release}/{dir}/{file}". \
+                format(url=spec.environment["urls"]["URL[NEXUS]"],
+                       release=rls,
+                       dir=spec.environment["urls"]["DIR[NEXUS]"],
+                       file=nexus_file_name)
+            success = _download_file(url, new_name, log)
+            if success:
+                break
 
     if success:
-        spec.set_input_file_name(job, build["build"], new_name)
+        build["file-name"] = new_name
     else:
         return False
 
@@ -205,12 +217,14 @@ def download_and_unzip_data_file(spec, job, build):
         else:
             rename(new_name, new_name[:-3])
             execute_command("gzip --keep {0}".format(new_name[:-3]))
-        spec.set_input_file_name(job, build["build"], new_name[:-3])
+        build["file-name"] = new_name[:-3]
 
     if new_name.endswith(".zip"):
         if is_zipfile(new_name):
-            return _unzip_file(spec, job, build)
+            return _unzip_file(spec, build, pid, log)
         else:
+            log.append(("ERROR",
+                        "Zip file '{0}' is corrupted.".format(new_name)))
             return False
     else:
         return True
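
The rewritten download path checks an archive with is_zipfile() before
trusting it: once right after the primary download and again before
extraction. The guard in isolation (the helper name is mine, not PAL's)::

    from zipfile import ZipFile, is_zipfile

    def extract_member(file_name, member, target_dir):
        """Extract one member, refusing files that are not real zip archives."""
        if not is_zipfile(file_name):
            return False
        with ZipFile(file_name, 'r') as zip_file:
            zip_file.extract(member, target_dir)
        return True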
resources/tools/presentation/input_data_parser.py
index d0f9eed..9428b2c 100644
 - provide access to the data.
 """
 
+import multiprocessing
+import os
 import re
 import pandas as pd
 import logging
-import xml.etree.ElementTree as ET
 
 from robot.api import ExecutionResult, ResultVisitor
 from robot import errors
@@ -698,7 +699,7 @@ class InputData(object):
         self._cfg = spec
 
         # Data store:
-        self._input_data = None
+        self._input_data = pd.Series()
 
     @property
     def data(self):
@@ -749,14 +750,16 @@ class InputData(object):
         return self.data[job][build]["tests"]
 
     @staticmethod
-    def _parse_tests(job, build):
+    def _parse_tests(job, build, log):
         """Process data from robot output.xml file and return JSON structured
         data.
 
        :param job: The name of the job whose build output data will be processed.
        :param build: The build whose output data will be processed.
+        :param log: List of log messages.
         :type job: str
         :type build: dict
+        :type log: list of tuples (severity, msg)
         :returns: JSON data structure.
         :rtype: dict
         """
@@ -770,54 +773,129 @@ class InputData(object):
             try:
                 result = ExecutionResult(data_file)
             except errors.DataError as err:
-                logging.error("Error occurred while parsing output.xml: {0}".
-                              format(err))
+                log.append(("ERROR", "Error occurred while parsing output.xml: "
+                                     "{0}".format(err)))
                 return None
         checker = ExecutionChecker(metadata)
         result.visit(checker)
 
         return checker.data
 
-    def download_and_parse_data(self):
+    def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
+        """Download and parse the input data file.
+
+        :param pid: PID of the process executing this method.
+        :param data_queue: Shared memory between processes. Queue which keeps
+            the result data. This data is then read by the main process and used
+            in further processing.
+        :param job: Name of the Jenkins job which generated the processed input
+            file.
+        :param build: Information about the Jenkins build which generated the
+            processed input file.
+        :param repeat: Repeat the download the specified number of times if
+            not successful.
+        :type pid: int
+        :type data_queue: multiprocessing.Manager().Queue()
+        :type job: str
+        :type build: dict
+        :type repeat: int
+        """
+
+        logs = list()
+
+        logging.info("  Processing the job/build: {0}: {1}".
+                     format(job, build["build"]))
+
+        logs.append(("INFO", "  Processing the job/build: {0}: {1}".
+                     format(job, build["build"])))
+
+        state = "failed"
+        success = False
+        data = None
+        do_repeat = repeat
+        while do_repeat:
+            success = download_and_unzip_data_file(self._cfg, job, build, pid,
+                                                   logs)
+            if success:
+                break
+            do_repeat -= 1
+        if not success:
+            logs.append(("ERROR", "It is not possible to download the input "
+                                  "data file from the job '{job}', build "
+                                  "'{build}', or it is damaged. Skipped.".
+                         format(job=job, build=build["build"])))
+        if success:
+            logs.append(("INFO", "  Processing data from the build '{0}' ...".
+                         format(build["build"])))
+            data = InputData._parse_tests(job, build, logs)
+            if data is None:
+                logs.append(("ERROR", "Input data file from the job '{job}', "
+                                      "build '{build}' is damaged. Skipped.".
+                             format(job=job, build=build["build"])))
+            else:
+                state = "processed"
+
+            try:
+                remove(build["file-name"])
+            except OSError as err:
+                logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
+                             format(build["file-name"], err)))
+        logs.append(("INFO", "  Done."))
+
+        result = {
+            "data": data,
+            "state": state,
+            "job": job,
+            "build": build,
+            "logs": logs
+        }
+        data_queue.put(result)
+
+    def download_and_parse_data(self, repeat=1):
         """Download the input data files, parse input data from input files and
         store in pandas' Series.
         """Download the input data files, parse input data from input files and
         store in pandas' Series.
+
+        :param repeat: Repeat the download the specified number of times if
+            not successful.
+        :type repeat: int
         """
 
         logging.info("Downloading and parsing input files ...")
 
         """
 
         logging.info("Downloading and parsing input files ...")
 
-        job_data = dict()
+        work_queue = multiprocessing.JoinableQueue()
+
+        manager = multiprocessing.Manager()
+
+        data_queue = manager.Queue()
+
+        cpus = multiprocessing.cpu_count()
+        workers = list()
+        for cpu in range(cpus):
+            worker = Worker(work_queue,
+                            data_queue,
+                            self._download_and_parse_build)
+            worker.daemon = True
+            worker.start()
+            workers.append(worker)
+            os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
+                      format(cpu, worker.pid))
+
         for job, builds in self._cfg.builds.items():
-            logging.info("  Processing data from the job '{0}' ...".
-                         format(job))
-            builds_data = dict()
             for build in builds:
-                logging.info("    Processing the build '{0}'".
-                             format(build["build"]))
-                self._cfg.set_input_state(job, build["build"], "failed")
-                if not download_and_unzip_data_file(self._cfg, job, build):
-                    logging.error("It is not possible to download the input "
-                                  "data file from the job '{job}', build "
-                                  "'{build}', or it is damaged. Skipped.".
-                                  format(job=job, build=build["build"]))
-                    continue
+                work_queue.put((job, build, repeat))
 
-                logging.info("      Processing data from the build '{0}' ...".
-                             format(build["build"]))
-                data = InputData._parse_tests(job, build)
-                if data is None:
-                    logging.error("Input data file from the job '{job}', build "
-                                  "'{build}' is damaged. Skipped.".
-                                  format(job=job, build=build["build"]))
-                    continue
+        work_queue.join()
 
-                self._cfg.set_input_state(job, build["build"], "processed")
+        logging.info("Done.")
 
-                try:
-                    remove(build["file-name"])
-                except OSError as err:
-                    logging.error("Cannot remove the file '{0}': {1}".
-                                  format(build["file-name"], err))
+        while not data_queue.empty():
+            result = data_queue.get()
 
+            job = result["job"]
+            build_nr = result["build"]["build"]
+
+            if result["data"]:
+                data = result["data"]
                 build_data = pd.Series({
                     "metadata": pd.Series(data["metadata"].values(),
                                           index=data["metadata"].keys()),
@@ -825,15 +903,35 @@ class InputData(object):
                                         index=data["suites"].keys()),
                     "tests": pd.Series(data["tests"].values(),
                                        index=data["tests"].keys())})
                                         index=data["suites"].keys()),
                     "tests": pd.Series(data["tests"].values(),
                                        index=data["tests"].keys())})
-                builds_data[str(build["build"])] = build_data
-                build["status"] = "processed"
-                logging.info("    Done.")
 
-            job_data[job] = pd.Series(builds_data.values(),
-                                      index=builds_data.keys())
-            logging.info("  Done.")
+                if self._input_data.get(job, None) is None:
+                    self._input_data[job] = pd.Series()
+                self._input_data[job][str(build_nr)] = build_data
+
+                self._cfg.set_input_file_name(job, build_nr,
+                                              result["build"]["file-name"])
+
+            self._cfg.set_input_state(job, build_nr, result["state"])
+
+            for item in result["logs"]:
+                if item[0] == "INFO":
+                    logging.info(item[1])
+                elif item[0] == "ERROR":
+                    logging.error(item[1])
+                elif item[0] == "DEBUG":
+                    logging.debug(item[1])
+                elif item[0] == "CRITICAL":
+                    logging.critical(item[1])
+                elif item[0] == "WARNING":
+                    logging.warning(item[1])
+
+        del data_queue
+
+        # Terminate all workers
+        for worker in workers:
+            worker.terminate()
+            worker.join()
 
-        self._input_data = pd.Series(job_data.values(), index=job_data.keys())
         logging.info("Done.")
 
     @staticmethod
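
Parsed results end up in nested pandas Series: the outer Series is keyed by
job name, the next level by build number, and each build holds its metadata,
suites and tests. The layout, with placeholder job and build names::

    import pandas as pd

    build_data = pd.Series({
        "metadata": pd.Series(["value"], index=["key"]),
    })
    input_data = pd.Series({
        "sample-job": pd.Series({"42": build_data}),  # placeholder names
    })
    print(input_data["sample-job"]["42"]["metadata"]["key"])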
@@ -997,3 +1095,44 @@ class InputData(object):
                     merged_data[ID] = item_data
 
         return merged_data
+
+
+class Worker(multiprocessing.Process):
+    """Worker class used to download and process input files in separate
+    parallel processes.
+    """
+
+    def __init__(self, work_queue, data_queue, func):
+        """Initialization.
+
+        :param work_queue: Queue with items to process.
+        :param data_queue: Shared memory between processes. Queue which keeps
+            the result data. This data is then read by the main process and used
+            in further processing.
+        :param func: Function which is executed by the worker.
+        :type work_queue: multiprocessing.JoinableQueue
+        :type data_queue: multiprocessing.Manager().Queue()
+        :type func: Callable object
+        """
+        super(Worker, self).__init__()
+        self._work_queue = work_queue
+        self._data_queue = data_queue
+        self._func = func
+
+    def run(self):
+        """Method representing the process's activity.
+        """
+
+        while True:
+            try:
+                self.process(self._work_queue.get())
+            finally:
+                self._work_queue.task_done()
+
+    def process(self, item_to_process):
+        """Method executed by the runner.
+
+        :param item_to_process: Data to be processed by the function.
+        :type item_to_process: tuple
+        """
+        self._func(self.pid, self._data_queue, *item_to_process)
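
Each Worker is daemonised and pinned to one CPU core with taskset; after
work_queue.join() returns and the result queue has been drained, the main
process terminates and joins the workers. The pinning call in isolation
(Linux-only; taskset ships with util-linux, and failures are deliberately
ignored)::

    import os

    def pin_to_cpu(pid, cpu):
        # Restrict process `pid` to CPU core `cpu`; output is discarded.
        os.system("taskset -p -c {0} {1} > /dev/null 2>&1".format(cpu, pid))

    pin_to_cpu(os.getpid(), 0)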
resources/tools/presentation/pal.py
index 1ccefd3..013c921 100644
@@ -96,7 +96,7 @@ def main():
         prepare_static_content(spec)
 
         data = InputData(spec)
-        data.download_and_parse_data()
+        data.download_and_parse_data(repeat=2)
 
         generate_tables(spec, data)
         generate_plots(spec, data)
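
pal.py now requests two download attempts per build (repeat=2). The retry
loop in _download_and_parse_build reduces to this small pattern::

    def with_retry(action, repeat=2):
        """Run action() up to `repeat` times, stopping at the first success."""
        while repeat:
            if action():
                return True
            repeat -= 1
        return False

    print(with_retry(lambda: False))  # exhausts both attempts, returns False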