X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftools%2Fpresentation%2Futils.py;h=ba329321873d9753c3709d536143283e1919b62b;hp=a15742a21f55f3798182e0609271779ad82d0963;hb=2d001ed910d3835848fccb7bb96a98a5270698fe;hpb=52f64f232293130904d54a62609eaffc1b145608 diff --git a/resources/tools/presentation/utils.py b/resources/tools/presentation/utils.py index a15742a21f..ba32932187 100644 --- a/resources/tools/presentation/utils.py +++ b/resources/tools/presentation/utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017 Cisco and/or its affiliates. +# Copyright (c) 2018 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -14,14 +14,17 @@ """General purpose utilities. """ +import multiprocessing import subprocess import numpy as np import pandas as pd import logging +import csv +import prettytable from os import walk, makedirs, environ from os.path import join, isdir -from shutil import copy, Error +from shutil import move, Error from math import sqrt from errors import PresentationError @@ -87,21 +90,11 @@ def remove_outliers(input_list, outlier_const=1.5, window=14): iqr = (upper_quartile - lower_quartile) * outlier_const quartile_set = (lower_quartile - iqr, upper_quartile + iqr) result_lst = list() - for y in data.tolist(): + for y in input_list: if quartile_set[0] <= y <= quartile_set[1]: result_lst.append(y) return result_lst - # input_series = pd.Series() - # for index, value in enumerate(input_list): - # item_pd = pd.Series([value, ], index=[index, ]) - # input_series.append(item_pd) - # output_series, _ = split_outliers(input_series, outlier_const=outlier_const, - # window=window) - # output_list = [y for x, y in output_series.items() if not np.isnan(y)] - # - # return output_list - def split_outliers(input_series, outlier_const=1.5, window=14): """Go through the input data and generate two pandas series: @@ -133,9 +126,9 @@ def split_outliers(input_series, outlier_const=1.5, window=14): q1 = np.percentile(y_rolling_array, 25) q3 = np.percentile(y_rolling_array, 75) iqr = (q3 - q1) * outlier_const - low, high = q1 - iqr, q3 + iqr + low = q1 - iqr item_pd = pd.Series([item_y, ], index=[item_x, ]) - if low <= item_y <= high: + if low <= item_y: trimmed_data = trimmed_data.append(item_pd) else: outliers = outliers.append(item_pd) @@ -150,7 +143,7 @@ def get_files(path, extension=None, full_path=True): :param path: Path to files. :param extension: Extension of files to process. If it is the empty string, - all files will be processed. + all files will be processed. :param full_path: If True, the files with full path are generated. :type path: str :type extension: str @@ -208,8 +201,10 @@ def execute_command(cmd): stdout, stderr = proc.communicate() - logging.info(stdout) - logging.info(stderr) + if stdout: + logging.info(stdout) + if stderr: + logging.info(stderr) if proc.returncode != 0: logging.error(" Command execution failed.") @@ -260,10 +255,7 @@ def archive_input_data(spec): logging.info(" Archiving the input data files ...") - if spec.is_debug: - extension = spec.debug["input-format"] - else: - extension = spec.input["file-format"] + extension = spec.input["file-format"] data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"], extension=extension) dst = spec.environment["paths"]["DIR[STATIC,ARCH]"] @@ -274,11 +266,116 @@ def archive_input_data(spec): makedirs(dst) for data_file in data_files: - logging.info(" Copying the file: {0} ...".format(data_file)) - copy(data_file, dst) + logging.info(" Moving the file: {0} ...".format(data_file)) + move(data_file, dst) except (Error, OSError) as err: raise PresentationError("Not possible to archive the input data.", str(err)) logging.info(" Done.") + + +def classify_anomalies(data, window): + """Evaluates if the sample value is an outlier, regression, normal or + progression compared to the previous data within the window. + We use the intervals defined as: + - regress: less than trimmed moving median - 3 * stdev + - normal: between trimmed moving median - 3 * stdev and median + 3 * stdev + - progress: more than trimmed moving median + 3 * stdev + where stdev is trimmed moving standard deviation. + + :param data: Full data set with the outliers replaced by nan. + :param window: Window size used to calculate moving average and moving + stdev. + :type data: pandas.Series + :type window: int + :returns: Evaluated results. + :rtype: list + """ + + if data.size < 3: + return None + + win_size = data.size if data.size < window else window + tmm = data.rolling(window=win_size, min_periods=2).median() + tmstd = data.rolling(window=win_size, min_periods=2).std() + + classification = ["normal", ] + first = True + for build, value in data.iteritems(): + if first: + first = False + continue + if np.isnan(value) or np.isnan(tmm[build]) or np.isnan(tmstd[build]): + classification.append("outlier") + elif value < (tmm[build] - 3 * tmstd[build]): + classification.append("regression") + elif value > (tmm[build] + 3 * tmstd[build]): + classification.append("progression") + else: + classification.append("normal") + return classification + + +def convert_csv_to_pretty_txt(csv_file, txt_file): + """Convert the given csv table to pretty text table. + + :param csv_file: The path to the input csv file. + :param txt_file: The path to the output pretty text file. + :type csv_file: str + :type txt_file: str + """ + + txt_table = None + with open(csv_file, 'rb') as csv_file: + csv_content = csv.reader(csv_file, delimiter=',', quotechar='"') + for row in csv_content: + if txt_table is None: + txt_table = prettytable.PrettyTable(row) + else: + txt_table.add_row(row) + txt_table.align["Test case"] = "l" + if txt_table: + with open(txt_file, "w") as txt_file: + txt_file.write(str(txt_table)) + + +class Worker(multiprocessing.Process): + """Worker class used to process tasks in separate parallel processes. + """ + + def __init__(self, work_queue, data_queue, func): + """Initialization. + + :param work_queue: Queue with items to process. + :param data_queue: Shared memory between processes. Queue which keeps + the result data. This data is then read by the main process and used + in further processing. + :param func: Function which is executed by the worker. + :type work_queue: multiprocessing.JoinableQueue + :type data_queue: multiprocessing.Manager().Queue() + :type func: Callable object + """ + super(Worker, self).__init__() + self._work_queue = work_queue + self._data_queue = data_queue + self._func = func + + def run(self): + """Method representing the process's activity. + """ + + while True: + try: + self.process(self._work_queue.get()) + finally: + self._work_queue.task_done() + + def process(self, item_to_process): + """Method executed by the runner. + + :param item_to_process: Data to be processed by the function. + :type item_to_process: tuple + """ + self._func(self.pid, self._data_queue, *item_to_process)