CSIT-1288: Prepare data to be sent by Jenkins
[csit.git] / resources / tools / presentation / utils.py
index 8365bfa..51bb1d0 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2017 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
 """General purpose utilities.
 """
 
+import multiprocessing
 import subprocess
 import numpy as np
-import pandas as pd
 import logging
+import csv
+import prettytable
 
 from os import walk, makedirs, environ
 from os.path import join, isdir
-from shutil import copy, Error
+from shutil import move, Error
 from math import sqrt
 
 from errors import PresentationError
+from jumpavg.BitCountingClassifier import BitCountingClassifier
 
 
 def mean(items):
@@ -68,68 +71,12 @@ def relative_change(nr1, nr2):
     return float(((nr2 - nr1) / nr1) * 100)
 
 
-def remove_outliers(input_data, outlier_const):
-    """
-
-    :param input_data: Data from which the outliers will be removed.
-    :param outlier_const: Outlier constant.
-    :type input_data: list
-    :type outlier_const: float
-    :returns: The input list without outliers.
-    :rtype: list
-    """
-
-    data = np.array(input_data)
-    upper_quartile = np.percentile(data, 75)
-    lower_quartile = np.percentile(data, 25)
-    iqr = (upper_quartile - lower_quartile) * outlier_const
-    quartile_set = (lower_quartile - iqr, upper_quartile + iqr)
-    result_lst = list()
-    for y in data.tolist():
-        if quartile_set[0] <= y <= quartile_set[1]:
-            result_lst.append(y)
-    return result_lst
-
-
-def find_outliers(input_data, outlier_const=1.5):
-    """Go through the input data and generate two pandas series:
-    - input data without outliers
-    - outliers.
-    The function uses IQR to detect outliers.
-
-    :param input_data: Data to be examined for outliers.
-    :param outlier_const: Outlier constant.
-    :type input_data: pandas.Series
-    :type outlier_const: float
-    :returns: Tuple: input data with outliers removed; Outliers.
-    :rtype: tuple (trimmed_data, outliers)
-    """
-
-    upper_quartile = input_data.quantile(q=0.75)
-    lower_quartile = input_data.quantile(q=0.25)
-    iqr = (upper_quartile - lower_quartile) * outlier_const
-    low = lower_quartile - iqr
-    high = upper_quartile + iqr
-    trimmed_data = pd.Series()
-    outliers = pd.Series()
-    for item in input_data.items():
-        item_pd = pd.Series([item[1], ], index=[item[0], ])
-        if low <= item[1] <= high:
-            trimmed_data = trimmed_data.append(item_pd)
-        else:
-            trimmed_data = trimmed_data.append(pd.Series([np.nan, ],
-                                                         index=[item[0], ]))
-            outliers = outliers.append(item_pd)
-
-    return trimmed_data, outliers
-
-
 def get_files(path, extension=None, full_path=True):
     """Generates the list of files to process.
 
     :param path: Path to files.
     :param extension: Extension of files to process. If it is the empty string,
-    all files will be processed.
+        all files will be processed.
     :param full_path: If True, the files with full path are generated.
     :type path: str
     :type extension: str
@@ -173,8 +120,8 @@ def execute_command(cmd):
 
     :param cmd: Command to execute.
     :type cmd: str
-    :returns: Return code of the executed command.
-    :rtype: int
+    :returns: Return code of the executed command, stdout and stderr.
+    :rtype: tuple(int, str, str)
     """
 
     env = environ.copy()
@@ -187,8 +134,10 @@ def execute_command(cmd):
 
     stdout, stderr = proc.communicate()
 
-    logging.info(stdout)
-    logging.info(stderr)
+    if stdout:
+        logging.info(stdout)
+    if stderr:
+        logging.info(stderr)
 
     if proc.returncode != 0:
         logging.error("    Command execution failed.")
@@ -239,10 +188,7 @@ def archive_input_data(spec):
 
     logging.info("    Archiving the input data files ...")
 
-    if spec.is_debug:
-        extension = spec.debug["input-format"]
-    else:
-        extension = spec.input["file-format"]
+    extension = spec.input["file-format"]
     data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
                            extension=extension)
     dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
@@ -253,11 +199,119 @@ def archive_input_data(spec):
             makedirs(dst)
 
         for data_file in data_files:
-            logging.info("      Copying the file: {0} ...".format(data_file))
-            copy(data_file, dst)
+            logging.info("      Moving the file: {0} ...".format(data_file))
+            move(data_file, dst)
 
     except (Error, OSError) as err:
         raise PresentationError("Not possible to archive the input data.",
                                 str(err))
 
     logging.info("    Done.")
+
+
+def classify_anomalies(data):
+    """Process the data and return anomalies and trending values.
+
+    Gather data into groups with average as trend value.
+    Decorate values within groups to be normal,
+    the first value of changed average as a regression, or a progression.
+
+    :param data: Full data set with unavailable samples replaced by nan.
+    :type data: OrderedDict
+    :returns: Classification and trend values
+    :rtype: 2-tuple, list of strings and list of floats
+    """
+    # Nan mean something went wrong.
+    # Use 0.0 to cause that being reported as a severe regression.
+    bare_data = [0.0 if np.isnan(sample.avg) else sample
+                 for _, sample in data.iteritems()]
+    # TODO: Put analogous iterator into jumpavg library.
+    groups = BitCountingClassifier().classify(bare_data)
+    groups.reverse()  # Just to use .pop() for FIFO.
+    classification = []
+    avgs = []
+    active_group = None
+    values_left = 0
+    avg = 0.0
+    for _, sample in data.iteritems():
+        if np.isnan(sample.avg):
+            classification.append("outlier")
+            avgs.append(sample.avg)
+            continue
+        if values_left < 1 or active_group is None:
+            values_left = 0
+            while values_left < 1:  # Ignore empty groups (should not happen).
+                active_group = groups.pop()
+                values_left = len(active_group.values)
+            avg = active_group.metadata.avg
+            classification.append(active_group.metadata.classification)
+            avgs.append(avg)
+            values_left -= 1
+            continue
+        classification.append("normal")
+        avgs.append(avg)
+        values_left -= 1
+    return classification, avgs
+
+
+def convert_csv_to_pretty_txt(csv_file, txt_file):
+    """Convert the given csv table to pretty text table.
+
+    :param csv_file: The path to the input csv file.
+    :param txt_file: The path to the output pretty text file.
+    :type csv_file: str
+    :type txt_file: str
+    """
+
+    txt_table = None
+    with open(csv_file, 'rb') as csv_file:
+        csv_content = csv.reader(csv_file, delimiter=',', quotechar='"')
+        for row in csv_content:
+            if txt_table is None:
+                txt_table = prettytable.PrettyTable(row)
+            else:
+                txt_table.add_row(row)
+        txt_table.align["Test case"] = "l"
+    if txt_table:
+        with open(txt_file, "w") as txt_file:
+            txt_file.write(str(txt_table))
+
+
+class Worker(multiprocessing.Process):
+    """Worker class used to process tasks in separate parallel processes.
+    """
+
+    def __init__(self, work_queue, data_queue, func):
+        """Initialization.
+
+        :param work_queue: Queue with items to process.
+        :param data_queue: Shared memory between processes. Queue which keeps
+            the result data. This data is then read by the main process and used
+            in further processing.
+        :param func: Function which is executed by the worker.
+        :type work_queue: multiprocessing.JoinableQueue
+        :type data_queue: multiprocessing.Manager().Queue()
+        :type func: Callable object
+        """
+        super(Worker, self).__init__()
+        self._work_queue = work_queue
+        self._data_queue = data_queue
+        self._func = func
+
+    def run(self):
+        """Method representing the process's activity.
+        """
+
+        while True:
+            try:
+                self.process(self._work_queue.get())
+            finally:
+                self._work_queue.task_done()
+
+    def process(self, item_to_process):
+        """Method executed by the runner.
+
+        :param item_to_process: Data to be processed by the function.
+        :type item_to_process: tuple
+        """
+        self._func(self.pid, self._data_queue, *item_to_process)