C-Dash: Telemetry - Add option to ignore hosts
[csit.git] / csit.infra.dash / app / cdash / utils / telemetry_data.py
index e88b8ee..8018796 100644 (file)
@@ -16,6 +16,8 @@ operations with it.
 """
 
 
+import binascii
+import zlib
 import pandas as pd
 
 from ..trending.graphs import select_trending_data
@@ -43,21 +45,24 @@ class TelemetryData:
     def from_dataframe(self, in_data: pd.DataFrame=pd.DataFrame()) -> None:
         """Read the input from pandas DataFrame.
 
-        This method must be call at the begining to create all data structures.
+        This method must be called at the beginning to create all data
+        structures.
         """
 
         if in_data.empty:
             return
 
-        df = pd.DataFrame()
         metrics = set()  # A set of unique metrics
 
         # Create a dataframe with metrics for selected tests:
+        lst_items = list()
         for itm in self._tests:
             sel_data = select_trending_data(in_data, itm)
             if sel_data is not None:
                 sel_data["test_name"] = itm["id"]
-                df = pd.concat([df, sel_data], ignore_index=True, copy=False)
+                lst_items.append(sel_data)
+        df = pd.concat(lst_items, ignore_index=True, copy=False)
+
         # Use only neccessary data:
         df = df[[
             "job",
@@ -86,18 +91,34 @@ class TelemetryData:
                 "value": list(),
                 "timestamp": list()
             }
-            if row["telemetry"] is not None and \
-                    not isinstance(row["telemetry"], float):
-                for itm in row["telemetry"]:
-                    itm_lst = itm.replace("'", "").rsplit(" ", maxsplit=2)
-                    metric, labels = itm_lst[0].split("{")
-                    d_telemetry["metric"].append(metric)
-                    d_telemetry["labels"].append(
-                        [tuple(x.split("=")) for x in labels[:-1].split(",")]
-                    )
-                    d_telemetry["value"].append(itm_lst[1])
-                    d_telemetry["timestamp"].append(itm_lst[2])
-                metrics.update(d_telemetry["metric"])
+            
+            # If there is no telemetry data, use empty dictionary
+            if row["telemetry"] is None or isinstance(row["telemetry"], float):
+                lst_telemetry.append(pd.DataFrame(data=d_telemetry))
+                continue
+
+            # Read telemetry data
+            # - list of uncompressed strings List[str, ...], or
+            # - list with only one compressed string List[str]
+            try:
+                tm_data = zlib.decompress(
+                    binascii.a2b_base64(row["telemetry"][0].encode())
+                ).decode().split("\n")
+            except (binascii.Error, zlib.error, AttributeError, IndexError):
+                tm_data = row["telemetry"]
+
+            # Pre-process telemetry data
+            for itm in tm_data:
+                itm_lst = itm.replace("'", "").rsplit(" ", maxsplit=2)
+                metric, labels = itm_lst[0].split("{")
+                d_telemetry["metric"].append(metric)
+                d_telemetry["labels"].append(
+                    [tuple(x.split("=")) for x in labels[:-1].split(",")]
+                )
+                d_telemetry["value"].append(itm_lst[1])
+                d_telemetry["timestamp"].append(itm_lst[2])
+
+            metrics.update(d_telemetry["metric"])
             lst_telemetry.append(pd.DataFrame(data=d_telemetry))
         df["telemetry"] = lst_telemetry
 
@@ -163,23 +184,20 @@ class TelemetryData:
         :rtype: dict
         """
 
-        df_labels = pd.DataFrame()
+        lst_labels = list()
         tmp_labels = dict()
         for _, row in self._data.iterrows():
             telemetry = row["telemetry"]
             for itm in metrics:
                 df = telemetry.loc[(telemetry["metric"] == itm)]
-                df_labels = pd.concat(
-                    [df_labels, df],
-                    ignore_index=True,
-                    copy=False
-                )
+                lst_labels.append(df)
                 for _, tm in df.iterrows():
                     for label in tm["labels"]:
                         if label[0] not in tmp_labels:
                             tmp_labels[label[0]] = set()
                         tmp_labels[label[0]].add(label[1])
 
+        df_labels = pd.concat(lst_labels, ignore_index=True, copy=False)
         selected_labels = dict()
         for key in sorted(tmp_labels):
             selected_labels[key] = sorted(tmp_labels[key])
@@ -260,17 +278,19 @@ class TelemetryData:
             return bool(passed and all(passed))
 
         self._selected_metrics_labels = pd.DataFrame()
+        lst_items = list()
         for _, row in self._unique_metrics_labels.iterrows():
             if _is_selected(row["labels"], selection):
-                self._selected_metrics_labels = pd.concat(
-                    [self._selected_metrics_labels, row.to_frame().T],
-                    ignore_index=True,
-                    axis=0,
-                    copy=False
-                )
+                lst_items.append(row.to_frame().T)
+        self._selected_metrics_labels = \
+            pd.concat(lst_items, ignore_index=True, axis=0, copy=False)
         return self._selected_metrics_labels
 
-    def select_tm_trending_data(self, selection: dict) -> pd.DataFrame:
+    def select_tm_trending_data(
+            self,
+            selection: dict,
+            ignore_host: bool = False
+        ) -> pd.DataFrame:
         """Select telemetry data for trending based on user's 'selection'.
 
         The output dataframe includes these columns:
@@ -294,37 +314,49 @@ class TelemetryData:
             - "tm_value".
 
         :param selection: User's selection (metrics and labels).
+        :param ignore_host: Ignore 'hostname' and 'hook' labels in metrics.
         :type selection: dict
+        :type ignore_host: bool
         :returns: Dataframe with selected data.
         :rtype: pandas.DataFrame
         """
 
-        df = pd.DataFrame()
-
         if self._data is None:
-            return df
+            return pd.DataFrame()
         if self._data.empty:
-            return df
+            return pd.DataFrame()
         if not selection:
-            return df
+            return pd.DataFrame()
 
         df_sel = pd.DataFrame.from_dict(selection)
+        lst_rows = list()
         for _, row in self._data.iterrows():
             tm_row = row["telemetry"]
             for _, tm_sel in df_sel.iterrows():
                 df_tmp = tm_row.loc[tm_row["metric"] == tm_sel["metric"]]
                 for _, tm in df_tmp.iterrows():
-                    if tm["labels"] == tm_sel["labels"]:
-                        labels = ','.join(
-                            [f"{itm[0]}='{itm[1]}'" for itm in tm["labels"]]
-                        )
+                    do_it = False
+                    if ignore_host:
+                        if tm["labels"][2:] == tm_sel["labels"][2:]:
+                            labels = ','.join(
+                                [f"{i[0]}='{i[1]}'" for i in tm["labels"][2:]]
+                            )
+                            do_it = True
+                    else:
+                        if tm["labels"] == tm_sel["labels"]:
+                            labels = ','.join(
+                                [f"{i[0]}='{i[1]}'" for i in tm["labels"]]
+                            )
+                            do_it = True
+                    if do_it:
                         row["tm_metric"] = f"{tm['metric']}{{{labels}}}"
                         row["tm_value"] = tm["value"]
-                        new_row = row.drop(labels=["telemetry", ])
-                        df = pd.concat(
-                            [df, new_row.to_frame().T],
-                            ignore_index=True,
-                            axis=0,
-                            copy=False
+                        lst_rows.append(
+                            row.drop(labels=["telemetry", ]).to_frame().T
                         )
-        return df
+        if lst_rows:
+            return pd.concat(
+                lst_rows, ignore_index=True, axis=0, copy=False
+            ).drop_duplicates()
+        else:
+            return pd.DataFrame()