-def _select_data(in_data, period, fill_missing=False, use_first=False):
- """Select the data from the full data set. The selection is done by picking
- the samples depending on the period: period = 1: All, period = 2: every
- second sample, period = 3: every third sample ...
-
- :param in_data: Full set of data.
- :param period: Sampling period.
- :param fill_missing: If the chosen sample is missing in the full set, its
- nearest neighbour is used.
- :param use_first: Use the first sample even though it is not chosen.
- :type in_data: OrderedDict
- :type period: int
- :type fill_missing: bool
- :type use_first: bool
- :returns: Reduced data.
- :rtype: OrderedDict
- """
-
- first_idx = min(in_data.keys())
- last_idx = max(in_data.keys())
-
- idx = last_idx
- data_dict = dict()
- if use_first:
- data_dict[first_idx] = in_data[first_idx]
- while idx >= first_idx:
- data = in_data.get(idx, None)
- if data is None:
- if fill_missing:
- threshold = int(round(idx - period / 2)) + 1 - period % 2
- idx_low = first_idx if threshold < first_idx else threshold
- threshold = int(round(idx + period / 2))
- idx_high = last_idx if threshold > last_idx else threshold
-
- flag_l = True
- flag_h = True
- idx_lst = list()
- inc = 1
- while flag_l or flag_h:
- if idx + inc > idx_high:
- flag_h = False
- else:
- idx_lst.append(idx + inc)
- if idx - inc < idx_low:
- flag_l = False
- else:
- idx_lst.append(idx - inc)
- inc += 1
-
- for i in idx_lst:
- if i in in_data.keys():
- data_dict[i] = in_data[i]
- break
- else:
- data_dict[idx] = data
- idx -= period
-
- return OrderedDict(sorted(data_dict.items(), key=lambda t: t[0]))
-
-
-def _evaluate_results(trimmed_data, window=10):
- """Evaluates if the sample value is regress, normal or progress compared to
- previous data within the window.
- We use the intervals defined as:
- - regress: less than trimmed moving median - 3 * stdev
- - normal: between trimmed moving median - 3 * stdev and median + 3 * stdev
- - progress: more than trimmed moving median + 3 * stdev
- where stdev is trimmed moving standard deviation.
-
- :param trimmed_data: Full data set with the outliers replaced by nan.
- :param window: Window size used to calculate moving average and moving stdev.
- :type trimmed_data: pandas.Series
- :type window: int
- :returns: Evaluated results.
- :rtype: list
- """
-
- if len(trimmed_data) > 2:
- win_size = trimmed_data.size if trimmed_data.size < window else window
- results = [0.66, ]
- tmm = trimmed_data.rolling(window=win_size, min_periods=2).median()
- tmstd = trimmed_data.rolling(window=win_size, min_periods=2).std()
-
- first = True
- for build_nr, value in trimmed_data.iteritems():
- if first:
- first = False
- continue
- if (np.isnan(value)
- or np.isnan(tmm[build_nr])
- or np.isnan(tmstd[build_nr])):
- results.append(0.0)
- elif value < (tmm[build_nr] - 3 * tmstd[build_nr]):
- results.append(0.33)
- elif value > (tmm[build_nr] + 3 * tmstd[build_nr]):
- results.append(1.0)
- else:
- results.append(0.66)
- else:
- results = [0.0, ]
- try:
- tmm = np.median(trimmed_data)
- tmstd = np.std(trimmed_data)
- if trimmed_data.values[-1] < (tmm - 3 * tmstd):
- results.append(0.33)
- elif (tmm - 3 * tmstd) <= trimmed_data.values[-1] <= (
- tmm + 3 * tmstd):
- results.append(0.66)
- else:
- results.append(1.0)
- except TypeError:
- results.append(None)
- return results
-
-
-def _generate_trending_traces(in_data, build_info, period, moving_win_size=10,
- fill_missing=True, use_first=False,