C-Dash: Add processing of compressed telemetry data
[csit.git] / csit.infra.dash / app / cdash / utils / telemetry_data.py
1 # Copyright (c) 2023 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """A module implementing the parsing of OpenMetrics data and elementary
15 operations with it.
16 """
17
18
19 import binascii
20 import zlib
21 import pandas as pd
22
23 from ..trending.graphs import select_trending_data
24
25
26 class TelemetryData:
27     """A class to store and manipulate the telemetry data.
28     """
29
30     def __init__(self, tests: list=list()) -> None:
31         """Initialize the object.
32
33         :param in_data: Input data.
34         :param tests: List of selected tests.
35         :type in_data: pandas.DataFrame
36         :type tests: list
37         """
38
39         self._tests = tests
40         self._data = None
41         self._unique_metrics = list()
42         self._unique_metrics_labels = pd.DataFrame()
43         self._selected_metrics_labels = pd.DataFrame()
44
45     def from_dataframe(self, in_data: pd.DataFrame=pd.DataFrame()) -> None:
46         """Read the input from pandas DataFrame.
47
48         This method must be called at the beginning to create all data
49         structures.
50         """
51
52         if in_data.empty:
53             return
54
55         df = pd.DataFrame()
56         metrics = set()  # A set of unique metrics
57
58         # Create a dataframe with metrics for selected tests:
59         for itm in self._tests:
60             sel_data = select_trending_data(in_data, itm)
61             if sel_data is not None:
62                 sel_data["test_name"] = itm["id"]
63                 df = pd.concat([df, sel_data], ignore_index=True, copy=False)
64         # Use only neccessary data:
65         df = df[[
66             "job",
67             "build",
68             "dut_type",
69             "dut_version",
70             "start_time",
71             "passed",
72             "test_name",
73             "test_type",
74             "result_receive_rate_rate_avg",
75             "result_receive_rate_rate_stdev",
76             "result_receive_rate_rate_unit",
77             "result_pdr_lower_rate_value",
78             "result_pdr_lower_rate_unit",
79             "result_ndr_lower_rate_value",
80             "result_ndr_lower_rate_unit",
81             "telemetry"
82         ]]
83         # Transform metrics from strings to dataframes:
84         lst_telemetry = list()
85         for _, row in df.iterrows():
86             d_telemetry = {
87                 "metric": list(),
88                 "labels": list(),  # list of tuple(label, value)
89                 "value": list(),
90                 "timestamp": list()
91             }
92             
93             # If there is no telemetry data, use empty dictionary
94             if row["telemetry"] is None or isinstance(row["telemetry"], float):
95                 lst_telemetry.append(pd.DataFrame(data=d_telemetry))
96                 continue
97
98             # Read telemetry data
99             # - list of uncompressed strings List[str, ...], or
100             # - list with only one compressed string List[str]
101             try:
102                 tm_data = zlib.decompress(
103                     binascii.a2b_base64(row["telemetry"][0].encode())
104                 ).decode().split("\n")
105             except (binascii.Error, zlib.error, AttributeError, IndexError):
106                 tm_data = row["telemetry"]
107
108             # Pre-process telemetry data
109             for itm in tm_data:
110                 itm_lst = itm.replace("'", "").rsplit(" ", maxsplit=2)
111                 metric, labels = itm_lst[0].split("{")
112                 d_telemetry["metric"].append(metric)
113                 d_telemetry["labels"].append(
114                     [tuple(x.split("=")) for x in labels[:-1].split(",")]
115                 )
116                 d_telemetry["value"].append(itm_lst[1])
117                 d_telemetry["timestamp"].append(itm_lst[2])
118
119             metrics.update(d_telemetry["metric"])
120             lst_telemetry.append(pd.DataFrame(data=d_telemetry))
121         df["telemetry"] = lst_telemetry
122
123         self._data = df
124         self._unique_metrics = sorted(metrics)
125
126     def from_json(self, in_data: dict) -> None:
127         """Read the input data from json.
128         """
129
130         df = pd.read_json(in_data)
131         lst_telemetry = list()
132         metrics = set()  # A set of unique metrics
133         for _, row in df.iterrows():
134             telemetry = pd.DataFrame(row["telemetry"])
135             lst_telemetry.append(telemetry)
136             metrics.update(telemetry["metric"].to_list())
137         df["telemetry"] = lst_telemetry
138
139         self._data = df
140         self._unique_metrics = sorted(metrics)
141
142     def from_metrics(self, in_data: set) -> None:
143         """Read only the metrics.
144         """
145         self._unique_metrics = in_data
146
147     def from_metrics_with_labels(self, in_data: dict) -> None:
148         """Read only metrics with labels.
149         """
150         self._unique_metrics_labels = pd.DataFrame.from_dict(in_data)
151
152     def to_json(self) -> str:
153         """Return the data transformed from dataframe to json.
154
155         :returns: Telemetry data transformed to a json structure.
156         :rtype: dict
157         """
158         return self._data.to_json()
159
160     @property
161     def unique_metrics(self) -> list:
162         """Return a set of unique metrics.
163
164         :returns: A set of unique metrics.
165         :rtype: set
166         """
167         return self._unique_metrics
168
169     @property
170     def unique_metrics_with_labels(self) -> dict:
171         """
172         """
173         return self._unique_metrics_labels.to_dict()
174
175     def get_selected_labels(self, metrics: list) -> dict:
176         """Return a dictionary with labels (keys) and all their possible values
177         (values) for all selected 'metrics'.
178
179         :param metrics: List of metrics we are interested in.
180         :type metrics: list
181         :returns: A dictionary with labels and all their possible values.
182         :rtype: dict
183         """
184
185         df_labels = pd.DataFrame()
186         tmp_labels = dict()
187         for _, row in self._data.iterrows():
188             telemetry = row["telemetry"]
189             for itm in metrics:
190                 df = telemetry.loc[(telemetry["metric"] == itm)]
191                 df_labels = pd.concat(
192                     [df_labels, df],
193                     ignore_index=True,
194                     copy=False
195                 )
196                 for _, tm in df.iterrows():
197                     for label in tm["labels"]:
198                         if label[0] not in tmp_labels:
199                             tmp_labels[label[0]] = set()
200                         tmp_labels[label[0]].add(label[1])
201
202         selected_labels = dict()
203         for key in sorted(tmp_labels):
204             selected_labels[key] = sorted(tmp_labels[key])
205
206         self._unique_metrics_labels = df_labels[["metric", "labels"]].\
207             loc[df_labels[["metric", "labels"]].astype(str).\
208                 drop_duplicates().index]
209
210         return selected_labels
211
212     @property
213     def str_metrics(self) -> str:
214         """Returns all unique metrics as a string.
215         """
216         return TelemetryData.metrics_to_str(self._unique_metrics_labels)
217
218     @staticmethod
219     def metrics_to_str(in_data: pd.DataFrame) -> str:
220         """Convert metrics from pandas dataframe to string. Metrics in string
221         are separated by '\n'.
222
223         :param in_data: Metrics to be converted to a string.
224         :type in_data: pandas.DataFrame
225         :returns: Metrics as a string.
226         :rtype: str
227         """
228         metrics = str()
229         for _, row in in_data.iterrows():
230             labels = ','.join([f"{itm[0]}='{itm[1]}'" for itm in row["labels"]])
231             metrics += f"{row['metric']}{{{labels}}}\n"
232         return metrics[:-1]
233
234     def search_unique_metrics(self, string: str) -> list:
235         """Return a list of metrics which name includes the given string.
236
237         :param string: A string which must be in the name of metric.
238         :type string: str
239         :returns: A list of metrics which name includes the given string.
240         :rtype: list
241         """
242         return [itm for itm in self._unique_metrics if string in itm]
243
244     def filter_selected_metrics_by_labels(
245             self,
246             selection: dict
247         ) -> pd.DataFrame:
248         """Filter selected unique metrics by labels and their values.
249
250         :param selection: Labels and their values specified by the user.
251         :type selection: dict
252         :returns: Pandas dataframe with filtered metrics.
253         :rtype: pandas.DataFrame
254         """
255
256         def _is_selected(labels: list, sel: dict) -> bool:
257             """Check if the provided 'labels' are selected by the user.
258
259             :param labels: List of labels and their values from a metric. The
260                 items in this lists are two-item-lists whre the first item is
261                 the label and the second one is its value.
262             :param sel: User selection. The keys are the selected lables and the
263                 values are lists with label values.
264             :type labels: list
265             :type sel: dict
266             :returns: True if the 'labels' are selected by the user.
267             :rtype: bool
268             """
269             passed = list()
270             labels = dict(labels)
271             for key in sel.keys():
272                 if key in list(labels.keys()):
273                     if sel[key]:
274                         passed.append(labels[key] in sel[key])
275                     else:
276                         passed.append(True)
277                 else:
278                     passed.append(False)
279             return bool(passed and all(passed))
280
281         self._selected_metrics_labels = pd.DataFrame()
282         for _, row in self._unique_metrics_labels.iterrows():
283             if _is_selected(row["labels"], selection):
284                 self._selected_metrics_labels = pd.concat(
285                     [self._selected_metrics_labels, row.to_frame().T],
286                     ignore_index=True,
287                     axis=0,
288                     copy=False
289                 )
290         return self._selected_metrics_labels
291
292     def select_tm_trending_data(self, selection: dict) -> pd.DataFrame:
293         """Select telemetry data for trending based on user's 'selection'.
294
295         The output dataframe includes these columns:
296             - "job",
297             - "build",
298             - "dut_type",
299             - "dut_version",
300             - "start_time",
301             - "passed",
302             - "test_name",
303             - "test_id",
304             - "test_type",
305             - "result_receive_rate_rate_avg",
306             - "result_receive_rate_rate_stdev",
307             - "result_receive_rate_rate_unit",
308             - "result_pdr_lower_rate_value",
309             - "result_pdr_lower_rate_unit",
310             - "result_ndr_lower_rate_value",
311             - "result_ndr_lower_rate_unit",
312             - "tm_metric",
313             - "tm_value".
314
315         :param selection: User's selection (metrics and labels).
316         :type selection: dict
317         :returns: Dataframe with selected data.
318         :rtype: pandas.DataFrame
319         """
320
321         df = pd.DataFrame()
322
323         if self._data is None:
324             return df
325         if self._data.empty:
326             return df
327         if not selection:
328             return df
329
330         df_sel = pd.DataFrame.from_dict(selection)
331         for _, row in self._data.iterrows():
332             tm_row = row["telemetry"]
333             for _, tm_sel in df_sel.iterrows():
334                 df_tmp = tm_row.loc[tm_row["metric"] == tm_sel["metric"]]
335                 for _, tm in df_tmp.iterrows():
336                     if tm["labels"] == tm_sel["labels"]:
337                         labels = ','.join(
338                             [f"{itm[0]}='{itm[1]}'" for itm in tm["labels"]]
339                         )
340                         row["tm_metric"] = f"{tm['metric']}{{{labels}}}"
341                         row["tm_value"] = tm["value"]
342                         new_row = row.drop(labels=["telemetry", ])
343                         df = pd.concat(
344                             [df, new_row.to_frame().T],
345                             ignore_index=True,
346                             axis=0,
347                             copy=False
348                         )
349         return df