1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """VPP execution bundle."""
from copy import deepcopy
from logging import getLogger
from re import fullmatch, sub
import struct
import sys

from vpp_papi.vpp_papi import VPPApiClient as vpp_class
from .constants import Constants
27 r"(?P<thread_id>\d+)\s"
28 r"(?P<thread_name>\S+)\s.*"
29 r"(?P<thread_lcore>\d+).*"
36 r"(?P<state>\S+\s\S+|\S+)\s+"
38 r"(?P<vectors>\d+)\s+"
39 r"(?P<suspends>\d+)\s+"
41 r"(?P<vectors_calls>\S+)"
44 r"Time\s\S+,\s\d+\ssec\sinternal\snode\svector\srate\s"
45 r"(?P<rate>\S+)\sloops/sec\s"
53 r"(?P<counter>\S+\s\S+|\S+)\s+"
58 r"(?P<counter>\S+\s\S+|\S+)\s+"
61 M_NODE_COUNTERS_THREAD = (
63 r"(?P<thread_id>\d+)\s\("
64 r"(?P<thread_name>\S+)\):\s*"
70 r"(?P<reason>(\S+\s)+)\s+"
71 r"(?P<severity>\S+)\s+"
75 r"\s*per-thread\s+context\s+switches.*"
78 r"(?P<thread_name>\S+)\s+\("
79 r"(?P<thread_id>\S+)\)\s+\S+\s+"
80 r"(?P<context_switches>[\d\.]+)"
83 r"\s*per-thread\s+page\s+faults.*"
86 r"(?P<thread_name>\S+)\s+\("
87 r"(?P<thread_id>\S+)\)\s+\S+\s+"
88 r"(?P<minor_page_faults>[\d\.]+)\s+"
89 r"(?P<major_page_faults>[\d\.]+)"
93 r"(?P<thread_name>\S+)\s+\("
94 r"(?P<thread_id>\d+)\)\s*"
97 r"\s*instructions/packet,\s+cycles/packet\s+and\s+IPC.*"
101 r"(?P<node_name>\S+)\s+"
102 r"(?P<calls>[\d\.]+)\s+"
103 r"(?P<packets>[\d\.]+)\s+"
104 r"(?P<packets_per_call>[\d\.]+)\s+"
105 r"(?P<clocks_per_packets>[\d\.]+)\s+"
106 r"(?P<instructions_per_packets>[\d\.]+)\s+"
110 r"\s*cache\s+hits\s+and\s+misses.*"
114 r"(?P<node_name>\S+)\s+"
115 r"(?P<l1_hit>[\d\.]+)\s+"
116 r"(?P<l1_miss>[\d\.]+)\s+"
117 r"(?P<l2_hit>[\d\.]+)\s+"
118 r"(?P<l2_miss>[\d\.]+)\s+"
119 r"(?P<l3_hit>[\d\.]+)\s+"
120 r"(?P<l3_miss>[\d\.]+)"
123 r"\s*load\s+operations.*"
127 r"(?P<node_name>\S+)\s+"
128 r"(?P<calls>[\d\.]+)\s+"
129 r"(?P<packets>[\d\.]+)\s+"
130 r"(?P<one>[\d\.]+)\s+"
131 r"(?P<two>[\d\.]+)\s+"
132 r"(?P<three>[\d\.]+)"
135 r"\s*Branches,\s+branches\s+taken\s+and\s+mis-predictions.*"
139 r"(?P<node_name>\S+)\s+"
140 r"(?P<branches_per_call>[\d\.]+)\s+"
141 r"(?P<branches_per_packet>[\d\.]+)\s+"
142 r"(?P<taken_per_call>[\d\.]+)\s+"
143 r"(?P<taken_per_packet>[\d\.]+)\s+"
144 r"(?P<mis_predictions>[\d\.]+)"
147 r"\s*Thread\s+power\s+licensing.*"
151 r"(?P<node_name>\S+)\s+"
152 r"(?P<lvl0>[\d\.]+)\s+"
153 r"(?P<lvl1>[\d\.]+)\s+"
154 r"(?P<lvl2>[\d\.]+)\s+"
155 r"(?P<throttle>[\d\.]+)"
158 r"\s*memory\s+reads\s+and\s+writes\s+per\s+memory\s+controller.*"
163 r"(?P<runtime>[\d\.]+)\s+"
164 r"(?P<reads_mbs>[\d\.]+)\s+"
165 r"(?P<writes_mbs>[\d\.]+)\s+"
166 r"(?P<total_mbs>[\d\.]+)"
172 Creates a VPP object. This is the main object for defining a VPP program,
173 and interacting with its output.
175 def __init__(self, program, serializer, hook):
177 Initialize Bundle VPP class.
179 :param program: VPP instructions.
180 :param serializer: Metric serializer.
181 :param hook: VPP API socket.
183 :type serializer: Serializer
187 self.code = program[u"code"]
188 self.metrics = program[u"metrics"]
189 self.api_command_list = list()
190 self.api_replies_list = list()
191 self.serializer = serializer
193 vpp_class.apidir = u"/usr/share/vpp/api"
194 self.obj = vpp_class(
199 logger=getLogger(__name__)
def attach(self, duration):
    """
    Attach events to VPP.

    Connects to VPP under the client name "telemetry" and queues one
    ``cli_inband`` call per line of the bundle code. The process exits
    with ``Constants.err_vpp_connect`` when the connection fails.

    :param duration: Trial duration, substituted for every ``{duration}``
        placeholder found in the bundle code.
    """
    try:
        self.obj.connect(name=u"telemetry")
    except (ConnectionRefusedError, OSError):
        getLogger("console_stderr").error(u"Could not connect to VPP!")
        sys.exit(Constants.err_vpp_connect)

    for command in self.code.splitlines():
        # The argument dict is built fresh for every command, so it can be
        # stored directly without copying.
        self.api_command_list.append(
            dict(
                api_name=u"cli_inband",
                api_args=dict(cmd=command.format(duration=duration)),
            )
        )
227 self.obj.disconnect()
228 except (ConnectionRefusedError, OSError):
229 getLogger("console_stderr").error(u"Could not disconnect from VPP!")
230 sys.exit(Constants.err_vpp_disconnect)
def fetch_data(self):
    """
    Fetch data by invoking API calls to VPP socket.

    Executes every queued command, appends each raw reply object to
    ``self.api_replies_list`` and logs the reply text with ``ESC...m``
    escape sequences removed. Once all commands have run, the serializer
    is asked to create the configured metrics. The process exits with
    ``Constants.err_vpp_execute`` when a command fails.
    """
    log = getLogger(__name__)
    for command in self.api_command_list:
        cmd = command[u"api_args"][u"cmd"]
        try:
            papi_fn = getattr(self.obj.api, command[u"api_name"])
            log.info(cmd)
            replies = papi_fn(**command[u"api_args"])
        except (AssertionError, AttributeError, IOError, struct.error):
            getLogger("console_stderr").error(
                f"Failed when executing command: {cmd}"
            )
            sys.exit(Constants.err_vpp_execute)

        # A call may return one reply or a list of replies; normalise.
        if not isinstance(replies, list):
            replies = [replies]
        # NOTE(review): process_data pairs commands and replies by position,
        # so a command answered with more than one reply would shift every
        # later pair - confirm cli_inband always yields a single reply.
        for reply in replies:
            self.api_replies_list.append(reply)
            text = sub(r"\x1b[^m]*m", u"", reply.reply)
            log.info(text if text else u"<no reply>")
    self.serializer.create(metrics=self.metrics)
def process_data(self):
    """
    Post process command reply.

    Each command is mapped to a parser method by replacing the spaces of
    its CLI text with underscores (``show runtime`` -> ``show_runtime``).
    Commands without a matching callable attribute, or whose derived name
    starts with a double underscore, are skipped. The process exits with
    ``Constants.err_telemetry_process`` when a parser raises ``KeyError``,
    ``ValueError`` or ``TypeError``.
    """
    for command, reply in zip(self.api_command_list, self.api_replies_list):
        parser_name = command[u"api_args"][u"cmd"].replace(u" ", u"_")
        # Never dispatch to dunder methods.
        if parser_name.startswith(u"__"):
            continue
        # Direct lookup instead of rebuilding the full method list from
        # dir(self) for every command.
        parser = getattr(self, parser_name, None)
        if not callable(parser):
            continue
        try:
            parser(reply.reply)
        except AttributeError as exc:
            # Still best-effort, but leave a trace so a broken parser does
            # not disappear silently.
            getLogger(__name__).warning(
                u"Parser %s raised AttributeError: %s", parser_name, exc
            )
        except (KeyError, ValueError, TypeError) as exc:
            getLogger("console_stderr").error(
                f"Failed when processing data. Error message {exc}"
            )
            sys.exit(Constants.err_telemetry_process)
def show_interface(self, reply):
    """
    Parse the show interface output.

    Rows matching ``M_INT_BEGIN`` supply the interface name and index as
    well as a counter; rows matching ``M_INT_CONT`` supply a counter only
    and are labelled with the name and index of the most recent
    ``M_INT_BEGIN`` row. One metric, named after the counter with spaces
    and dashes turned into underscores, is serialized per matching row.

    :param reply: API reply.
    :type reply: str
    """
    ifc = None
    for line in reply.splitlines():
        # Match each pattern once per line and reuse the result.
        begin = fullmatch(M_INT_BEGIN, line)
        cont = fullmatch(M_INT_CONT, line)
        if begin:
            ifc = begin.groupdict()
        # When both patterns match, the continuation values win.
        if cont:
            counter = cont.groupdict()
        elif begin:
            counter = ifc
        else:
            continue
        if ifc is None:
            # Continuation row before any interface row: there is no
            # interface to attribute the counter to.
            continue

        metric = counter[u"counter"].replace(" ", "_").replace("-", "_")
        labels = dict(name=ifc[u"name"], index=ifc[u"index"])
        item = dict(name=metric, value=counter[u"count"], labels=labels)
        self.serializer.serialize(
            metric=metric, labels=labels, item=item
        )
def show_runtime(self, reply):
    """
    Parse the show runtime output.

    A row matching ``M_RUN_THREAD`` sets the thread context for the node
    rows that follow it. Node rows seen before any thread row are labelled
    with thread ``vpp_main``, id ``0``, lcore ``0``. For every node row,
    one item is serialized per metric in the serializer registry, using
    the regex group of the same name as the value.

    :param reply: API reply.
    :type reply: str
    """
    # Explicit default thread context instead of relying on
    # UnboundLocalError for control flow.
    thread = dict(
        thread_name=u"vpp_main", thread_id=u"0", thread_lcore=u"0"
    )
    for line in reply.splitlines():
        thread_match = fullmatch(M_RUN_THREAD, line)
        if thread_match:
            thread = thread_match.groupdict()
        node_match = fullmatch(M_RUN_NODES, line)
        if not node_match:
            continue
        nodes = node_match.groupdict()
        for metric in self.serializer.metric_registry:
            labels = dict(
                name=nodes[u"name"],
                state=nodes[u"state"],
                thread_name=thread[u"thread_name"],
                thread_id=thread[u"thread_id"],
                thread_lcore=thread[u"thread_lcore"],
            )
            item = dict(name=metric, labels=labels, value=nodes[metric])
            self.serializer.serialize(
                metric=metric, labels=labels, item=item
            )
def show_node_counters_verbose(self, reply):
    """
    Parse the show node counter output.

    A row matching ``M_NODE_COUNTERS_THREAD`` sets the thread context for
    the counter rows that follow it. Counter rows seen before any thread
    row are labelled with thread ``vpp_main``, id ``0``. For every counter
    row, one item carrying the row's ``count`` is serialized per metric in
    the serializer registry.

    :param reply: API reply.
    :type reply: str
    """
    # Explicit default thread context instead of relying on
    # UnboundLocalError for control flow.
    thread = dict(thread_name=u"vpp_main", thread_id=u"0")
    for line in reply.splitlines():
        thread_match = fullmatch(M_NODE_COUNTERS_THREAD, line)
        if thread_match:
            thread = thread_match.groupdict()
        counter_match = fullmatch(M_NODE_COUNTERS, line)
        if not counter_match:
            continue
        nodes = counter_match.groupdict()
        # Same registry attribute as every other parser in this class; the
        # former "metric_registry_registry" lookup raised AttributeError.
        for metric in self.serializer.metric_registry:
            labels = dict(
                name=nodes[u"name"],
                reason=nodes[u"reason"],
                severity=nodes[u"severity"],
                thread_name=thread[u"thread_name"],
                thread_id=thread[u"thread_id"],
            )
            item = dict(name=metric, labels=labels, value=nodes[u"count"])
            self.serializer.serialize(
                metric=metric, labels=labels, item=item
            )
def show_perfmon_statistics(self, reply):
    """
    Parse the perfmon output.

    Escape sequences (``ESC...m``) are stripped first. The first line of
    the reply is then matched against the known perfmon bundle headers,
    and every header that matches selects the row parser for that bundle:
    per-thread rows, per-node rows grouped under a thread row, or
    system-wide rows. An empty reply produces no metrics.

    :param reply: API reply.
    :type reply: str
    """

    def emit(metric, labels, value):
        # Serialize one sample; item keys mirror the other parsers.
        item = dict(name=metric, labels=labels, value=value)
        self.serializer.serialize(
            metric=metric, labels=labels, item=item
        )

    def perfmon_threads(lines, regex_threads):
        # One sample per registered metric for every per-thread row.
        for line in lines:
            match = fullmatch(regex_threads, line)
            if not match:
                continue
            threads = match.groupdict()
            for metric in self.serializer.metric_registry:
                labels = dict(
                    name=threads[u"thread_name"], id=threads[u"thread_id"]
                )
                emit(metric, labels, threads[metric])

    def perfmon_nodes(lines, regex_threads, regex_nodes):
        # Node rows are attributed to the most recent thread row.
        thread = None
        for line in lines:
            thread_match = fullmatch(regex_threads, line)
            if thread_match:
                thread = thread_match.groupdict()
            node_match = fullmatch(regex_nodes, line)
            if not node_match:
                continue
            if thread is None:
                # No thread row seen yet, so the node row cannot be
                # attributed; skip it.
                continue
            node = node_match.groupdict()
            for metric in self.serializer.metric_registry:
                labels = dict(
                    name=node[u"node_name"],
                    thread_name=thread[u"thread_name"],
                    thread_id=thread[u"thread_id"],
                )
                emit(metric, labels, node[metric])

    def perfmon_system(lines, regex_line):
        # One sample per registered metric for every system-wide row.
        for line in lines:
            match = fullmatch(regex_line, line)
            if not match:
                continue
            name = match.groupdict()
            for metric in self.serializer.metric_registry:
                emit(metric, dict(name=name[u"name"]), name[metric])

    lines = sub(r"\x1b[^m]*m", u"", reply).splitlines()
    if not lines:
        # Nothing to parse; indexing the header line would raise.
        return
    header = lines[0]

    # Headers are tested independently and in the original order.
    thread_bundles = (
        (M_PMB_CS_HEADER, M_PMB_CS),
        (M_PMB_PF_HEADER, M_PMB_PF),
    )
    node_bundles = (
        (M_PMB_IC_HEADER, M_PMB_IC_NODE),
        (M_PMB_CM_HEADER, M_PMB_CM_NODE),
        (M_PMB_LO_HEADER, M_PMB_LO_NODE),
        (M_PMB_BM_HEADER, M_PMB_BM_NODE),
        (M_PMB_PL_HEADER, M_PMB_PL_NODE),
    )
    for header_regex, row_regex in thread_bundles:
        if fullmatch(header_regex, header):
            perfmon_threads(lines, row_regex)
    for header_regex, node_regex in node_bundles:
        if fullmatch(header_regex, header):
            perfmon_nodes(lines, M_PMB_THREAD, node_regex)
    if fullmatch(M_PMB_MB_HEADER, header):
        perfmon_system(lines, M_PMB_MB)
448 def show_version(self, reply):
450 Parse the version output.
452 :param reply: API reply.
455 for metric in self.serializer.metric_registry:
456 version = reply.split()[1]
459 item[u"name"] = metric
460 labels[u"version"] = version
461 item[u"labels"] = labels
463 self.serializer.serialize(
464 metric=metric, labels=labels, item=item