-# Copyright (c) 2017 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""General purpose utilities.
"""
+import multiprocessing
import subprocess
import numpy as np
import pandas as pd
import logging
+import csv
+import prettytable
from os import walk, makedirs, environ
from os.path import join, isdir
-from shutil import copy, Error
+from shutil import move, Error
from math import sqrt
from errors import PresentationError
return float(((nr2 - nr1) / nr1) * 100)
+
def remove_outliers(input_list, outlier_const=1.5, window=14):
"""Return list with outliers removed, using split_outliers.
:rtype: list of floats
"""
- input_series = pd.Series()
- for index, value in enumerate(input_list):
- item_pd = pd.Series([value, ], index=[index, ])
- input_series.append(item_pd)
- output_series, _ = split_outliers(input_series, outlier_const=outlier_const,
- window=window)
- output_list = [y for x, y in output_series.items() if not np.isnan(y)]
-
- return output_list
+ data = np.array(input_list)
+ upper_quartile = np.percentile(data, 75)
+ lower_quartile = np.percentile(data, 25)
+ iqr = (upper_quartile - lower_quartile) * outlier_const
+ quartile_set = (lower_quartile - iqr, upper_quartile + iqr)
+ result_lst = list()
+ for y in input_list:
+ if quartile_set[0] <= y <= quartile_set[1]:
+ result_lst.append(y)
+ return result_lst
def split_outliers(input_series, outlier_const=1.5, window=14):
q1 = np.percentile(y_rolling_array, 25)
q3 = np.percentile(y_rolling_array, 75)
iqr = (q3 - q1) * outlier_const
- low, high = q1 - iqr, q3 + iqr
+ low = q1 - iqr
item_pd = pd.Series([item_y, ], index=[item_x, ])
- if low <= item_y <= high:
+ if low <= item_y:
trimmed_data = trimmed_data.append(item_pd)
else:
outliers = outliers.append(item_pd)
:param path: Path to files.
:param extension: Extension of files to process. If it is the empty string,
- all files will be processed.
+ all files will be processed.
:param full_path: If True, the files with full path are generated.
:type path: str
:type extension: str
stdout, stderr = proc.communicate()
- logging.info(stdout)
- logging.info(stderr)
+ if stdout:
+ logging.info(stdout)
+ if stderr:
+ logging.info(stderr)
if proc.returncode != 0:
logging.error(" Command execution failed.")
logging.info(" Archiving the input data files ...")
- if spec.is_debug:
- extension = spec.debug["input-format"]
- else:
- extension = spec.input["file-format"]
+ extension = spec.input["file-format"]
data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
extension=extension)
dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
makedirs(dst)
for data_file in data_files:
- logging.info(" Copying the file: {0} ...".format(data_file))
- copy(data_file, dst)
+ logging.info(" Moving the file: {0} ...".format(data_file))
+ move(data_file, dst)
except (Error, OSError) as err:
raise PresentationError("Not possible to archive the input data.",
str(err))
logging.info(" Done.")
+
+
+def classify_anomalies(data, window):
+ """Evaluates if the sample value is an outlier, regression, normal or
+ progression compared to the previous data within the window.
+ We use the intervals defined as:
+ - regress: less than trimmed moving median - 3 * stdev
+ - normal: between trimmed moving median - 3 * stdev and median + 3 * stdev
+ - progress: more than trimmed moving median + 3 * stdev
+ where stdev is trimmed moving standard deviation.
+
+ :param data: Full data set with the outliers replaced by nan.
+ :param window: Window size used to calculate moving average and moving
+ stdev.
+ :type data: pandas.Series
+ :type window: int
+ :returns: Evaluated results.
+ :rtype: list
+ """
+
+ if data.size < 3:
+ return None
+
+ win_size = data.size if data.size < window else window
+ tmm = data.rolling(window=win_size, min_periods=2).median()
+ tmstd = data.rolling(window=win_size, min_periods=2).std()
+
+ classification = ["normal", ]
+ first = True
+ for build, value in data.iteritems():
+ if first:
+ first = False
+ continue
+ if np.isnan(value) or np.isnan(tmm[build]) or np.isnan(tmstd[build]):
+ classification.append("outlier")
+ elif value < (tmm[build] - 3 * tmstd[build]):
+ classification.append("regression")
+ elif value > (tmm[build] + 3 * tmstd[build]):
+ classification.append("progression")
+ else:
+ classification.append("normal")
+ return classification
+
+
+def convert_csv_to_pretty_txt(csv_file, txt_file):
+ """Convert the given csv table to pretty text table.
+
+ :param csv_file: The path to the input csv file.
+ :param txt_file: The path to the output pretty text file.
+ :type csv_file: str
+ :type txt_file: str
+ """
+
+ txt_table = None
+ with open(csv_file, 'rb') as csv_file:
+ csv_content = csv.reader(csv_file, delimiter=',', quotechar='"')
+ for row in csv_content:
+ if txt_table is None:
+ txt_table = prettytable.PrettyTable(row)
+ else:
+ txt_table.add_row(row)
+ txt_table.align["Test case"] = "l"
+ if txt_table:
+ with open(txt_file, "w") as txt_file:
+ txt_file.write(str(txt_table))
+
+
+class Worker(multiprocessing.Process):
+ """Worker class used to process tasks in separate parallel processes.
+ """
+
+ def __init__(self, work_queue, data_queue, func):
+ """Initialization.
+
+ :param work_queue: Queue with items to process.
+ :param data_queue: Shared memory between processes. Queue which keeps
+ the result data. This data is then read by the main process and used
+ in further processing.
+ :param func: Function which is executed by the worker.
+ :type work_queue: multiprocessing.JoinableQueue
+ :type data_queue: multiprocessing.Manager().Queue()
+ :type func: Callable object
+ """
+ super(Worker, self).__init__()
+ self._work_queue = work_queue
+ self._data_queue = data_queue
+ self._func = func
+
+ def run(self):
+ """Method representing the process's activity.
+ """
+
+ while True:
+ try:
+ self.process(self._work_queue.get())
+ finally:
+ self._work_queue.task_done()
+
+ def process(self, item_to_process):
+ """Method executed by the runner.
+
+ :param item_to_process: Data to be processed by the function.
+ :type item_to_process: tuple
+ """
+ self._func(self.pid, self._data_queue, *item_to_process)