CSIT-1110: Integrate anomaly detection into PAL
diff --git a/resources/tools/presentation/new/input_data_parser.py b/resources/tools/presentation/new/input_data_parser.py
new file mode 100644
index 0000000..beec34c
--- /dev/null
+++ b/resources/tools/presentation/new/input_data_parser.py
@@ -0,0 +1,1093 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Data pre-processing
+
+- extract data from output.xml files generated by Jenkins jobs and store in
+  pandas' Series,
+- provide access to the data.
+"""
+
+import multiprocessing
+import os
+import re
+import pandas as pd
+import logging
+
+from robot.api import ExecutionResult, ResultVisitor
+from robot import errors
+from collections import OrderedDict
+from string import replace
+from os import remove
+
+from input_data_files import download_and_unzip_data_file
+from utils import Worker
+
+
+class ExecutionChecker(ResultVisitor):
+    """Class to traverse through the test suite structure.
+
+    The functionality implemented in this class generates a json structure:
+
+    Performance tests:
+
+    {
+        "metadata": {  # Optional
+            "version": "VPP version",
+            "job": "Jenkins job name",
+            "build": "Information about the build"
+        },
+        "suites": {
+            "Suite name 1": {
+                "doc": "Suite 1 documentation",
+                "parent": "Suite 1 parent",
+                "level": "Level of the suite in the suite hierarchy"
+            },
+            "Suite name N": {
+                "doc": "Suite N documentation",
+                "parent": "Suite N parent",
+                "level": "Level of the suite in the suite hierarchy"
+            }
+        },
+        "tests": {
+            "ID": {
+                "name": "Test name",
+                "parent": "Name of the parent of the test",
+                "doc": "Test documentation"
+                "msg": "Test message"
+                "tags": ["tag 1", "tag 2", "tag n"],
+                "type": "PDR" | "NDR",
+                "throughput": {
+                    "value": int,
+                    "unit": "pps" | "bps" | "percentage"
+                },
+                "latency": {
+                    "direction1": {
+                        "100": {
+                            "min": int,
+                            "avg": int,
+                            "max": int
+                        },
+                        "50": {  # Only for NDR
+                            "min": int,
+                            "avg": int,
+                            "max": int
+                        },
+                        "10": {  # Only for NDR
+                            "min": int,
+                            "avg": int,
+                            "max": int
+                        }
+                    },
+                    "direction2": {
+                        "100": {
+                            "min": int,
+                            "avg": int,
+                            "max": int
+                        },
+                        "50": {  # Only for NDR
+                            "min": int,
+                            "avg": int,
+                            "max": int
+                        },
+                        "10": {  # Only for NDR
+                            "min": int,
+                            "avg": int,
+                            "max": int
+                        }
+                    }
+                },
+                "lossTolerance": "lossTolerance",  # Only for PDR
+                "vat-history": "DUT1 and DUT2 VAT History"
+                },
+                "show-run": "Show Run"
+            },
+            "ID" {
+                # next test
+            }
+        }
+    }
+
+    Functional tests:
+
+    {
+        "metadata": {  # Optional
+            "version": "VPP version",
+            "job": "Jenkins job name",
+            "build": "Information about the build"
+        },
+        "suites": {
+            "Suite name 1": {
+                "doc": "Suite 1 documentation",
+                "parent": "Suite 1 parent",
+                "level": "Level of the suite in the suite hierarchy"
+            },
+            "Suite name N": {
+                "doc": "Suite N documentation",
+                "parent": "Suite N parent",
+                "level": "Level of the suite in the suite hierarchy"
+            }
+        },
+        "tests": {
+            "ID": {
+                "name": "Test name",
+                "parent": "Name of the parent of the test",
+                "doc": "Test documentation"
+                "msg": "Test message"
+                "tags": ["tag 1", "tag 2", "tag n"],
+                "vat-history": "DUT1 and DUT2 VAT History"
+                "show-run": "Show Run"
+                "status": "PASS" | "FAIL"
+            },
+            "ID" {
+                # next test
+            }
+        }
+    }
+
+    .. note:: ID is the lowercase full path to the test.
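+
+    Example (a sketch of intended usage, assuming a Robot Framework
+    ``output.xml`` file is available locally and ``my-job`` is an
+    illustrative job name)::
+
+        from robot.api import ExecutionResult
+
+        result = ExecutionResult("output.xml")
+        checker = ExecutionChecker({"job": "my-job", "build": "1"})
+        result.visit(checker)
+        # checker.data now holds the json-like structure described above.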
+    """
+
+    REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
+
+    REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
+                               r'LAT_\d+%NDR:\s\[\'(-?\d+\/-?\d+/-?\d+)\','
+                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
+                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
+                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
+                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
+                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
+
+    REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
+                               r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
+                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
+
+    REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
+                                 r'[\D\d]*')
+
+    REGEX_VERSION = re.compile(r"(return STDOUT Version:\s*)(.*)")
+
+    REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
+
+    REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
+                           r'tx\s(\d*),\srx\s(\d*)')
+
+    def __init__(self, metadata):
+        """Initialisation.
+
+        :param metadata: Key-value pairs to be included in "metadata" part of
+        JSON structure.
+        :type metadata: dict
+        """
+
+        # Type of message to parse out from the test messages
+        self._msg_type = None
+
+        # VPP version
+        self._version = None
+
+        # Number of "Show Runtime Counters On All Duts" keywords found within
+        # the current test; used as the DUT number in the "show-run" output
+        self._lookup_kw_nr = 0
+
+        # Number of VAT History messages found:
+        # 0 - no message
+        # 1 - VAT History of DUT1
+        # 2 - VAT History of DUT2
+        self._vat_history_lookup_nr = 0
+
+        # Number of Show Running messages found
+        # 0 - no message
+        # 1 - Show run message found
+        self._show_run_lookup_nr = 0
+
+        # Test ID of the currently processed test - the lowercase full path to
+        # the test
+        self._test_ID = None
+
+        # The main data structure
+        self._data = {
+            "metadata": OrderedDict(),
+            "suites": OrderedDict(),
+            "tests": OrderedDict()
+        }
+
+        # Save the provided metadata
+        for key, val in metadata.items():
+            self._data["metadata"][key] = val
+
+        # Dictionary defining the methods used to parse different types of
+        # messages
+        self.parse_msg = {
+            "setup-version": self._get_version,
+            "teardown-vat-history": self._get_vat_history,
+            "test-show-runtime": self._get_show_run
+        }
+
+    @property
+    def data(self):
+        """Getter - Data parsed from the XML file.
+
+        :returns: Data parsed from the XML file.
+        :rtype: dict
+        """
+        return self._data
+
+    def _get_version(self, msg):
+        """Called when extraction of VPP version is required.
+
+        :param msg: Message to process.
+        :type msg: Message
+        :returns: Nothing.
+        """
+
+        if msg.message.count("return STDOUT Version:"):
+            self._version = str(re.search(self.REGEX_VERSION, msg.message).
+                                group(2))
+            self._data["metadata"]["version"] = self._version
+            self._data["metadata"]["generated"] = msg.timestamp
+            self._msg_type = None
+
+    def _get_vat_history(self, msg):
+        """Called when extraction of VAT command history is required.
+
+        :param msg: Message to process.
+        :type msg: Message
+        :returns: Nothing.
+        """
+        if msg.message.count("VAT command history:"):
+            self._vat_history_lookup_nr += 1
+            if self._vat_history_lookup_nr == 1:
+                self._data["tests"][self._test_ID]["vat-history"] = str()
+            else:
+                self._msg_type = None
+            text = re.sub(r"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3} "
+                          r"VAT command history:", "", msg.message, count=1). \
+                replace("\n\n", "\n").replace('\n', ' |br| ').\
+                replace('\r', '').replace('"', "'")
+
+            self._data["tests"][self._test_ID]["vat-history"] += " |br| "
+            self._data["tests"][self._test_ID]["vat-history"] += \
+                "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
+
+    def _get_show_run(self, msg):
+        """Called when extraction of VPP operational data (output of CLI command
+        Show Runtime) is required.
+
+        :param msg: Message to process.
+        :type msg: Message
+        :returns: Nothing.
+        """
+        if msg.message.count("return STDOUT Thread "):
+            self._show_run_lookup_nr += 1
+            if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
+                self._data["tests"][self._test_ID]["show-run"] = str()
+            if self._lookup_kw_nr > 1:
+                self._msg_type = None
+            if self._show_run_lookup_nr == 1:
+                text = msg.message.replace("vat# ", "").\
+                    replace("return STDOUT ", "").replace("\n\n", "\n").\
+                    replace('\n', ' |br| ').\
+                    replace('\r', '').replace('"', "'")
+                try:
+                    self._data["tests"][self._test_ID]["show-run"] += " |br| "
+                    self._data["tests"][self._test_ID]["show-run"] += \
+                        "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
+                except KeyError:
+                    pass
+
+    def _get_latency(self, msg, test_type):
+        """Get the latency data from the test message.
+
+        :param msg: Message to be parsed.
+        :param test_type: Type of the test - NDR or PDR.
+        :type msg: str
+        :type test_type: str
+        :returns: Latencies parsed from the message.
+        :rtype: dict
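+
+        Example (an illustrative PDR message and the resulting structure; the
+        latency values are made up)::
+
+            msg = "LAT_99%PDR: ['10/15/20', '12/18/25']"
+            self._get_latency(msg, "PDR")
+            # {"direction1": {"100": {"min": 10, "avg": 15, "max": 20}},
+            #  "direction2": {"100": {"min": 12, "avg": 18, "max": 25}}}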
+        """
+
+        if test_type == "NDR":
+            groups = re.search(self.REGEX_LAT_NDR, msg)
+            groups_range = range(1, 7)
+        elif test_type == "PDR":
+            groups = re.search(self.REGEX_LAT_PDR, msg)
+            groups_range = range(1, 3)
+        else:
+            return {}
+
+        latencies = list()
+        for idx in groups_range:
+            try:
+                lat = [int(item) for item in str(groups.group(idx)).split('/')]
+            except (AttributeError, ValueError):
+                lat = [-1, -1, -1]
+            latencies.append(lat)
+
+        keys = ("min", "avg", "max")
+        latency = {
+            "direction1": {
+            },
+            "direction2": {
+            }
+        }
+
+        latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
+        latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
+        if test_type == "NDR":
+            latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
+            latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
+            latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
+            latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
+
+        return latency
+
+    def visit_suite(self, suite):
+        """Implements traversing through the suite and its direct children.
+
+        :param suite: Suite to process.
+        :type suite: Suite
+        :returns: Nothing.
+        """
+        if self.start_suite(suite) is not False:
+            suite.suites.visit(self)
+            suite.tests.visit(self)
+            self.end_suite(suite)
+
+    def start_suite(self, suite):
+        """Called when suite starts.
+
+        :param suite: Suite to process.
+        :type suite: Suite
+        :returns: Nothing.
+        """
+
+        try:
+            parent_name = suite.parent.name
+        except AttributeError:
+            return
+
+        doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
+            replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
+        doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
+
+        self._data["suites"][suite.longname.lower().replace('"', "'").
+            replace(" ", "_")] = {
+                "name": suite.name.lower(),
+                "doc": doc_str,
+                "parent": parent_name,
+                "level": len(suite.longname.split("."))
+            }
+
+        suite.keywords.visit(self)
+
+    def end_suite(self, suite):
+        """Called when suite ends.
+
+        :param suite: Suite to process.
+        :type suite: Suite
+        :returns: Nothing.
+        """
+        pass
+
+    def visit_test(self, test):
+        """Implements traversing through the test.
+
+        :param test: Test to process.
+        :type test: Test
+        :returns: Nothing.
+        """
+        if self.start_test(test) is not False:
+            test.keywords.visit(self)
+            self.end_test(test)
+
+    def start_test(self, test):
+        """Called when test starts.
+
+        :param test: Test to process.
+        :type test: Test
+        :returns: Nothing.
+        """
+
+        tags = [str(tag) for tag in test.tags]
+        test_result = dict()
+        test_result["name"] = test.name.lower()
+        test_result["parent"] = test.parent.name.lower()
+        test_result["tags"] = tags
+        doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
+            replace('\r', '').replace('[', ' |br| [')
+        test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
+        test_result["msg"] = test.message.replace('\n', ' |br| '). \
+            replace('\r', '').replace('"', "'")
+        if test.status == "PASS" and ("NDRPDRDISC" in tags or
+                                      "TCP" in tags or
+                                      "MRR" in tags):
+            if "NDRDISC" in tags:
+                test_type = "NDR"
+            elif "PDRDISC" in tags:
+                test_type = "PDR"
+            elif "TCP" in tags:
+                test_type = "TCP"
+            elif "MRR" in tags:
+                test_type = "MRR"
+            else:
+                return
+
+            test_result["type"] = test_type
+
+            if test_type in ("NDR", "PDR"):
+                try:
+                    rate_value = str(re.search(
+                        self.REGEX_RATE, test.message).group(1))
+                except AttributeError:
+                    rate_value = "-1"
+                try:
+                    rate_unit = str(re.search(
+                        self.REGEX_RATE, test.message).group(2))
+                except AttributeError:
+                    rate_unit = "-1"
+
+                test_result["throughput"] = dict()
+                test_result["throughput"]["value"] = \
+                    int(rate_value.split('.')[0])
+                test_result["throughput"]["unit"] = rate_unit
+                test_result["latency"] = \
+                    self._get_latency(test.message, test_type)
+                if test_type == "PDR":
+                    test_result["lossTolerance"] = str(re.search(
+                        self.REGEX_TOLERANCE, test.message).group(1))
+
+            elif test_type in ("TCP", ):
+                groups = re.search(self.REGEX_TCP, test.message)
+                test_result["result"] = dict()
+                test_result["result"]["value"] = int(groups.group(2))
+                test_result["result"]["unit"] = groups.group(1)
+            elif test_type in ("MRR", ):
+                groups = re.search(self.REGEX_MRR, test.message)
+                test_result["result"] = dict()
+                test_result["result"]["duration"] = int(groups.group(1))
+                test_result["result"]["tx"] = int(groups.group(2))
+                test_result["result"]["rx"] = int(groups.group(3))
+                test_result["result"]["throughput"] = int(
+                    test_result["result"]["rx"] /
+                    test_result["result"]["duration"])
+        else:
+            test_result["status"] = test.status
+
+        self._test_ID = test.longname.lower()
+        self._data["tests"][self._test_ID] = test_result
+
+    def end_test(self, test):
+        """Called when test ends.
+
+        :param test: Test to process.
+        :type test: Test
+        :returns: Nothing.
+        """
+        pass
+
+    def visit_keyword(self, keyword):
+        """Implements traversing through the keyword and its child keywords.
+
+        :param keyword: Keyword to process.
+        :type keyword: Keyword
+        :returns: Nothing.
+        """
+        if self.start_keyword(keyword) is not False:
+            self.end_keyword(keyword)
+
+    def start_keyword(self, keyword):
+        """Called when keyword starts. Default implementation does nothing.
+
+        :param keyword: Keyword to process.
+        :type keyword: Keyword
+        :returns: Nothing.
+        """
+        try:
+            if keyword.type == "setup":
+                self.visit_setup_kw(keyword)
+            elif keyword.type == "teardown":
+                self._lookup_kw_nr = 0
+                self.visit_teardown_kw(keyword)
+            else:
+                self._lookup_kw_nr = 0
+                self.visit_test_kw(keyword)
+        except AttributeError:
+            pass
+
+    def end_keyword(self, keyword):
+        """Called when keyword ends. Default implementation does nothing.
+
+        :param keyword: Keyword to process.
+        :type keyword: Keyword
+        :returns: Nothing.
+        """
+        pass
+
+    def visit_test_kw(self, test_kw):
+        """Implements traversing through the test keyword and its child
+        keywords.
+
+        :param test_kw: Keyword to process.
+        :type test_kw: Keyword
+        :returns: Nothing.
+        """
+        for keyword in test_kw.keywords:
+            if self.start_test_kw(keyword) is not False:
+                self.visit_test_kw(keyword)
+                self.end_test_kw(keyword)
+
+    def start_test_kw(self, test_kw):
+        """Called when test keyword starts. Default implementation does
+        nothing.
+
+        :param test_kw: Keyword to process.
+        :type test_kw: Keyword
+        :returns: Nothing.
+        """
+        if test_kw.name.count("Show Runtime Counters On All Duts"):
+            self._lookup_kw_nr += 1
+            self._show_run_lookup_nr = 0
+            self._msg_type = "test-show-runtime"
+            test_kw.messages.visit(self)
+
+    def end_test_kw(self, test_kw):
+        """Called when keyword ends. Default implementation does nothing.
+
+        :param test_kw: Keyword to process.
+        :type test_kw: Keyword
+        :returns: Nothing.
+        """
+        pass
+
+    def visit_setup_kw(self, setup_kw):
+        """Implements traversing through the teardown keyword and its child
+        keywords.
+
+        :param setup_kw: Keyword to process.
+        :type setup_kw: Keyword
+        :returns: Nothing.
+        """
+        for keyword in setup_kw.keywords:
+            if self.start_setup_kw(keyword) is not False:
+                self.visit_setup_kw(keyword)
+                self.end_setup_kw(keyword)
+
+    def start_setup_kw(self, setup_kw):
+        """Called when teardown keyword starts. Default implementation does
+        nothing.
+
+        :param setup_kw: Keyword to process.
+        :type setup_kw: Keyword
+        :returns: Nothing.
+        """
+        if setup_kw.name.count("Show Vpp Version On All Duts") \
+                and not self._version:
+            self._msg_type = "setup-version"
+            setup_kw.messages.visit(self)
+
+    def end_setup_kw(self, setup_kw):
+        """Called when keyword ends. Default implementation does nothing.
+
+        :param setup_kw: Keyword to process.
+        :type setup_kw: Keyword
+        :returns: Nothing.
+        """
+        pass
+
+    def visit_teardown_kw(self, teardown_kw):
+        """Implements traversing through the teardown keyword and its child
+        keywords.
+
+        :param teardown_kw: Keyword to process.
+        :type teardown_kw: Keyword
+        :returns: Nothing.
+        """
+        for keyword in teardown_kw.keywords:
+            if self.start_teardown_kw(keyword) is not False:
+                self.visit_teardown_kw(keyword)
+                self.end_teardown_kw(keyword)
+
+    def start_teardown_kw(self, teardown_kw):
+        """Called when teardown keyword starts. Default implementation does
+        nothing.
+
+        :param teardown_kw: Keyword to process.
+        :type teardown_kw: Keyword
+        :returns: Nothing.
+        """
+
+        if teardown_kw.name.count("Show Vat History On All Duts"):
+            self._vat_history_lookup_nr = 0
+            self._msg_type = "teardown-vat-history"
+            teardown_kw.messages.visit(self)
+
+    def end_teardown_kw(self, teardown_kw):
+        """Called when keyword ends. Default implementation does nothing.
+
+        :param teardown_kw: Keyword to process.
+        :type teardown_kw: Keyword
+        :returns: Nothing.
+        """
+        pass
+
+    def visit_message(self, msg):
+        """Implements visiting the message.
+
+        :param msg: Message to process.
+        :type msg: Message
+        :returns: Nothing.
+        """
+        if self.start_message(msg) is not False:
+            self.end_message(msg)
+
+    def start_message(self, msg):
+        """Called when message starts. Get required information from messages:
+        - VPP version.
+
+        :param msg: Message to process.
+        :type msg: Message
+        :returns: Nothing.
+        """
+
+        if self._msg_type:
+            self.parse_msg[self._msg_type](msg)
+
+    def end_message(self, msg):
+        """Called when message ends. Default implementation does nothing.
+
+        :param msg: Message to process.
+        :type msg: Message
+        :returns: Nothing.
+        """
+        pass
+
+
+class InputData(object):
+    """Input data
+
+    The data is extracted from output.xml files generated by Jenkins jobs and
+    stored in pandas' Series.
+
+    The data structure:
+    - job name
+      - build number
+        - metadata
+          - job
+          - build
+          - vpp version
+        - suites
+        - tests
+          - ID: test data (as described in ExecutionChecker documentation)
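+
+    Example (a sketch, assuming a prepared Specification object ``spec`` and
+    an illustrative job name)::
+
+        input_data = InputData(spec)
+        input_data.download_and_parse_data(repeat=2)
+        tests = input_data.tests("my-job", "1")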
+    """
+
+    def __init__(self, spec):
+        """Initialization.
+
+        :param spec: Specification.
+        :type spec: Specification
+        """
+
+        # Specification:
+        self._cfg = spec
+
+        # Data store:
+        self._input_data = pd.Series()
+
+    @property
+    def data(self):
+        """Getter - Input data.
+
+        :returns: Input data
+        :rtype: pandas.Series
+        """
+        return self._input_data
+
+    def metadata(self, job, build):
+        """Getter - metadata
+
+        :param job: Job whose metadata is requested.
+        :param build: Build whose metadata is requested.
+        :type job: str
+        :type build: str
+        :returns: Metadata
+        :rtype: pandas.Series
+        """
+
+        return self.data[job][build]["metadata"]
+
+    def suites(self, job, build):
+        """Getter - suites
+
+        :param job: Job whose suites are requested.
+        :param build: Build whose suites are requested.
+        :type job: str
+        :type build: str
+        :returns: Suites.
+        :rtype: pandas.Series
+        """
+
+        return self.data[job][str(build)]["suites"]
+
+    def tests(self, job, build):
+        """Getter - tests
+
+        :param job: Job whose tests are requested.
+        :param build: Build whose tests are requested.
+        :type job: str
+        :type build: str
+        :returns: Tests.
+        :rtype: pandas.Series
+        """
+
+        return self.data[job][build]["tests"]
+
+    @staticmethod
+    def _parse_tests(job, build, log):
+        """Process data from robot output.xml file and return JSON structured
+        data.
+
+        :param job: Name of the job whose build output data will be processed.
+        :param build: Information about the build whose output data will be
+            processed.
+        :param log: List of log messages.
+        :type job: str
+        :type build: dict
+        :type log: list of tuples (severity, msg)
+        :returns: JSON data structure.
+        :rtype: dict
+        """
+
+        metadata = {
+            "job": job,
+            "build": build
+        }
+
+        with open(build["file-name"], 'r') as data_file:
+            try:
+                result = ExecutionResult(data_file)
+            except errors.DataError as err:
+                log.append(("ERROR", "Error occurred while parsing output.xml: "
+                                     "{0}".format(err)))
+                return None
+        checker = ExecutionChecker(metadata)
+        result.visit(checker)
+
+        return checker.data
+
+    def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
+        """Download and parse the input data file.
+
+        :param pid: PID of the process executing this method.
+        :param data_queue: Shared memory between processes. Queue which keeps
+            the result data. This data is then read by the main process and used
+            in further processing.
+        :param job: Name of the Jenkins job which generated the processed input
+            file.
+        :param build: Information about the Jenkins build which generated the
+            processed input file.
+        :param repeat: Repeat the download specified number of times if not
+            successful.
+        :type pid: int
+        :type data_queue: multiprocessing.Manager().Queue()
+        :type job: str
+        :type build: dict
+        :type repeat: int
+        """
+
+        logs = list()
+
+        logging.info("  Processing the job/build: {0}: {1}".
+                     format(job, build["build"]))
+
+        logs.append(("INFO", "  Processing the job/build: {0}: {1}".
+                     format(job, build["build"])))
+
+        state = "failed"
+        success = False
+        data = None
+        do_repeat = repeat
+        while do_repeat:
+            success = download_and_unzip_data_file(self._cfg, job, build, pid,
+                                                   logs)
+            if success:
+                break
+            do_repeat -= 1
+        if not success:
+            logs.append(("ERROR", "It is not possible to download the input "
+                                  "data file from the job '{job}', build "
+                                  "'{build}', or it is damaged. Skipped.".
+                         format(job=job, build=build["build"])))
+        if success:
+            logs.append(("INFO", "  Processing data from the build '{0}' ...".
+                         format(build["build"])))
+            data = InputData._parse_tests(job, build, logs)
+            if data is None:
+                logs.append(("ERROR", "Input data file from the job '{job}', "
+                                      "build '{build}' is damaged. Skipped.".
+                             format(job=job, build=build["build"])))
+            else:
+                state = "processed"
+
+            try:
+                remove(build["file-name"])
+            except OSError as err:
+                logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
+                             format(build["file-name"], err)))
+        logs.append(("INFO", "  Done."))
+
+        result = {
+            "data": data,
+            "state": state,
+            "job": job,
+            "build": build,
+            "logs": logs
+        }
+        data_queue.put(result)
+
+    def download_and_parse_data(self, repeat=1):
+        """Download the input data files, parse input data from input files and
+        store in pandas' Series.
+
+        :param repeat: Repeat the download specified number of times if not
+            successful.
+        :type repeat: int
+        """
+
+        logging.info("Downloading and parsing input files ...")
+
+        work_queue = multiprocessing.JoinableQueue()
+        manager = multiprocessing.Manager()
+        data_queue = manager.Queue()
+        cpus = multiprocessing.cpu_count()
+
+        workers = list()
+        for cpu in range(cpus):
+            worker = Worker(work_queue,
+                            data_queue,
+                            self._download_and_parse_build)
+            worker.daemon = True
+            worker.start()
+            workers.append(worker)
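+            # Pin the worker process to one CPU core (Linux-specific; the
+            # output of taskset is discarded, so a failure to pin has no
+            # effect).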
+            os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
+                      format(cpu, worker.pid))
+
+        for job, builds in self._cfg.builds.items():
+            for build in builds:
+                work_queue.put((job, build, repeat))
+
+        work_queue.join()
+
+        logging.info("Done.")
+
+        while not data_queue.empty():
+            result = data_queue.get()
+
+            job = result["job"]
+            build_nr = result["build"]["build"]
+
+            if result["data"]:
+                data = result["data"]
+                build_data = pd.Series({
+                    "metadata": pd.Series(data["metadata"].values(),
+                                          index=data["metadata"].keys()),
+                    "suites": pd.Series(data["suites"].values(),
+                                        index=data["suites"].keys()),
+                    "tests": pd.Series(data["tests"].values(),
+                                       index=data["tests"].keys())})
+
+                if self._input_data.get(job, None) is None:
+                    self._input_data[job] = pd.Series()
+                self._input_data[job][str(build_nr)] = build_data
+
+                self._cfg.set_input_file_name(job, build_nr,
+                                              result["build"]["file-name"])
+
+            self._cfg.set_input_state(job, build_nr, result["state"])
+
+            for item in result["logs"]:
+                if item[0] == "INFO":
+                    logging.info(item[1])
+                elif item[0] == "ERROR":
+                    logging.error(item[1])
+                elif item[0] == "DEBUG":
+                    logging.debug(item[1])
+                elif item[0] == "CRITICAL":
+                    logging.critical(item[1])
+                elif item[0] == "WARNING":
+                    logging.warning(item[1])
+
+        del data_queue
+
+        # Terminate all workers
+        for worker in workers:
+            worker.terminate()
+            worker.join()
+
+        logging.info("Done.")
+
+    @staticmethod
+    def _end_of_tag(tag_filter, start=0, closer="'"):
+        """Return the index of character in the string which is the end of tag.
+
+        :param tag_filter: The string where the end of tag is being searched.
+        :param start: The index where the searching is started.
+        :param closer: The character which is the tag closer.
+        :type tag_filter: str
+        :type start: int
+        :type closer: str
+        :returns: The index of the tag closer.
+        :rtype: int
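+
+        Example::
+
+            >>> InputData._end_of_tag("'NDRDISC' and 'IP4FWD'")
+            8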
+        """
+
+        try:
+            idx_opener = tag_filter.index(closer, start)
+            return tag_filter.index(closer, idx_opener + 1)
+        except ValueError:
+            return None
+
+    @staticmethod
+    def _condition(tag_filter):
+        """Create a conditional statement from the given tag filter.
+
+        :param tag_filter: Filter based on tags from the element specification.
+        :type tag_filter: str
+        :returns: Conditional statement which can be evaluated.
+        :rtype: str
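+
+        Example::
+
+            >>> InputData._condition("'NDRDISC' and not 'NIC'")
+            "'NDRDISC' in tags and not 'NIC' in tags"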
+        """
+
+        index = 0
+        while True:
+            index = InputData._end_of_tag(tag_filter, index)
+            if index is None:
+                return tag_filter
+            index += 1
+            tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
+
+    def filter_data(self, element, params=None, data_set="tests",
+                    continue_on_error=False):
+        """Filter required data from the given jobs and builds.
+
+        The output data structure is:
+
+        - job 1
+          - build 1
+            - test (suite) 1 ID:
+              - param 1
+              - param 2
+              ...
+              - param n
+            ...
+            - test (suite) n ID:
+            ...
+          ...
+          - build n
+        ...
+        - job n
+
+        :param element: Element which will use the filtered data.
+        :param params: Parameters which will be included in the output. If None,
+        all parameters are included.
+        :param data_set: The set of data to be filtered: tests, suites,
+        metadata.
+        :param continue_on_error: Continue if an error occurs while reading
+        the data. The item will then be empty.
+        :type element: pandas.Series
+        :type params: list
+        :type data_set: str
+        :type continue_on_error: bool
+        :returns: Filtered data.
+        :rtype: pandas.Series
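+
+        Example of an element specification fragment driving the filter (a
+        sketch; the job name and tag names are illustrative)::
+
+            element = {
+                "data": {"my-job": [1, 2]},
+                "filter": "'NDRDISC' and '1T1C'",
+                "parameters": ["throughput", "tags"]
+            }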
+        """
+
+        try:
+            if element["filter"] in ("all", "template"):
+                cond = "True"
+            else:
+                cond = InputData._condition(element["filter"])
+            logging.debug("   Filter: {0}".format(cond))
+        except KeyError:
+            logging.error("  No filter defined.")
+            return None
+
+        if params is None:
+            params = element.get("parameters", None)
+
+        data = pd.Series()
+        try:
+            for job, builds in element["data"].items():
+                data[job] = pd.Series()
+                for build in builds:
+                    data[job][str(build)] = pd.Series()
+                    try:
+                        data_iter = self.data[job][str(build)][data_set].\
+                            iteritems()
+                    except KeyError:
+                        if continue_on_error:
+                            continue
+                        else:
+                            return None
+                    for test_ID, test_data in data_iter:
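+                        # "cond" references only the name "tags", e.g.
+                        # "'NDRDISC' in tags and '1T1C' in tags".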
+                        if eval(cond, {"tags": test_data.get("tags", "")}):
+                            data[job][str(build)][test_ID] = pd.Series()
+                            if params is None:
+                                for param, val in test_data.items():
+                                    data[job][str(build)][test_ID][param] = val
+                            else:
+                                for param in params:
+                                    try:
+                                        data[job][str(build)][test_ID][param] =\
+                                            test_data[param]
+                                    except KeyError:
+                                        data[job][str(build)][test_ID][param] =\
+                                            "No Data"
+            return data
+
+        except (KeyError, IndexError, ValueError) as err:
+            logging.error("   Missing mandatory parameter in the element "
+                          "specification: {0}".format(err))
+            return None
+        except AttributeError:
+            return None
+        except SyntaxError:
+            logging.error("   The filter '{0}' is not correct. Check if all "
+                          "tags are enclosed by apostrophes.".format(cond))
+            return None
+
+    @staticmethod
+    def merge_data(data):
+        """Merge data from more jobs and builds to a simple data structure.
+
+        The output data structure is:
+
+        - test (suite) 1 ID:
+          - param 1
+          - param 2
+          ...
+          - param n
+        ...
+        - test (suite) n ID:
+        ...
+
+        :param data: Data to merge.
+        :type data: pandas.Series
+        :returns: Merged data.
+        :rtype: pandas.Series
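+
+        .. note:: If the same test (suite) ID appears in more than one build,
+            the last one processed overwrites the previous ones.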
+        """
+
+        logging.info("    Merging data ...")
+
+        merged_data = pd.Series()
+        for _, builds in data.iteritems():
+            for _, item in builds.iteritems():
+                for ID, item_data in item.iteritems():
+                    merged_data[ID] = item_data
+
+        return merged_data