X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftools%2Fpresentation%2Futils.py;h=51bb1d0305ecf6111d10feea95a86de3312bd6db;hp=95faffdd665ecc6483c1e0b8c4aabeb4df8bab1e;hb=5ad9b364cbd45a0b25d73412b9777ac14df92b0a;hpb=2093328406eb64ae825a06967632e901ae0ba6db

diff --git a/resources/tools/presentation/utils.py b/resources/tools/presentation/utils.py
index 95faffdd66..51bb1d0305 100644
--- a/resources/tools/presentation/utils.py
+++ b/resources/tools/presentation/utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2017 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -14,10 +14,12 @@
 """General purpose utilities.
 """
 
+import multiprocessing
 import subprocess
 import numpy as np
-import pandas as pd
 import logging
+import csv
+import prettytable
 
 from os import walk, makedirs, environ
 from os.path import join, isdir
@@ -25,6 +27,7 @@ from shutil import move, Error
 from math import sqrt
 
 from errors import PresentationError
+from jumpavg.BitCountingClassifier import BitCountingClassifier
 
 
 def mean(items):
@@ -68,73 +71,6 @@ def relative_change(nr1, nr2):
     return float(((nr2 - nr1) / nr1) * 100)
 
 
-def remove_outliers(input_list, outlier_const=1.5, window=14):
-    """Return list with outliers removed, using split_outliers.
-
-    :param input_list: Data from which the outliers will be removed.
-    :param outlier_const: Outlier constant.
-    :param window: How many preceding values to take into account.
-    :type input_list: list of floats
-    :type outlier_const: float
-    :type window: int
-    :returns: The input list without outliers.
-    :rtype: list of floats
-    """
-
-    data = np.array(input_list)
-    upper_quartile = np.percentile(data, 75)
-    lower_quartile = np.percentile(data, 25)
-    iqr = (upper_quartile - lower_quartile) * outlier_const
-    quartile_set = (lower_quartile - iqr, upper_quartile + iqr)
-    result_lst = list()
-    for y in input_list:
-        if quartile_set[0] <= y <= quartile_set[1]:
-            result_lst.append(y)
-    return result_lst
-
-
-def split_outliers(input_series, outlier_const=1.5, window=14):
-    """Go through the input data and generate two pandas series:
-    - input data with outliers replaced by NAN
-    - outliers.
-    The function uses IQR to detect outliers.
-
-    :param input_series: Data to be examined for outliers.
-    :param outlier_const: Outlier constant.
-    :param window: How many preceding values to take into account.
-    :type input_series: pandas.Series
-    :type outlier_const: float
-    :type window: int
-    :returns: Input data with NAN outliers and Outliers.
-    :rtype: (pandas.Series, pandas.Series)
-    """
-
-    list_data = list(input_series.items())
-    head_size = min(window, len(list_data))
-    head_list = list_data[:head_size]
-    trimmed_data = pd.Series()
-    outliers = pd.Series()
-    for item_x, item_y in head_list:
-        item_pd = pd.Series([item_y, ], index=[item_x, ])
-        trimmed_data = trimmed_data.append(item_pd)
-    for index, (item_x, item_y) in list(enumerate(list_data))[head_size:]:
-        y_rolling_list = [y for (x, y) in list_data[index - head_size:index]]
-        y_rolling_array = np.array(y_rolling_list)
-        q1 = np.percentile(y_rolling_array, 25)
-        q3 = np.percentile(y_rolling_array, 75)
-        iqr = (q3 - q1) * outlier_const
-        low = q1 - iqr
-        item_pd = pd.Series([item_y, ], index=[item_x, ])
-        if low <= item_y:
-            trimmed_data = trimmed_data.append(item_pd)
-        else:
-            outliers = outliers.append(item_pd)
-            nan_pd = pd.Series([np.nan, ], index=[item_x, ])
-            trimmed_data = trimmed_data.append(nan_pd)
-
-    return trimmed_data, outliers
-
-
 def get_files(path, extension=None, full_path=True):
     """Generates the list of files to process.
 
@@ -184,8 +120,8 @@ def execute_command(cmd):
 
     :param cmd: Command to execute.
    :type cmd: str
-    :returns: Return code of the executed command.
-    :rtype: int
+    :returns: Return code of the executed command, stdout and stderr.
+    :rtype: tuple(int, str, str)
     """
 
     env = environ.copy()
@@ -198,8 +134,10 @@
 
     stdout, stderr = proc.communicate()
 
-    logging.info(stdout)
-    logging.info(stderr)
+    if stdout:
+        logging.info(stdout)
+    if stderr:
+        logging.info(stderr)
 
     if proc.returncode != 0:
         logging.error("  Command execution failed.")
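
Since execute_command now returns stdout and stderr along with the exit
code, callers unpack a 3-tuple instead of a bare int. A minimal sketch of
the new calling convention, assuming the function body ends with
"return proc.returncode, stdout, stderr" as the updated docstring implies;
the command string is illustrative only:

    ret_code, stdout, stderr = execute_command("ls /tmp")
    if ret_code != 0:
        logging.error("Command failed, stderr:\n{0}".format(stderr))
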
@@ -250,10 +188,7 @@ def archive_input_data(spec):
 
     logging.info("    Archiving the input data files ...")
 
-    if spec.is_debug:
-        extension = spec.debug["input-format"]
-    else:
-        extension = spec.input["file-format"]
+    extension = spec.input["file-format"]
     data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
                            extension=extension)
     dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
@@ -272,3 +207,111 @@
                           str(err))
 
     logging.info("  Done.")
+
+
+def classify_anomalies(data):
+    """Process the data and return anomalies and trending values.
+
+    Gather data into groups with the group average as the trend value.
+    Mark values within a group as normal; mark the first value of a
+    group whose average changed as a regression or a progression.
+
+    :param data: Full data set with unavailable samples replaced by NaN.
+    :type data: OrderedDict
+    :returns: Classification and trend values.
+    :rtype: 2-tuple, list of strings and list of floats
+    """
+    # NaN means something went wrong.
+    # Use 0.0 so that such a sample is reported as a severe regression.
+    bare_data = [0.0 if np.isnan(sample.avg) else sample
+                 for _, sample in data.iteritems()]
+    # TODO: Put an analogous iterator into the jumpavg library.
+    groups = BitCountingClassifier().classify(bare_data)
+    groups.reverse()  # Just to use .pop() for FIFO.
+    classification = []
+    avgs = []
+    active_group = None
+    values_left = 0
+    avg = 0.0
+    for _, sample in data.iteritems():
+        if np.isnan(sample.avg):
+            classification.append("outlier")
+            avgs.append(sample.avg)
+            continue
+        if values_left < 1 or active_group is None:
+            values_left = 0
+            while values_left < 1:  # Ignore empty groups (should not happen).
+                active_group = groups.pop()
+                values_left = len(active_group.values)
+                avg = active_group.metadata.avg
+            classification.append(active_group.metadata.classification)
+            avgs.append(avg)
+            values_left -= 1
+            continue
+        classification.append("normal")
+        avgs.append(avg)
+        values_left -= 1
+    return classification, avgs
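
classify_anomalies() consumes an OrderedDict of sample objects exposing an
avg attribute (NaN where a build produced no result) and returns two lists
parallel to the input: one classification string and one trend value per
sample. A hedged usage sketch; the build keys are made up, and the exact
classification strings for jumps come from the jumpavg group metadata (the
docstring names regressions and progressions):

    # data: OrderedDict mapping build identifiers to samples whose .avg
    # is NaN when no usable result is available for that build.
    classification, avgs = classify_anomalies(data)
    for key, cls, avg in zip(data.keys(), classification, avgs):
        # cls is e.g. "outlier", "normal", "regression" or "progression";
        # avg is the trend (group average) assigned to that sample.
        logging.info("{0}: {1}, trend {2}".format(key, cls, avg))
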
+
+
+def convert_csv_to_pretty_txt(csv_file, txt_file):
+    """Convert the given csv table to a pretty text table.
+
+    :param csv_file: The path to the input csv file.
+    :param txt_file: The path to the output pretty text file.
+    :type csv_file: str
+    :type txt_file: str
+    """
+
+    txt_table = None
+    with open(csv_file, 'rb') as csv_file:
+        csv_content = csv.reader(csv_file, delimiter=',', quotechar='"')
+        for row in csv_content:
+            if txt_table is None:
+                txt_table = prettytable.PrettyTable(row)
+            else:
+                txt_table.add_row(row)
+    if txt_table:
+        txt_table.align["Test case"] = "l"
+        with open(txt_file, "w") as txt_file:
+            txt_file.write(str(txt_table))
+
+
+class Worker(multiprocessing.Process):
+    """Worker class used to process tasks in separate parallel processes.
+    """
+
+    def __init__(self, work_queue, data_queue, func):
+        """Initialization.
+
+        :param work_queue: Queue with items to process.
+        :param data_queue: Shared memory between processes. Queue which keeps
+            the result data. This data is then read by the main process and
+            used in further processing.
+        :param func: Function which is executed by the worker.
+        :type work_queue: multiprocessing.JoinableQueue
+        :type data_queue: multiprocessing.Manager().Queue()
+        :type func: Callable object
+        """
+        super(Worker, self).__init__()
+        self._work_queue = work_queue
+        self._data_queue = data_queue
+        self._func = func
+
+    def run(self):
+        """Method representing the process's activity.
+        """
+
+        while True:
+            try:
+                self.process(self._work_queue.get())
+            finally:
+                self._work_queue.task_done()
+
+    def process(self, item_to_process):
+        """Method executed by the worker for each work item.
+
+        :param item_to_process: Data to be processed by the function.
+        :type item_to_process: tuple
+        """
+        self._func(self.pid, self._data_queue, *item_to_process)
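
The Worker class is deliberately generic: work items go into a
JoinableQueue, every worker applies func to each item it pulls, and the
results accumulate in a managed Queue drained by the parent process. A
minimal wiring sketch based only on the docstrings above; the function
double_item and the integer work items are hypothetical, and the workers
are daemonized because run() loops forever:

    def double_item(pid, data_queue, item):
        # The first two arguments are supplied by Worker.process();
        # the rest is unpacked from the work item tuple.
        data_queue.put((pid, item * 2))

    work_queue = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()  # keep a reference to the manager
    data_queue = manager.Queue()

    for _ in range(4):
        worker = Worker(work_queue, data_queue, double_item)
        worker.daemon = True  # run() never returns, so do not join workers
        worker.start()

    for item in range(10):
        work_queue.put((item, ))  # each item is a tuple; it gets unpacked

    work_queue.join()  # block until task_done() is called for every item
    while not data_queue.empty():
        logging.info(data_queue.get())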