CSIT-942: RCA - Option 1: Analysing Archived VPP Results
[csit.git] / resources / tools / presentation / utils.py
index 2fbf70c..0a9d985 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2017 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -14,6 +14,7 @@
 """General purpose utilities.
 """
 
+import multiprocessing
 import subprocess
 import numpy as np
 import pandas as pd
@@ -21,7 +22,7 @@ import logging
 
 from os import walk, makedirs, environ
 from os.path import join, isdir
-from shutil import copy, Error
+from shutil import move, Error
 from math import sqrt
 
 from errors import PresentationError
@@ -81,15 +82,16 @@ def remove_outliers(input_list, outlier_const=1.5, window=14):
     :rtype: list of floats
     """
 
-    input_series = pd.Series()
-    for index, value in enumerate(input_list):
-        item_pd = pd.Series([value, ], index=[index, ])
-        input_series.append(item_pd)
-    output_series, _ = split_outliers(input_series, outlier_const=outlier_const,
-                                      window=window)
-    output_list = [y for x, y in output_series.items() if not np.isnan(y)]
-
-    return output_list
+    data = np.array(input_list)
+    upper_quartile = np.percentile(data, 75)
+    lower_quartile = np.percentile(data, 25)
+    iqr = (upper_quartile - lower_quartile) * outlier_const
+    quartile_set = (lower_quartile - iqr, upper_quartile + iqr)
+    result_lst = list()
+    for y in input_list:
+        if quartile_set[0] <= y <= quartile_set[1]:
+            result_lst.append(y)
+    return result_lst
 
 
 def split_outliers(input_series, outlier_const=1.5, window=14):
@@ -122,9 +124,9 @@ def split_outliers(input_series, outlier_const=1.5, window=14):
         q1 = np.percentile(y_rolling_array, 25)
         q3 = np.percentile(y_rolling_array, 75)
         iqr = (q3 - q1) * outlier_const
-        low, high = q1 - iqr, q3 + iqr
+        low = q1 - iqr
         item_pd = pd.Series([item_y, ], index=[item_x, ])
-        if low <= item_y <= high:
+        if low <= item_y:
             trimmed_data = trimmed_data.append(item_pd)
         else:
             outliers = outliers.append(item_pd)
@@ -139,7 +141,7 @@ def get_files(path, extension=None, full_path=True):
 
     :param path: Path to files.
     :param extension: Extension of files to process. If it is the empty string,
-    all files will be processed.
+        all files will be processed.
     :param full_path: If True, the files with full path are generated.
     :type path: str
     :type extension: str
@@ -197,8 +199,10 @@ def execute_command(cmd):
 
     stdout, stderr = proc.communicate()
 
-    logging.info(stdout)
-    logging.info(stderr)
+    if stdout:
+        logging.info(stdout)
+    if stderr:
+        logging.info(stderr)
 
     if proc.returncode != 0:
         logging.error("    Command execution failed.")
@@ -249,10 +253,7 @@ def archive_input_data(spec):
 
     logging.info("    Archiving the input data files ...")
 
-    if spec.is_debug:
-        extension = spec.debug["input-format"]
-    else:
-        extension = spec.input["file-format"]
+    extension = spec.input["file-format"]
     data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
                            extension=extension)
     dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
@@ -263,11 +264,93 @@ def archive_input_data(spec):
             makedirs(dst)
 
         for data_file in data_files:
-            logging.info("      Copying the file: {0} ...".format(data_file))
-            copy(data_file, dst)
+            logging.info("      Moving the file: {0} ...".format(data_file))
+            move(data_file, dst)
 
     except (Error, OSError) as err:
         raise PresentationError("Not possible to archive the input data.",
                                 str(err))
 
     logging.info("    Done.")
+
+
+def classify_anomalies(data, window):
+    """Evaluates if the sample value is an outlier, regression, normal or
+    progression compared to the previous data within the window.
+    We use the intervals defined as:
+    - regress: less than trimmed moving median - 3 * stdev
+    - normal: between trimmed moving median - 3 * stdev and median + 3 * stdev
+    - progress: more than trimmed moving median + 3 * stdev
+    where stdev is trimmed moving standard deviation.
+
+    :param data: Full data set with the outliers replaced by nan.
+    :param window: Window size used to calculate moving average and moving
+        stdev.
+    :type data: pandas.Series
+    :type window: int
+    :returns: Evaluated results.
+    :rtype: list
+    """
+
+    if data.size < 3:
+        return None
+
+    win_size = data.size if data.size < window else window
+    tmm = data.rolling(window=win_size, min_periods=2).median()
+    tmstd = data.rolling(window=win_size, min_periods=2).std()
+
+    classification = ["normal", ]
+    first = True
+    for build, value in data.iteritems():
+        if first:
+            first = False
+            continue
+        if np.isnan(value) or np.isnan(tmm[build]) or np.isnan(tmstd[build]):
+            classification.append("outlier")
+        elif value < (tmm[build] - 3 * tmstd[build]):
+            classification.append("regression")
+        elif value > (tmm[build] + 3 * tmstd[build]):
+            classification.append("progression")
+        else:
+            classification.append("normal")
+    return classification
+
+
+class Worker(multiprocessing.Process):
+    """Worker class used to process tasks in separate parallel processes.
+    """
+
+    def __init__(self, work_queue, data_queue, func):
+        """Initialization.
+
+        :param work_queue: Queue with items to process.
+        :param data_queue: Shared memory between processes. Queue which keeps
+            the result data. This data is then read by the main process and used
+            in further processing.
+        :param func: Function which is executed by the worker.
+        :type work_queue: multiprocessing.JoinableQueue
+        :type data_queue: multiprocessing.Manager().Queue()
+        :type func: Callable object
+        """
+        super(Worker, self).__init__()
+        self._work_queue = work_queue
+        self._data_queue = data_queue
+        self._func = func
+
+    def run(self):
+        """Method representing the process's activity.
+        """
+
+        while True:
+            try:
+                self.process(self._work_queue.get())
+            finally:
+                self._work_queue.task_done()
+
+    def process(self, item_to_process):
+        """Method executed by the runner.
+
+        :param item_to_process: Data to be processed by the function.
+        :type item_to_process: tuple
+        """
+        self._func(self.pid, self._data_queue, *item_to_process)