X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Ftools%2Fpresentation%2Futils.py;h=0a9d985a88743fc5ab8af1b8484e991b8d61b061;hb=fcea4eb3f63218e77cc594ff4322dc40d72fd632;hp=ab86bafdd7c4574046010f02571576afc07f86f3;hpb=8c0d241b8b4b4289be8333b7fd1d5ce0726414d6;p=csit.git diff --git a/resources/tools/presentation/utils.py b/resources/tools/presentation/utils.py index ab86bafdd7..0a9d985a88 100644 --- a/resources/tools/presentation/utils.py +++ b/resources/tools/presentation/utils.py @@ -14,6 +14,7 @@ """General purpose utilities. """ +import multiprocessing import subprocess import numpy as np import pandas as pd @@ -271,3 +272,85 @@ def archive_input_data(spec): str(err)) logging.info(" Done.") + + +def classify_anomalies(data, window): + """Evaluates if the sample value is an outlier, regression, normal or + progression compared to the previous data within the window. + We use the intervals defined as: + - regress: less than trimmed moving median - 3 * stdev + - normal: between trimmed moving median - 3 * stdev and median + 3 * stdev + - progress: more than trimmed moving median + 3 * stdev + where stdev is trimmed moving standard deviation. + + :param data: Full data set with the outliers replaced by nan. + :param window: Window size used to calculate moving average and moving + stdev. + :type data: pandas.Series + :type window: int + :returns: Evaluated results. + :rtype: list + """ + + if data.size < 3: + return None + + win_size = data.size if data.size < window else window + tmm = data.rolling(window=win_size, min_periods=2).median() + tmstd = data.rolling(window=win_size, min_periods=2).std() + + classification = ["normal", ] + first = True + for build, value in data.iteritems(): + if first: + first = False + continue + if np.isnan(value) or np.isnan(tmm[build]) or np.isnan(tmstd[build]): + classification.append("outlier") + elif value < (tmm[build] - 3 * tmstd[build]): + classification.append("regression") + elif value > (tmm[build] + 3 * tmstd[build]): + classification.append("progression") + else: + classification.append("normal") + return classification + + +class Worker(multiprocessing.Process): + """Worker class used to process tasks in separate parallel processes. + """ + + def __init__(self, work_queue, data_queue, func): + """Initialization. + + :param work_queue: Queue with items to process. + :param data_queue: Shared memory between processes. Queue which keeps + the result data. This data is then read by the main process and used + in further processing. + :param func: Function which is executed by the worker. + :type work_queue: multiprocessing.JoinableQueue + :type data_queue: multiprocessing.Manager().Queue() + :type func: Callable object + """ + super(Worker, self).__init__() + self._work_queue = work_queue + self._data_queue = data_queue + self._func = func + + def run(self): + """Method representing the process's activity. + """ + + while True: + try: + self.process(self._work_queue.get()) + finally: + self._work_queue.task_done() + + def process(self, item_to_process): + """Method executed by the runner. + + :param item_to_process: Data to be processed by the function. + :type item_to_process: tuple + """ + self._func(self.pid, self._data_queue, *item_to_process)