-def _select_data(in_data, period, fill_missing=False, use_first=False):
- """Select the data from the full data set. The selection is done by picking
- the samples depending on the period: period = 1: All, period = 2: every
- second sample, period = 3: every third sample ...
-
- :param in_data: Full set of data.
- :param period: Sampling period.
- :param fill_missing: If the chosen sample is missing in the full set, its
- nearest neighbour is used.
- :param use_first: Use the first sample even though it is not chosen.
- :type in_data: OrderedDict
- :type period: int
- :type fill_missing: bool
- :type use_first: bool
- :returns: Reduced data.
- :rtype: OrderedDict
- """
-
- first_idx = min(in_data.keys())
- last_idx = max(in_data.keys())
-
- idx = last_idx
- data_dict = dict()
- if use_first:
- data_dict[first_idx] = in_data[first_idx]
- while idx >= first_idx:
- data = in_data.get(idx, None)
- if data is None:
- if fill_missing:
- threshold = int(round(idx - period / 2)) + 1 - period % 2
- idx_low = first_idx if threshold < first_idx else threshold
- threshold = int(round(idx + period / 2))
- idx_high = last_idx if threshold > last_idx else threshold
-
- flag_l = True
- flag_h = True
- idx_lst = list()
- inc = 1
- while flag_l or flag_h:
- if idx + inc > idx_high:
- flag_h = False
- else:
- idx_lst.append(idx + inc)
- if idx - inc < idx_low:
- flag_l = False
- else:
- idx_lst.append(idx - inc)
- inc += 1
-
- for i in idx_lst:
- if i in in_data.keys():
- data_dict[i] = in_data[i]
- break
- else:
- data_dict[idx] = data
- idx -= period
-
- return OrderedDict(sorted(data_dict.items(), key=lambda t: t[0]))
-
-
-def _evaluate_results(in_data, trimmed_data, window=10):
- """Evaluates if the sample value is regress, normal or progress compared to
- previous data within the window.
- We use the intervals defined as:
- - regress: less than median - 3 * stdev
- - normal: between median - 3 * stdev and median + 3 * stdev
- - progress: more than median + 3 * stdev
-
- :param in_data: Full data set.
- :param trimmed_data: Full data set without the outliers.
- :param window: Window size used to calculate moving median and moving stdev.
- :type in_data: pandas.Series
- :type trimmed_data: pandas.Series
- :type window: int
- :returns: Evaluated results.
- :rtype: list
- """
-
- if len(in_data) > 2:
- win_size = in_data.size if in_data.size < window else window
- results = [0.0, ] * win_size
- median = in_data.rolling(window=win_size).median()
- stdev_t = trimmed_data.rolling(window=win_size, min_periods=2).std()
- m_vals = median.values
- s_vals = stdev_t.values
- d_vals = in_data.values
- for day in range(win_size, in_data.size):
- if np.isnan(m_vals[day - 1]) or np.isnan(s_vals[day - 1]):
- results.append(0.0)
- elif d_vals[day] < (m_vals[day - 1] - 3 * s_vals[day - 1]):
- results.append(0.33)
- elif (m_vals[day - 1] - 3 * s_vals[day - 1]) <= d_vals[day] <= \
- (m_vals[day - 1] + 3 * s_vals[day - 1]):
- results.append(0.66)
- else:
- results.append(1.0)
- else:
- results = [0.0, ]
- try:
- median = np.median(in_data)
- stdev = np.std(in_data)
- if in_data.values[-1] < (median - 3 * stdev):
- results.append(0.33)
- elif (median - 3 * stdev) <= in_data.values[-1] <= (
- median + 3 * stdev):
- results.append(0.66)
- else:
- results.append(1.0)
- except TypeError:
- results.append(None)
- return results
-
-
-def _generate_trending_traces(in_data, build_info, period, moving_win_size=10,
- fill_missing=True, use_first=False,
- show_moving_median=True, name="", color=""):