From: Tibor Frank Date: Mon, 20 Feb 2023 09:50:29 +0000 (+0100) Subject: C-Dash: Add processing of compressed telemetry data X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=commitdiff_plain;h=dfdc2dcdab4e58539536b264ce6226d0e8e179d8 C-Dash: Add processing of compressed telemetry data Signed-off-by: Tibor Frank Change-Id: Iedb67abcc5218d573890d6065e0f5c8acbaf67f6 --- diff --git a/csit.infra.dash/app/cdash/utils/telemetry_data.py b/csit.infra.dash/app/cdash/utils/telemetry_data.py index e88b8eed06..9c2e45f9a1 100644 --- a/csit.infra.dash/app/cdash/utils/telemetry_data.py +++ b/csit.infra.dash/app/cdash/utils/telemetry_data.py @@ -16,6 +16,8 @@ operations with it. """ +import binascii +import zlib import pandas as pd from ..trending.graphs import select_trending_data @@ -43,7 +45,8 @@ class TelemetryData: def from_dataframe(self, in_data: pd.DataFrame=pd.DataFrame()) -> None: """Read the input from pandas DataFrame. - This method must be call at the begining to create all data structures. + This method must be called at the beginning to create all data + structures. """ if in_data.empty: @@ -86,18 +89,34 @@ class TelemetryData: "value": list(), "timestamp": list() } - if row["telemetry"] is not None and \ - not isinstance(row["telemetry"], float): - for itm in row["telemetry"]: - itm_lst = itm.replace("'", "").rsplit(" ", maxsplit=2) - metric, labels = itm_lst[0].split("{") - d_telemetry["metric"].append(metric) - d_telemetry["labels"].append( - [tuple(x.split("=")) for x in labels[:-1].split(",")] - ) - d_telemetry["value"].append(itm_lst[1]) - d_telemetry["timestamp"].append(itm_lst[2]) - metrics.update(d_telemetry["metric"]) + + # If there is no telemetry data, use empty dictionary + if row["telemetry"] is None or isinstance(row["telemetry"], float): + lst_telemetry.append(pd.DataFrame(data=d_telemetry)) + continue + + # Read telemetry data + # - list of uncompressed strings List[str, ...], or + # - list with only one compressed string List[str] + try: + tm_data = zlib.decompress( + binascii.a2b_base64(row["telemetry"][0].encode()) + ).decode().split("\n") + except (binascii.Error, zlib.error, AttributeError, IndexError): + tm_data = row["telemetry"] + + # Pre-process telemetry data + for itm in tm_data: + itm_lst = itm.replace("'", "").rsplit(" ", maxsplit=2) + metric, labels = itm_lst[0].split("{") + d_telemetry["metric"].append(metric) + d_telemetry["labels"].append( + [tuple(x.split("=")) for x in labels[:-1].split(",")] + ) + d_telemetry["value"].append(itm_lst[1]) + d_telemetry["timestamp"].append(itm_lst[2]) + + metrics.update(d_telemetry["metric"]) lst_telemetry.append(pd.DataFrame(data=d_telemetry)) df["telemetry"] = lst_telemetry