1 # Copyright (c) 2023 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """A module implementing the parsing of OpenMetrics data and elementary
23 from ..trending.graphs import select_trending_data
27 """A class to store and manipulate the telemetry data.
30 def __init__(self, tests: list=list()) -> None:
31 """Initialize the object.
34 :param tests: List of selected tests.
35 :type tests: list
# NOTE(review): the default value 'list()' of 'tests' is created only once,
# when the function is defined, so it is shared by all calls which do not
# pass 'tests'.
41 self._unique_metrics = list()
42 self._unique_metrics_labels = pd.DataFrame()
43 self._selected_metrics_labels = pd.DataFrame()
45 def from_dataframe(self, in_data: pd.DataFrame=pd.DataFrame()) -> None:
46 """Read the input from pandas DataFrame.
48 This method must be called at the beginning to create all data
56 metrics = set() # A set of unique metrics
58 # Create a dataframe with metrics for selected tests:
59 for itm in self._tests:
60 sel_data = select_trending_data(in_data, itm)
61 if sel_data is not None:
# Mark the selected rows with the id of the test they belong to.
62 sel_data["test_name"] = itm["id"]
63 df = pd.concat([df, sel_data], ignore_index=True, copy=False)
64 # Use only necessary data:
74 "result_receive_rate_rate_avg",
75 "result_receive_rate_rate_stdev",
76 "result_receive_rate_rate_unit",
77 "result_pdr_lower_rate_value",
78 "result_pdr_lower_rate_unit",
79 "result_ndr_lower_rate_value",
80 "result_ndr_lower_rate_unit",
83 # Transform metrics from strings to dataframes:
84 lst_telemetry = list()
85 for _, row in df.iterrows():
88 "labels": list(), # list of tuple(label, value)
93 # If there is no telemetry data, use empty dictionary
# NOTE(review): a float value is presumably NaN, i.e. a missing value -
# confirm.
94 if row["telemetry"] is None or isinstance(row["telemetry"], float):
95 lst_telemetry.append(pd.DataFrame(data=d_telemetry))
99 # - list of uncompressed strings List[str, ...], or
100 # - list with only one compressed string List[str]
102 tm_data = zlib.decompress(
103 binascii.a2b_base64(row["telemetry"][0].encode())
104 ).decode().split("\n")
105 except (binascii.Error, zlib.error, AttributeError, IndexError):
# Decoding or decompression of the first item failed, use the telemetry
# items as they are.
106 tm_data = row["telemetry"]
108 # Pre-process telemetry data
# Each item is stripped of apostrophes and split from the right into three
# parts: metric with labels, value and timestamp.
110 itm_lst = itm.replace("'", "").rsplit(" ", maxsplit=2)
# The part before "{" is the metric, the part after it are the labels.
111 metric, labels = itm_lst[0].split("{")
112 d_telemetry["metric"].append(metric)
# Labels are stored as a list of tuples created by splitting each label at
# "="; the last character of 'labels' (presumably the closing "}") is
# dropped.
113 d_telemetry["labels"].append(
114 [tuple(x.split("=")) for x in labels[:-1].split(",")]
116 d_telemetry["value"].append(itm_lst[1])
117 d_telemetry["timestamp"].append(itm_lst[2])
# Remember the metrics of this row and store its telemetry as a dataframe.
119 metrics.update(d_telemetry["metric"])
120 lst_telemetry.append(pd.DataFrame(data=d_telemetry))
121 df["telemetry"] = lst_telemetry
# The unique metrics are stored as a sorted list.
124 self._unique_metrics = sorted(metrics)
126 def from_json(self, in_data: dict) -> None:
127 """Read the input data from json.
# NOTE(review): 'in_data' is annotated as dict but it is passed directly to
# pandas.read_json - confirm the actual type used by the callers.
130 df = pd.read_json(in_data)
131 lst_telemetry = list()
132 metrics = set() # A set of unique metrics
133 for _, row in df.iterrows():
# The content of the "telemetry" cell is converted to its own dataframe.
134 telemetry = pd.DataFrame(row["telemetry"])
135 lst_telemetry.append(telemetry)
136 metrics.update(telemetry["metric"].to_list())
# Replace the "telemetry" column by the created dataframes.
137 df["telemetry"] = lst_telemetry
# The unique metrics are stored as a sorted list.
140 self._unique_metrics = sorted(metrics)
def from_metrics(self, in_data: set) -> None:
    """Read only the metrics.

    The provided collection is stored as it is (it is neither copied nor
    sorted) and becomes the collection of unique metrics of this object.

    :param in_data: Unique metrics.
    :type in_data: set
    """
    self._unique_metrics = in_data
def from_metrics_with_labels(self, in_data: dict) -> None:
    """Read only metrics with labels.

    The input is converted to a dataframe by pandas.DataFrame.from_dict and
    stored as the unique metrics with labels of this object.

    :param in_data: Metrics with labels.
    :type in_data: dict
    """
    self._unique_metrics_labels = pd.DataFrame.from_dict(in_data)
def to_json(self) -> str:
    """Return the data transformed from dataframe to json.

    :returns: Telemetry data transformed to a json structure.
    :rtype: str
    """
    return self._data.to_json()
def unique_metrics(self) -> list:
    """Return the unique metrics.

    :returns: Unique metrics as they are stored in this object.
    :rtype: list
    """
    return self._unique_metrics
def unique_metrics_with_labels(self) -> dict:
    """Return the unique metrics with their labels.

    :returns: The dataframe with unique metrics and their labels converted
        to a dictionary.
    :rtype: dict
    """
    return self._unique_metrics_labels.to_dict()
175 def get_selected_labels(self, metrics: list) -> dict:
176 """Return a dictionary with labels (keys) and all their possible values
177 (values) for all selected 'metrics'.
179 :param metrics: List of metrics we are interested in.
181 :returns: A dictionary with labels and all their possible values.
185 df_labels = pd.DataFrame()
187 for _, row in self._data.iterrows():
188 telemetry = row["telemetry"]
# Keep only the telemetry rows of the currently processed metric.
190 df = telemetry.loc[(telemetry["metric"] == itm)]
191 df_labels = pd.concat(
196 for _, tm in df.iterrows():
197 for label in tm["labels"]:
# label[0] is used as the key and label[1] is added to the set of its
# values.
198 if label[0] not in tmp_labels:
199 tmp_labels[label[0]] = set()
200 tmp_labels[label[0]].add(label[1])
# Sort both the labels and the values of each label.
202 selected_labels = dict()
203 for key in sorted(tmp_labels):
204 selected_labels[key] = sorted(tmp_labels[key])
# Side effect: the unique metrics with labels stored in this object are
# replaced. Duplicates are found on the string representation of the
# "metric" and "labels" columns, the kept rows are then taken from the
# original columns by their index.
206 self._unique_metrics_labels = df_labels[["metric", "labels"]].\
207 loc[df_labels[["metric", "labels"]].astype(str).\
208 drop_duplicates().index]
210 return selected_labels
def str_metrics(self) -> str:
    """Returns all unique metrics as a string.

    :returns: Unique metrics with their labels converted to a string by
        'TelemetryData.metrics_to_str'.
    :rtype: str
    """
    return TelemetryData.metrics_to_str(self._unique_metrics_labels)
219 def metrics_to_str(in_data: pd.DataFrame) -> str:
220 """Convert metrics from pandas dataframe to string. Metrics in string
221 are separated by a newline character.
223 :param in_data: Metrics to be converted to a string.
224 :type in_data: pandas.DataFrame
225 :returns: Metrics as a string.
229 for _, row in in_data.iterrows():
# Labels are rendered as comma separated label='value' pairs.
230 labels = ','.join([f"{itm[0]}='{itm[1]}'" for itm in row["labels"]])
# Each metric is added as one line in the form: metric{labels}
231 metrics += f"{row['metric']}{{{labels}}}\n"
def search_unique_metrics(self, string: str) -> list:
    """Return a list of metrics which name includes the given string.

    The order of the returned metrics is the order in which they are stored
    in this object.

    :param string: A string which must be in the name of metric.
    :type string: str
    :returns: A list of metrics which name includes the given string.
    :rtype: list
    """
    return [itm for itm in self._unique_metrics if string in itm]
244 def filter_selected_metrics_by_labels(
248 """Filter selected unique metrics by labels and their values.
250 :param selection: Labels and their values specified by the user.
251 :type selection: dict
252 :returns: Pandas dataframe with filtered metrics.
253 :rtype: pandas.DataFrame
256 def _is_selected(labels: list, sel: dict) -> bool:
257 """Check if the provided 'labels' are selected by the user.
259 :param labels: List of labels and their values from a metric. The
260 items in this list are two-item-lists where the first item is
261 the label and the second one is its value.
262 :param sel: User selection. The keys are the selected labels and the
263 values are lists with label values.
266 :returns: True if the 'labels' are selected by the user.
# Convert the list of pairs to a dictionary: label -> value.
270 labels = dict(labels)
271 for key in sel.keys():
272 if key in list(labels.keys()):
274 passed.append(labels[key] in sel[key])
# True only if 'passed' is not empty and all its items are true.
279 return bool(passed and all(passed))
# Start with an empty dataframe and add every metric whose labels are
# selected.
281 self._selected_metrics_labels = pd.DataFrame()
282 for _, row in self._unique_metrics_labels.iterrows():
283 if _is_selected(row["labels"], selection):
284 self._selected_metrics_labels = pd.concat(
285 [self._selected_metrics_labels, row.to_frame().T],
290 return self._selected_metrics_labels
292 def select_tm_trending_data(self, selection: dict) -> pd.DataFrame:
293 """Select telemetry data for trending based on user's 'selection'.
295 The output dataframe includes these columns:
305 - "result_receive_rate_rate_avg",
306 - "result_receive_rate_rate_stdev",
307 - "result_receive_rate_rate_unit",
308 - "result_pdr_lower_rate_value",
309 - "result_pdr_lower_rate_unit",
310 - "result_ndr_lower_rate_value",
311 - "result_ndr_lower_rate_unit",
315 :param selection: User's selection (metrics and labels).
316 :type selection: dict
317 :returns: Dataframe with selected data.
318 :rtype: pandas.DataFrame
323 if self._data is None:
330 df_sel = pd.DataFrame.from_dict(selection)
331 for _, row in self._data.iterrows():
332 tm_row = row["telemetry"]
333 for _, tm_sel in df_sel.iterrows():
334 df_tmp = tm_row.loc[tm_row["metric"] == tm_sel["metric"]]
335 for _, tm in df_tmp.iterrows():
336 if tm["labels"] == tm_sel["labels"]:
338 [f"{itm[0]}='{itm[1]}'" for itm in tm["labels"]]
340 row["tm_metric"] = f"{tm['metric']}{{{labels}}}"
341 row["tm_value"] = tm["value"]
342 new_row = row.drop(labels=["telemetry", ])
344 [df, new_row.to_frame().T],