import hdrh.codec
from datetime import datetime
-from numpy import isnan
-
-from ..jumpavg import classify
+from ..data.utils import classify_anomalies
+
+_NORM_FREQUENCY = 2.0 # [GHz]
+_FREQURENCY = { # [GHz]
+ "2n-aws": 1.000,
+ "2n-dnv": 2.000,
+ "2n-clx": 2.300,
+ "2n-icx": 2.600,
+ "2n-skx": 2.500,
+ "2n-tx2": 2.500,
+ "2n-zn2": 2.900,
+ "3n-alt": 3.000,
+ "3n-aws": 1.000,
+ "3n-dnv": 2.000,
+ "3n-icx": 2.600,
+ "3n-skx": 2.500,
+ "3n-tsh": 2.200
+}
_ANOMALY_COLOR = {
"regression": 0.0,
return latencies
-def _classify_anomalies(data):
- """Process the data and return anomalies and trending values.
-
- Gather data into groups with average as trend value.
- Decorate values within groups to be normal,
- the first value of changed average as a regression, or a progression.
-
- :param data: Full data set with unavailable samples replaced by nan.
- :type data: OrderedDict
- :returns: Classification and trend values
- :rtype: 3-tuple, list of strings, list of floats and list of floats
- """
- # NaN means something went wrong.
- # Use 0.0 to cause that being reported as a severe regression.
- bare_data = [0.0 if isnan(sample) else sample for sample in data.values()]
- # TODO: Make BitCountingGroupList a subclass of list again?
- group_list = classify(bare_data).group_list
- group_list.reverse() # Just to use .pop() for FIFO.
- classification = list()
- avgs = list()
- stdevs = list()
- active_group = None
- values_left = 0
- avg = 0.0
- stdv = 0.0
- for sample in data.values():
- if isnan(sample):
- classification.append("outlier")
- avgs.append(sample)
- stdevs.append(sample)
- continue
- if values_left < 1 or active_group is None:
- values_left = 0
- while values_left < 1: # Ignore empty groups (should not happen).
- active_group = group_list.pop()
- values_left = len(active_group.run_list)
- avg = active_group.stats.avg
- stdv = active_group.stats.stdev
- classification.append(active_group.comment)
- avgs.append(avg)
- stdevs.append(stdv)
- values_left -= 1
- continue
- classification.append("normal")
- avgs.append(avg)
- stdevs.append(stdv)
- values_left -= 1
- return classification, avgs, stdevs
-
-
def select_trending_data(data: pd.DataFrame, itm:dict) -> pd.DataFrame:
"""
"""
def _generate_trending_traces(ttype: str, name: str, df: pd.DataFrame,
- start: datetime, end: datetime, color: str) -> list:
+ start: datetime, end: datetime, color: str, norm_factor: float) -> list:
"""
"""
return list()
x_axis = df["start_time"].tolist()
+ if ttype == "pdr-lat":
+ y_data = [(itm / norm_factor) for itm in df[_VALUE[ttype]].tolist()]
+ else:
+ y_data = [(itm * norm_factor) for itm in df[_VALUE[ttype]].tolist()]
- anomalies, trend_avg, trend_stdev = _classify_anomalies(
- {k: v for k, v in zip(x_axis, df[_VALUE[ttype]])}
+ anomalies, trend_avg, trend_stdev = classify_anomalies(
+ {k: v for k, v in zip(x_axis, y_data)}
)
hover = list()
customdata = list()
- for _, row in df.iterrows():
+ for idx, (_, row) in enumerate(df.iterrows()):
d_type = "trex" if row["dut_type"] == "none" else row["dut_type"]
hover_itm = (
f"date: {row['start_time'].strftime('%Y-%m-%d %H:%M:%S')}<br>"
- f"<prop> [{row[_UNIT[ttype]]}]: {row[_VALUE[ttype]]:,.0f}<br>"
+ f"<prop> [{row[_UNIT[ttype]]}]: {y_data[idx]:,.0f}<br>"
f"<stdev>"
f"{d_type}-ref: {row['dut_version']}<br>"
f"csit-ref: {row['job']}/{row['build']}<br>"
traces = [
go.Scatter( # Samples
x=x_axis,
- y=df[_VALUE[ttype]],
+ y=y_data,
name=name,
mode="markers",
marker={
def graph_trending(data: pd.DataFrame, sel:dict, layout: dict,
- start: datetime, end: datetime) -> tuple:
+ start: datetime, end: datetime, normalize: bool) -> tuple:
"""
"""
name = "-".join((itm["dut"], itm["phy"], itm["framesize"], itm["core"],
itm["test"], itm["testtype"], ))
+ if normalize:
+ phy = itm["phy"].split("-")
+ topo_arch = f"{phy[0]}-{phy[1]}" if len(phy) == 4 else str()
+ norm_factor = (_NORM_FREQUENCY / _FREQURENCY[topo_arch]) \
+ if topo_arch else 1.0
+ else:
+ norm_factor = 1.0
traces = _generate_trending_traces(
- itm["testtype"], name, df, start, end, _get_color(idx)
+ itm["testtype"], name, df, start, end, _get_color(idx), norm_factor
)
if traces:
if not fig_tput:
if itm["testtype"] == "pdr":
traces = _generate_trending_traces(
- "pdr-lat", name, df, start, end, _get_color(idx)
+ "pdr-lat", name, df, start, end, _get_color(idx), norm_factor
)
if traces:
if not fig_lat: