-# Copyright (c) 2017 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""General purpose utilities.
"""
+import multiprocessing
import subprocess
+import math
import numpy as np
-import pandas as pd
import logging
+import csv
+import prettytable
from os import walk, makedirs, environ
from os.path import join, isdir
-from shutil import copy, Error
-from math import sqrt
+from shutil import move, Error
+from datetime import datetime
+from pandas import Series
from errors import PresentationError
+from jumpavg.BitCountingClassifier import BitCountingClassifier
def mean(items):
:returns: Stdev.
:rtype: float
"""
-
- avg = mean(items)
- variance = [(x - avg) ** 2 for x in items]
- stddev = sqrt(mean(variance))
- return stddev
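+    # Note: Series.std() returns the sample standard deviation (ddof=1),
+    # whereas the removed implementation computed the population standard
+    # deviation (ddof=0); results differ slightly for small sample counts.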
+ return Series.std(Series(items))
def relative_change(nr1, nr2):
return float(((nr2 - nr1) / nr1) * 100)
-def remove_outliers(input_data, outlier_const):
- """
+def relative_change_stdev(mean1, mean2, std1, std2):
+    """Compute relative change of two values, and stdev of that change.
- :param input_data: Data from which the outliers will be removed.
- :param outlier_const: Outlier constant.
- :type input_data: list
- :type outlier_const: float
- :returns: The input list without outliers.
- :rtype: list
- """
+ The "1" values are the base for comparison.
+    Results are returned as a percentage (and percentage points for stdev).
+    Linearized (first-order) error propagation is used, so the results are
+    inaccurate when the stdev is large relative to the mean.
- data = np.array(input_data)
- upper_quartile = np.percentile(data, 75)
- lower_quartile = np.percentile(data, 25)
- iqr = (upper_quartile - lower_quartile) * outlier_const
- quartile_set = (lower_quartile - iqr, upper_quartile + iqr)
- result_lst = list()
- for y in data.tolist():
- if quartile_set[0] <= y <= quartile_set[1]:
- result_lst.append(y)
- return result_lst
-
-
-def find_outliers(input_data, outlier_const=1.5):
- """Go through the input data and generate two pandas series:
- - input data without outliers
- - outliers.
- The function uses IQR to detect outliers.
-
- :param input_data: Data to be examined for outliers.
- :param outlier_const: Outlier constant.
- :type input_data: pandas.Series
- :type outlier_const: float
- :returns: Tuple: input data with outliers removed; Outliers.
- :rtype: tuple (trimmed_data, outliers)
+    :param mean1: Mean of the first quantity.
+    :param mean2: Mean of the second quantity.
+    :param std1: Standard deviation estimate of the first quantity.
+    :param std2: Standard deviation estimate of the second quantity.
+ :type mean1: float
+ :type mean2: float
+ :type std1: float
+ :type std2: float
+ :returns: Relative change and its stdev.
+    :rtype: tuple(float, float)
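+
+    Example (illustrative):
+
+        relative_change_stdev(10.0, 12.0, 1.0, 1.0)
+        # -> (20.0, 15.62...): a +20 % change with a stdev of roughly
+        # 15.6 percentage points, as 1.2 * sqrt(0.1**2 + (1/12.0)**2) ~ 0.156.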
"""
-
- upper_quartile = input_data.quantile(q=0.75)
- lower_quartile = input_data.quantile(q=0.25)
- iqr = (upper_quartile - lower_quartile) * outlier_const
- low = lower_quartile - iqr
- high = upper_quartile + iqr
- trimmed_data = pd.Series()
- outliers = pd.Series()
- for item in input_data.items():
- item_pd = pd.Series([item[1], ], index=[item[0], ])
- if low <= item[1] <= high:
- trimmed_data = trimmed_data.append(item_pd)
- else:
- trimmed_data = trimmed_data.append(pd.Series([np.nan, ],
- index=[item[0], ]))
- outliers = outliers.append(item_pd)
-
- return trimmed_data, outliers
+ mean1, mean2 = float(mean1), float(mean2)
+ quotient = mean2 / mean1
+ first = std1 / mean1
+ second = std2 / mean2
+ std = quotient * math.sqrt(first * first + second * second)
+ return (quotient - 1) * 100, std * 100
def get_files(path, extension=None, full_path=True):
:param path: Path to files.
:param extension: Extension of files to process. If it is the empty string,
- all files will be processed.
+ all files will be processed.
:param full_path: If True, the files with full path are generated.
:type path: str
:type extension: str
:param cmd: Command to execute.
:type cmd: str
- :returns: Return code of the executed command.
- :rtype: int
+ :returns: Return code of the executed command, stdout and stderr.
+ :rtype: tuple(int, str, str)
"""
env = environ.copy()
stdout, stderr = proc.communicate()
- logging.info(stdout)
- logging.info(stderr)
+ if stdout:
+ logging.info(stdout)
+ if stderr:
+ logging.info(stderr)
if proc.returncode != 0:
logging.error(" Command execution failed.")
return execute_command(cmd)
+def get_build_timestamp(jenkins_url, job_name, build_nr):
+ """Get the timestamp of the build of the given job.
+
+ :param jenkins_url: Jenkins URL.
+ :param job_name: Job name.
+ :param build_nr: Build number.
+ :type jenkins_url: str
+ :type job_name: str
+ :type build_nr: int
+ :returns: The timestamp.
+ :rtype: datetime.datetime
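+
+    Example (illustrative; the Jenkins URL and job name are made up):
+
+        get_build_timestamp("https://jenkins.example.org/view/csit/job",
+                            "csit-report-daily", 42)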
+ """
+
+ url = "{jenkins_url}/{job_name}/{build_nr}".format(jenkins_url=jenkins_url,
+ job_name=job_name,
+ build_nr=build_nr)
+ cmd = "wget -qO- {url}".format(url=url)
+
+    # execute_command() returns (return code, stdout, stderr); the timestamp
+    # is expected on stdout, in milliseconds.
+    _, timestamp, _ = execute_command(cmd)
+
+    return datetime.fromtimestamp(float(timestamp) / 1000)
+
+
def archive_input_data(spec):
"""Archive the report.
logging.info(" Archiving the input data files ...")
- if spec.is_debug:
- extension = spec.debug["input-format"]
- else:
- extension = spec.input["file-format"]
- data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
- extension=extension)
+    extensions = spec.input["arch-file-format"]
+    data_files = list()
+    for ext in extensions:
+        data_files.extend(get_files(
+            spec.environment["paths"]["DIR[WORKING,DATA]"], extension=ext))
dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
logging.info(" Destination: {0}".format(dst))
makedirs(dst)
for data_file in data_files:
- logging.info(" Copying the file: {0} ...".format(data_file))
- copy(data_file, dst)
+ logging.info(" Moving the file: {0} ...".format(data_file))
+ move(data_file, dst)
except (Error, OSError) as err:
raise PresentationError("Not possible to archive the input data.",
str(err))
logging.info(" Done.")
+
+
+def classify_anomalies(data):
+ """Process the data and return anomalies and trending values.
+
+ Gather data into groups with average as trend value.
+ Decorate values within groups to be normal,
+ the first value of changed average as a regression, or a progression.
+
+    :param data: Full data set with unavailable samples replaced by NaN.
+    :type data: OrderedDict
+    :returns: Classification and trend values.
+    :rtype: 2-tuple, list of strings and list of floats
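+
+    Example (sketch; assumes samples expose an ``avg`` attribute as used
+    by the trending code):
+
+        classification, avgs = classify_anomalies(data)
+        # classification[i] is "normal", "regression", "progression"
+        # or "outlier"; avgs[i] is the trend average at the same position.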
+ """
+    # NaN means something went wrong.
+    # Use 0.0 so that such a sample is reported as a severe regression.
+ bare_data = [0.0 if np.isnan(sample.avg) else sample
+ for _, sample in data.iteritems()]
+ # TODO: Put analogous iterator into jumpavg library.
+ groups = BitCountingClassifier().classify(bare_data)
+ groups.reverse() # Just to use .pop() for FIFO.
+ classification = []
+ avgs = []
+ active_group = None
+ values_left = 0
+ avg = 0.0
+ for _, sample in data.iteritems():
+ if np.isnan(sample.avg):
+ classification.append("outlier")
+ avgs.append(sample.avg)
+ continue
+ if values_left < 1 or active_group is None:
+ values_left = 0
+ while values_left < 1: # Ignore empty groups (should not happen).
+ active_group = groups.pop()
+ values_left = len(active_group.values)
+ avg = active_group.metadata.avg
+ classification.append(active_group.metadata.classification)
+ avgs.append(avg)
+ values_left -= 1
+ continue
+ classification.append("normal")
+ avgs.append(avg)
+ values_left -= 1
+ return classification, avgs
+
+
+def convert_csv_to_pretty_txt(csv_file, txt_file):
+ """Convert the given csv table to pretty text table.
+
+ :param csv_file: The path to the input csv file.
+ :param txt_file: The path to the output pretty text file.
+ :type csv_file: str
+ :type txt_file: str
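+
+    Example (file names are illustrative):
+
+        convert_csv_to_pretty_txt("tests.csv", "tests.txt")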
+ """
+
+ txt_table = None
+    with open(csv_file, 'rb') as csv_fd:
+        csv_content = csv.reader(csv_fd, delimiter=',', quotechar='"')
+        for row in csv_content:
+            if txt_table is None:
+                txt_table = prettytable.PrettyTable(row)
+            else:
+                txt_table.add_row(row)
+    if txt_table:
+        txt_table.align["Test case"] = "l"
+        with open(txt_file, "w") as txt_fd:
+            txt_fd.write(str(txt_table))
+
+
+class Worker(multiprocessing.Process):
+ """Worker class used to process tasks in separate parallel processes.
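+
+    Typical wiring (a sketch; ``process_item`` stands for any callable
+    with the signature func(pid, data_queue, *args)):
+
+        work_queue = multiprocessing.JoinableQueue()
+        data_queue = multiprocessing.Manager().Queue()
+        workers = [Worker(work_queue, data_queue, process_item)
+                   for _ in range(multiprocessing.cpu_count())]
+        for worker in workers:
+            worker.daemon = True
+            worker.start()
+        for item in items:
+            work_queue.put(item)
+        work_queue.join()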
+ """
+
+ def __init__(self, work_queue, data_queue, func):
+ """Initialization.
+
+ :param work_queue: Queue with items to process.
+        :param data_queue: Queue shared among processes; it keeps the result
+            data which is then read by the main process and used in further
+            processing.
+ :param func: Function which is executed by the worker.
+ :type work_queue: multiprocessing.JoinableQueue
+ :type data_queue: multiprocessing.Manager().Queue()
+ :type func: Callable object
+ """
+ super(Worker, self).__init__()
+ self._work_queue = work_queue
+ self._data_queue = data_queue
+ self._func = func
+
+ def run(self):
+ """Method representing the process's activity.
+ """
+
+ while True:
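+            # Block until an item is available; mark the task done even if
+            # processing raises, so that work_queue.join() does not hang.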
+ try:
+ self.process(self._work_queue.get())
+ finally:
+ self._work_queue.task_done()
+
+ def process(self, item_to_process):
+        """Process one work item by invoking the worker function.
+
+ :param item_to_process: Data to be processed by the function.
+ :type item_to_process: tuple
+ """
+ self._func(self.pid, self._data_queue, *item_to_process)