80187967facb6eca2b4aa5db96fc9a56f4483c0f
[csit.git] / csit.infra.dash / app / cdash / utils / telemetry_data.py
1 # Copyright (c) 2023 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """A module implementing the parsing of OpenMetrics data and elementary
15 operations with it.
16 """
17
18
19 import binascii
20 import zlib
21 import pandas as pd
22
23 from ..trending.graphs import select_trending_data
24
25
26 class TelemetryData:
27     """A class to store and manipulate the telemetry data.
28     """
29
30     def __init__(self, tests: list=list()) -> None:
31         """Initialize the object.
32
33         :param in_data: Input data.
34         :param tests: List of selected tests.
35         :type in_data: pandas.DataFrame
36         :type tests: list
37         """
38
39         self._tests = tests
40         self._data = None
41         self._unique_metrics = list()
42         self._unique_metrics_labels = pd.DataFrame()
43         self._selected_metrics_labels = pd.DataFrame()
44
45     def from_dataframe(self, in_data: pd.DataFrame=pd.DataFrame()) -> None:
46         """Read the input from pandas DataFrame.
47
48         This method must be called at the beginning to create all data
49         structures.
50         """
51
52         if in_data.empty:
53             return
54
55         metrics = set()  # A set of unique metrics
56
57         # Create a dataframe with metrics for selected tests:
58         lst_items = list()
59         for itm in self._tests:
60             sel_data = select_trending_data(in_data, itm)
61             if sel_data is not None:
62                 sel_data["test_name"] = itm["id"]
63                 lst_items.append(sel_data)
64         df = pd.concat(lst_items, ignore_index=True, copy=False)
65
66         # Use only neccessary data:
67         df = df[[
68             "job",
69             "build",
70             "dut_type",
71             "dut_version",
72             "start_time",
73             "passed",
74             "test_name",
75             "test_type",
76             "result_receive_rate_rate_avg",
77             "result_receive_rate_rate_stdev",
78             "result_receive_rate_rate_unit",
79             "result_pdr_lower_rate_value",
80             "result_pdr_lower_rate_unit",
81             "result_ndr_lower_rate_value",
82             "result_ndr_lower_rate_unit",
83             "telemetry"
84         ]]
85         # Transform metrics from strings to dataframes:
86         lst_telemetry = list()
87         for _, row in df.iterrows():
88             d_telemetry = {
89                 "metric": list(),
90                 "labels": list(),  # list of tuple(label, value)
91                 "value": list(),
92                 "timestamp": list()
93             }
94             
95             # If there is no telemetry data, use empty dictionary
96             if row["telemetry"] is None or isinstance(row["telemetry"], float):
97                 lst_telemetry.append(pd.DataFrame(data=d_telemetry))
98                 continue
99
100             # Read telemetry data
101             # - list of uncompressed strings List[str, ...], or
102             # - list with only one compressed string List[str]
103             try:
104                 tm_data = zlib.decompress(
105                     binascii.a2b_base64(row["telemetry"][0].encode())
106                 ).decode().split("\n")
107             except (binascii.Error, zlib.error, AttributeError, IndexError):
108                 tm_data = row["telemetry"]
109
110             # Pre-process telemetry data
111             for itm in tm_data:
112                 itm_lst = itm.replace("'", "").rsplit(" ", maxsplit=2)
113                 metric, labels = itm_lst[0].split("{")
114                 d_telemetry["metric"].append(metric)
115                 d_telemetry["labels"].append(
116                     [tuple(x.split("=")) for x in labels[:-1].split(",")]
117                 )
118                 d_telemetry["value"].append(itm_lst[1])
119                 d_telemetry["timestamp"].append(itm_lst[2])
120
121             metrics.update(d_telemetry["metric"])
122             lst_telemetry.append(pd.DataFrame(data=d_telemetry))
123         df["telemetry"] = lst_telemetry
124
125         self._data = df
126         self._unique_metrics = sorted(metrics)
127
128     def from_json(self, in_data: dict) -> None:
129         """Read the input data from json.
130         """
131
132         df = pd.read_json(in_data)
133         lst_telemetry = list()
134         metrics = set()  # A set of unique metrics
135         for _, row in df.iterrows():
136             telemetry = pd.DataFrame(row["telemetry"])
137             lst_telemetry.append(telemetry)
138             metrics.update(telemetry["metric"].to_list())
139         df["telemetry"] = lst_telemetry
140
141         self._data = df
142         self._unique_metrics = sorted(metrics)
143
144     def from_metrics(self, in_data: set) -> None:
145         """Read only the metrics.
146         """
147         self._unique_metrics = in_data
148
149     def from_metrics_with_labels(self, in_data: dict) -> None:
150         """Read only metrics with labels.
151         """
152         self._unique_metrics_labels = pd.DataFrame.from_dict(in_data)
153
154     def to_json(self) -> str:
155         """Return the data transformed from dataframe to json.
156
157         :returns: Telemetry data transformed to a json structure.
158         :rtype: dict
159         """
160         return self._data.to_json()
161
162     @property
163     def unique_metrics(self) -> list:
164         """Return a set of unique metrics.
165
166         :returns: A set of unique metrics.
167         :rtype: set
168         """
169         return self._unique_metrics
170
171     @property
172     def unique_metrics_with_labels(self) -> dict:
173         """
174         """
175         return self._unique_metrics_labels.to_dict()
176
177     def get_selected_labels(self, metrics: list) -> dict:
178         """Return a dictionary with labels (keys) and all their possible values
179         (values) for all selected 'metrics'.
180
181         :param metrics: List of metrics we are interested in.
182         :type metrics: list
183         :returns: A dictionary with labels and all their possible values.
184         :rtype: dict
185         """
186
187         lst_labels = list()
188         tmp_labels = dict()
189         for _, row in self._data.iterrows():
190             telemetry = row["telemetry"]
191             for itm in metrics:
192                 df = telemetry.loc[(telemetry["metric"] == itm)]
193                 lst_labels.append(df)
194                 for _, tm in df.iterrows():
195                     for label in tm["labels"]:
196                         if label[0] not in tmp_labels:
197                             tmp_labels[label[0]] = set()
198                         tmp_labels[label[0]].add(label[1])
199
200         df_labels = pd.concat(lst_labels, ignore_index=True, copy=False)
201         selected_labels = dict()
202         for key in sorted(tmp_labels):
203             selected_labels[key] = sorted(tmp_labels[key])
204
205         self._unique_metrics_labels = df_labels[["metric", "labels"]].\
206             loc[df_labels[["metric", "labels"]].astype(str).\
207                 drop_duplicates().index]
208
209         return selected_labels
210
211     @property
212     def str_metrics(self) -> str:
213         """Returns all unique metrics as a string.
214         """
215         return TelemetryData.metrics_to_str(self._unique_metrics_labels)
216
217     @staticmethod
218     def metrics_to_str(in_data: pd.DataFrame) -> str:
219         """Convert metrics from pandas dataframe to string. Metrics in string
220         are separated by '\n'.
221
222         :param in_data: Metrics to be converted to a string.
223         :type in_data: pandas.DataFrame
224         :returns: Metrics as a string.
225         :rtype: str
226         """
227         metrics = str()
228         for _, row in in_data.iterrows():
229             labels = ','.join([f"{itm[0]}='{itm[1]}'" for itm in row["labels"]])
230             metrics += f"{row['metric']}{{{labels}}}\n"
231         return metrics[:-1]
232
233     def search_unique_metrics(self, string: str) -> list:
234         """Return a list of metrics which name includes the given string.
235
236         :param string: A string which must be in the name of metric.
237         :type string: str
238         :returns: A list of metrics which name includes the given string.
239         :rtype: list
240         """
241         return [itm for itm in self._unique_metrics if string in itm]
242
243     def filter_selected_metrics_by_labels(
244             self,
245             selection: dict
246         ) -> pd.DataFrame:
247         """Filter selected unique metrics by labels and their values.
248
249         :param selection: Labels and their values specified by the user.
250         :type selection: dict
251         :returns: Pandas dataframe with filtered metrics.
252         :rtype: pandas.DataFrame
253         """
254
255         def _is_selected(labels: list, sel: dict) -> bool:
256             """Check if the provided 'labels' are selected by the user.
257
258             :param labels: List of labels and their values from a metric. The
259                 items in this lists are two-item-lists whre the first item is
260                 the label and the second one is its value.
261             :param sel: User selection. The keys are the selected lables and the
262                 values are lists with label values.
263             :type labels: list
264             :type sel: dict
265             :returns: True if the 'labels' are selected by the user.
266             :rtype: bool
267             """
268             passed = list()
269             labels = dict(labels)
270             for key in sel.keys():
271                 if key in list(labels.keys()):
272                     if sel[key]:
273                         passed.append(labels[key] in sel[key])
274                     else:
275                         passed.append(True)
276                 else:
277                     passed.append(False)
278             return bool(passed and all(passed))
279
280         self._selected_metrics_labels = pd.DataFrame()
281         lst_items = list()
282         for _, row in self._unique_metrics_labels.iterrows():
283             if _is_selected(row["labels"], selection):
284                 lst_items.append(row.to_frame().T)
285         self._selected_metrics_labels = \
286             pd.concat(lst_items, ignore_index=True, axis=0, copy=False)
287         return self._selected_metrics_labels
288
289     def select_tm_trending_data(
290             self,
291             selection: dict,
292             ignore_host: bool = False
293         ) -> pd.DataFrame:
294         """Select telemetry data for trending based on user's 'selection'.
295
296         The output dataframe includes these columns:
297             - "job",
298             - "build",
299             - "dut_type",
300             - "dut_version",
301             - "start_time",
302             - "passed",
303             - "test_name",
304             - "test_id",
305             - "test_type",
306             - "result_receive_rate_rate_avg",
307             - "result_receive_rate_rate_stdev",
308             - "result_receive_rate_rate_unit",
309             - "result_pdr_lower_rate_value",
310             - "result_pdr_lower_rate_unit",
311             - "result_ndr_lower_rate_value",
312             - "result_ndr_lower_rate_unit",
313             - "tm_metric",
314             - "tm_value".
315
316         :param selection: User's selection (metrics and labels).
317         :param ignore_host: Ignore 'hostname' and 'hook' labels in metrics.
318         :type selection: dict
319         :type ignore_host: bool
320         :returns: Dataframe with selected data.
321         :rtype: pandas.DataFrame
322         """
323
324         if self._data is None:
325             return pd.DataFrame()
326         if self._data.empty:
327             return pd.DataFrame()
328         if not selection:
329             return pd.DataFrame()
330
331         df_sel = pd.DataFrame.from_dict(selection)
332         lst_rows = list()
333         for _, row in self._data.iterrows():
334             tm_row = row["telemetry"]
335             for _, tm_sel in df_sel.iterrows():
336                 df_tmp = tm_row.loc[tm_row["metric"] == tm_sel["metric"]]
337                 for _, tm in df_tmp.iterrows():
338                     do_it = False
339                     if ignore_host:
340                         if tm["labels"][2:] == tm_sel["labels"][2:]:
341                             labels = ','.join(
342                                 [f"{i[0]}='{i[1]}'" for i in tm["labels"][2:]]
343                             )
344                             do_it = True
345                     else:
346                         if tm["labels"] == tm_sel["labels"]:
347                             labels = ','.join(
348                                 [f"{i[0]}='{i[1]}'" for i in tm["labels"]]
349                             )
350                             do_it = True
351                     if do_it:
352                         row["tm_metric"] = f"{tm['metric']}{{{labels}}}"
353                         row["tm_value"] = tm["value"]
354                         lst_rows.append(
355                             row.drop(labels=["telemetry", ]).to_frame().T
356                         )
357         if lst_rows:
358             return pd.concat(
359                 lst_rows, ignore_index=True, axis=0, copy=False
360             ).drop_duplicates()
361         else:
362             return pd.DataFrame()