+
+
+def classify_anomalies(data, window):
+ """Evaluates if the sample value is an outlier, regression, normal or
+ progression compared to the previous data within the window.
+ We use the intervals defined as:
+ - regress: less than trimmed moving median - 3 * stdev
+ - normal: between trimmed moving median - 3 * stdev and median + 3 * stdev
+ - progress: more than trimmed moving median + 3 * stdev
+ where stdev is trimmed moving standard deviation.
+
+ :param data: Full data set with the outliers replaced by nan.
+ :param window: Window size used to calculate moving average and moving
+ stdev.
+ :type data: pandas.Series
+ :type window: int
+ :returns: Evaluated results.
+ :rtype: list
+ """
+
+ if data.size < 3:
+ return None
+
+ win_size = data.size if data.size < window else window
+ tmm = data.rolling(window=win_size, min_periods=2).median()
+ tmstd = data.rolling(window=win_size, min_periods=2).std()
+
+ classification = ["normal", ]
+ first = True
+ for build, value in data.iteritems():
+ if first:
+ first = False
+ continue
+ if np.isnan(value) or np.isnan(tmm[build]) or np.isnan(tmstd[build]):
+ classification.append("outlier")
+ elif value < (tmm[build] - 3 * tmstd[build]):
+ classification.append("regression")
+ elif value > (tmm[build] + 3 * tmstd[build]):
+ classification.append("progression")
+ else:
+ classification.append("normal")
+ return classification
+
+
+class Worker(multiprocessing.Process):
+ """Worker class used to process tasks in separate parallel processes.
+ """
+
+ def __init__(self, work_queue, data_queue, func):
+ """Initialization.
+
+ :param work_queue: Queue with items to process.
+ :param data_queue: Shared memory between processes. Queue which keeps
+ the result data. This data is then read by the main process and used
+ in further processing.
+ :param func: Function which is executed by the worker.
+ :type work_queue: multiprocessing.JoinableQueue
+ :type data_queue: multiprocessing.Manager().Queue()
+ :type func: Callable object
+ """
+ super(Worker, self).__init__()
+ self._work_queue = work_queue
+ self._data_queue = data_queue
+ self._func = func
+
+ def run(self):
+ """Method representing the process's activity.
+ """
+
+ while True:
+ try:
+ self.process(self._work_queue.get())
+ finally:
+ self._work_queue.task_done()
+
+ def process(self, item_to_process):
+ """Method executed by the runner.
+
+ :param item_to_process: Data to be processed by the function.
+ :type item_to_process: tuple
+ """
+ self._func(self.pid, self._data_queue, *item_to_process)