3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 This module implements Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
24 Counters are stored in VPP as a two-dimensional array,
25 indexed by thread and by index (typically sw_if_index).
26 Simple counters count only packets, Combined counters count packets
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34 for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37 interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39 interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41 interface 1 on all threads for simple counters
48 from struct import Struct
55 """Get file descriptor for memory map"""
56 fds = array.array("i") # Array of ints
57 _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
58 for cmsg_level, cmsg_type, cmsg_data in ancdata:
59 if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
60 fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
# Format of the element-count word that get_vec_len() reads in front of a
# VPP vector's data (native unsigned int).
VEC_LEN_FMT = Struct("I")


def get_vec_len(stats, vector_offset):
    """Equivalent to VPP vec_len(): return the element count of a vector.

    *vector_offset* is the offset of the vector's first element within
    ``stats.statseg``; the count is read from the 8 bytes before it.
    """
    return VEC_LEN_FMT.unpack_from(stats.statseg, vector_offset - 8)[0]


def get_string(stats, ptr):
    """Get a string from a VPP vector.

    *ptr* is a pointer in VPP's address space; ``ptr - stats.base`` turns it
    into an offset into ``stats.statseg``. The vector's last byte (presumably
    the NUL terminator) is dropped and the rest is decoded as ASCII.

    Raises:
        IOError: if the vector, or the length word in front of it, does not
            lie within the first ``stats.size`` bytes of the segment.
    """
    namevector = ptr - stats.base
    # get_vec_len() reads 8 bytes before the data; reject offsets whose
    # length word would start before the beginning of the segment.
    if namevector < 8:
        raise IOError("String pointer outside stats segment")
    namevectorlen = get_vec_len(stats, namevector)
    # The vector occupies the half-open range [namevector, namevector + len),
    # so it fits as long as that end offset does not exceed the segment size.
    if namevector + namevectorlen > stats.size:
        raise IOError("String overruns stats segment")
    return stats.statseg[namevector : namevector + namevectorlen - 1].decode("ascii")
82 """A class representing a VPP vector"""
84 def __init__(self, stats, ptr, fmt):
85 self.vec_start = ptr - stats.base
86 self.vec_len = get_vec_len(stats, ptr - stats.base)
87 self.struct = Struct(fmt)
88 self.fmtlen = len(fmt)
89 self.elementsize = self.struct.size
90 self.statseg = stats.statseg
93 if self.vec_start + self.vec_len * self.elementsize >= stats.size:
94 raise IOError("Vector overruns stats segment")
98 return self.struct.iter_unpack(
100 self.vec_start : self.vec_start + self.elementsize * self.vec_len
# Unpack the element at position `index` with self.struct, reading from
# offset vec_start + index * elementsize while holding self.stats.lock.
104 def __getitem__(self, index):
# NOTE(review): the vector data spans elementsize * vec_len bytes from
# vec_start, so valid indices are 0 .. vec_len - 1. `>` lets
# index == vec_len through and reads one element past the end; `>=` looks
# intended. Negative indices are not rejected either and would read before
# vec_start.
105 if index > self.vec_len:
106 raise IOError("Index beyond end of vector")
# self.stats.lock is the StatsLock ("optimistic locking") created by VPPStats.
107 with self.stats.lock:
109 return self.struct.unpack_from(
110 self.statseg, self.vec_start + (index * self.elementsize)
112 return self.struct.unpack_from(
113 self.statseg, self.vec_start + (index * self.elementsize)
"""Client for the VPP statistics segment.

Connects to VPP over the stats Unix socket, maps the segment read-only and
exposes its counters by path name, e.g. ``stats["/if/rx"]``.
"""

# pylint: disable=too-many-instance-attributes
default_socketname = "/run/vpp/stats.sock"
# Layout of the shared segment header. The accessors read field 0 (version),
# 1 (base), 2 (epoch), 3 (in_progress) and 4 (directory_vector).
shared_headerfmt = Struct("QPQQPP")
124 def __init__(self, socketname=default_socketname, timeout=10):
125 self.socketname = socketname
126 self.timeout = timeout
128 self.lock = StatsLock(self)
129 self.connected = False
135 """Connect to stats segment"""
138 sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
139 sock.connect(self.socketname)
144 stat_result = os.fstat(mfd)
145 self.statseg = mmap.mmap(
146 mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED
150 self.size = stat_result.st_size
151 if self.version != 2:
152 raise Exception("Incompatbile stat segment version {}".format(self.version))
155 self.connected = True
157 def disconnect(self):
158 """Disconnect from stats segment"""
161 self.connected = False
165 """Get version of stats segment"""
166 return self.shared_headerfmt.unpack_from(self.statseg)[0]
170 """Get base pointer of stats segment"""
171 return self.shared_headerfmt.unpack_from(self.statseg)[1]
175 """Get current epoch value from stats segment"""
176 return self.shared_headerfmt.unpack_from(self.statseg)[2]
def in_progress(self):
    """Get value of in_progress from stats segment.

    Returns field 3 of the shared header. StatsLock waits for this value to
    become falsy before reading, and treats a truthy value afterwards as a
    failed read.
    """
    header = self.shared_headerfmt.unpack_from(self.statseg)
    return header[3]
def directory_vector(self):
    """Get pointer of directory vector.

    Returns field 4 of the shared header: an address in VPP's mapping that
    refresh() hands to StatsVector, which rebases it by ``stats.base``.
    """
    header = self.shared_headerfmt.unpack_from(self.statseg)
    return header[4]

# Directory entry layout as unpacked by refresh(): stat type, stat value,
# and a 128-byte path field that is cut at its first NUL byte.
elementfmt = "IQ128s"
190 def refresh(self, blocking=True):
191 """Refresh directory vector cache (epoch changed)"""
193 directory_by_idx = {}
197 self.last_epoch = self.epoch
198 for i, direntry in enumerate(
199 StatsVector(self, self.directory_vector, self.elementfmt)
201 path_raw = direntry[2].find(b"\x00")
202 path = direntry[2][:path_raw].decode("ascii")
203 directory[path] = StatsEntry(direntry[0], direntry[1])
204 directory_by_idx[i] = path
205 self.directory = directory
206 self.directory_by_idx = directory_by_idx
212 def __getitem__(self, item, blocking=True):
213 if not self.connected:
217 if self.last_epoch != self.epoch:
218 self.refresh(blocking)
220 return self.directory[item].get_counter(self)
226 return iter(self.directory.items())
228 def set_errors(self, blocking=True):
229 """Return dictionary of error counters > 0"""
230 if not self.connected:
233 errors = {k: v for k, v in self.directory.items() if k.startswith("/err/")}
237 total = self[k].sum()
def set_errors_str(self, blocking=True):
    """Return the counters from set_errors() as a printable report.

    The first line is ``ERRORS:``. It is followed by one line per counter,
    sorted by name, with the name left-aligned in 60 columns and the value
    right-aligned in 10. The report ends with a newline.
    """
    counters = self.set_errors(blocking)
    lines = ["ERRORS:"]
    lines.extend(
        "{:<60}{:>10}".format(counter_name, counters[counter_name])
        for counter_name in sorted(counters)
    )
    return "\n".join(lines) + "\n"
def get_counter(self, name, blocking=True):
    """Look up counter *name*.

    This is the same lookup as ``self[name]``, but it also lets the caller
    pass *blocking* through to __getitem__.
    """
    counter = self.__getitem__(name, blocking)
    return counter
def get_err_counter(self, name, blocking=True):
    """Look up counter *name* and return the result of its ``.sum()``.

    This is intended for error counters, whose counter objects presumably
    aggregate their per-thread values in ``sum()``.
    """
    per_thread = self.__getitem__(name, blocking)
    return per_thread.sum()
260 def ls(self, patterns):
261 """Returns list of counters matching pattern"""
262 # pylint: disable=invalid-name
263 if not self.connected:
265 if not isinstance(patterns, list):
266 patterns = [patterns]
267 regex = [re.compile(i) for i in patterns]
268 if self.last_epoch != self.epoch:
273 for k, v in self.directory.items()
274 if any(re.match(pattern, k) for pattern in regex)
277 def dump(self, counters, blocking=True):
278 """Given a list of counters return a dictionary of results"""
279 if not self.connected:
283 result[cnt] = self.__getitem__(cnt, blocking)
288 """Stat segment optimistic locking"""
290 def __init__(self, stats):
295 acquired = self.acquire(blocking=True)
296 assert acquired, "Lock wasn't acquired, but blocking=True"
299 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
# Record the segment epoch, then poll while the segment reports an update
# in progress.
302 def acquire(self, blocking=True, timeout=-1):
303 """Acquire the lock. Await in progress to go false. Record epoch."""
# Snapshot of the epoch. The validity check ("Check if data read while
# locked is valid") fails if the live epoch no longer matches it.
304 self.epoch = self.stats.epoch
306 start = time.monotonic()
307 while self.stats.in_progress:
# NOTE(review): `start + time.monotonic()` adds two clock readings instead
# of measuring elapsed time, so this comparison does not implement a
# timeout; `time.monotonic() - start > timeout` looks intended.
311 if start + time.monotonic() > timeout:
316 """Check if data read while locked is valid"""
317 if self.stats.in_progress or self.stats.epoch != self.epoch:
318 raise IOError("Optimistic lock failed, retry")
class StatsCombinedList(list):
    """Column slicing for Combined counters list.

    Rows are threads. ``counter[i]`` and ``counter[i:j]`` behave like
    ordinary list indexing. ``counter[:, k]`` returns a ``CombinedList`` that
    holds column ``k`` from every row.
    """

    def __getitem__(self, item):
        """Supports partial numpy style 2d support. Slice by column [:,1].

        Ints and slices go to ``list.__getitem__``; previously a slice key
        crashed because it was treated as a ``(rows, column)`` tuple. Any
        other key is treated as such a tuple. Only its column part is used:
        the row selector is ignored, so ``[0, k]`` also returns column ``k``
        from all rows.
        """
        if isinstance(item, (int, slice)):
            return list.__getitem__(self, item)
        return CombinedList([row[item[1]] for row in self])
334 class CombinedList(list):
335 """Combined Counters 2-dimensional by thread by index of packets/octets"""
338 """Return column (2nd dimension). Packets for all threads"""
339 return [pair[0] for pair in self]
342 """Return column (2nd dimension). Octets for all threads"""
343 return [pair[1] for pair in self]
def sum_packets(self):
    """Return the total of this column's packet counts across all threads."""
    total = 0
    for packet_count in self.packets():
        total += packet_count
    return total
def sum_octets(self):
    """Return the total of this column's octet counts across all threads."""
    total = 0
    for octet_count in self.octets():
        total += octet_count
    return total
354 class StatsTuple(tuple):
355 """A Combined vector tuple (packets, octets)"""
357 def __init__(self, data):
358 self.dictionary = {"packets": data[0], "bytes": data[1]}
362 return dict.__repr__(self.dictionary)
def __getitem__(self, item):
    """Index the (packets, bytes) pair by position or by name.

    Non-string keys (ints, slices, integer-like objects) are forwarded to
    ``tuple.__getitem__``, so ``t[0]``, ``t[-1]`` and ``t[:]`` behave like
    ordinary tuple indexing. Previously every non-``int`` key fell through
    to the byte count. The string ``"packets"`` selects element 0; any other
    string (e.g. ``"bytes"`` or ``"octets"``) selects element 1.
    """
    if not isinstance(item, str):
        return tuple.__getitem__(self, item)
    if item == "packets":
        return tuple.__getitem__(self, 0)
    # Kept permissive for backward compatibility: every name other than
    # "packets" maps to the byte/octet count.
    return tuple.__getitem__(self, 1)
class StatsSimpleList(list):
    """Simple Counters 2-dimensional by thread by index of packets.

    Rows are threads. ``counter[i]`` and ``counter[i:j]`` behave like
    ordinary list indexing. ``counter[:, k]`` returns a ``SimpleList`` that
    holds column ``k`` from every row.
    """

    def __getitem__(self, item):
        """Supports partial numpy style 2d support. Slice by column [:,1].

        Ints and slices go to ``list.__getitem__``; previously a slice key
        crashed because it was treated as a ``(rows, column)`` tuple. Any
        other key is treated as such a tuple. Only its column part is used:
        the row selector is ignored, so ``[0, k]`` also returns column ``k``
        from all rows.
        """
        if isinstance(item, (int, slice)):
            return list.__getitem__(self, item)
        return SimpleList([row[item[1]] for row in self])
382 class SimpleList(list):
391 """An individual stats entry"""
393 # pylint: disable=unused-argument,no-self-use
395 def __init__(self, stattype, statvalue):
397 self.value = statvalue
400 self.function = self.scalar
402 self.function = self.simple
404 self.function = self.combined
406 self.function = self.name
408 self.function = self.symlink
410 self.function = self.illegal
412 def illegal(self, stats):
413 """Invalid or unknown counter type"""
416 def scalar(self, stats):
420 def simple(self, stats):
422 counter = StatsSimpleList()
423 for threads in StatsVector(stats, self.value, "P"):
424 clist = [v[0] for v in StatsVector(stats, threads[0], "Q")]
425 counter.append(clist)
428 def combined(self, stats):
429 """Combined counter"""
430 counter = StatsCombinedList()
431 for threads in StatsVector(stats, self.value, "P"):
432 clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], "QQ")]
433 counter.append(clist)
436 def name(self, stats):
439 for name in StatsVector(stats, self.value, "P"):
441 counter.append(get_string(stats, name[0]))
# A symlink entry's 64-bit value is reinterpreted as two native 32-bit
# integers: the directory index of the target counter and the column
# selected within it.
SYMLINK_FMT1 = Struct("II")
SYMLINK_FMT2 = Struct("Q")

def symlink(self, stats):
    """Symlink counter: resolve to the column of the counter it points at.

    Re-packs ``self.value`` as a ``Q`` and unpacks it as two ``I`` fields.
    The first is looked up in ``stats.directory_by_idx`` to get the target
    counter's name; the second selects the column ``[:, column]`` of that
    counter.
    """
    raw = self.SYMLINK_FMT2.pack(self.value)
    target_idx, column = self.SYMLINK_FMT1.unpack(raw)
    target_name = stats.directory_by_idx[target_idx]
    return stats[target_name][:, column]
454 def get_counter(self, stats):
455 """Return a list of counters"""
457 return self.function(stats)
460 class TestStats(unittest.TestCase):
461 """Basic statseg tests"""
464 """Connect to statseg"""
465 self.stat = VPPStats()
467 self.profile = cProfile.Profile()
468 self.profile.enable()
471 """Disconnect from statseg"""
472 self.stat.disconnect()
473 profile = Stats(self.profile)
475 profile.sort_stats("cumtime")
476 profile.print_stats()
479 def test_counters(self):
480 """Test access to statseg"""
482 print("/err/abf-input-ip4/missed", self.stat["/err/abf-input-ip4/missed"])
483 print("/sys/heartbeat", self.stat["/sys/heartbeat"])
484 print("/if/names", self.stat["/if/names"])
485 print("/if/rx-miss", self.stat["/if/rx-miss"])
486 print("/if/rx-miss", self.stat["/if/rx-miss"][1])
488 "/nat44-ed/out2in/slowpath/drops",
489 self.stat["/nat44-ed/out2in/slowpath/drops"],
491 with self.assertRaises(KeyError):
492 print("NO SUCH COUNTER", self.stat["foobar"])
493 print("/if/rx", self.stat.get_counter("/if/rx"))
495 "/err/ethernet-input/no_error",
496 self.stat.get_counter("/err/ethernet-input/no_error"),
499 def test_column(self):
500 """Test column slicing"""
502 print("/if/rx-miss", self.stat["/if/rx-miss"])
503 print("/if/rx", self.stat["/if/rx"]) # All interfaces for thread #1
505 "/if/rx thread #1", self.stat["/if/rx"][0]
506 ) # All interfaces for thread #1
508 "/if/rx thread #1, interface #1", self.stat["/if/rx"][0][1]
509 ) # All interfaces for thread #1
510 print("/if/rx if_index #1", self.stat["/if/rx"][:, 1])
511 print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].packets())
512 print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].sum_packets())
513 print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].octets())
514 print("/if/rx-miss", self.stat["/if/rx-miss"])
515 print("/if/rx-miss if_index #1 packets", self.stat["/if/rx-miss"][:, 1].sum())
516 print("/if/rx if_index #1 packets", self.stat["/if/rx"][0][1]["packets"])
518 def test_nat44(self):
519 """Test the nat counters"""
521 print("/nat44-ei/ha/del-event-recv", self.stat["/nat44-ei/ha/del-event-recv"])
523 "/err/nat44-ei-ha/pkts-processed",
524 self.stat["/err/nat44-ei-ha/pkts-processed"].sum(),
527 def test_legacy(self):
528 """Legacy interface"""
529 directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
530 data = self.stat.dump(directory)
532 print("Looking up sys node")
533 directory = self.stat.ls(["^/sys/node"])
534 print("Dumping sys node")
535 data = self.stat.dump(directory)
537 directory = self.stat.ls(["^/foobar"])
538 data = self.stat.dump(directory)
def test_sys_nodes(self):
    """Test /sys/nodes

    Lists the counters matching ^/sys/node, dumps them, and prints the
    /net/route/to counter.
    """
    node_counters = self.stat.ls("^/sys/node")
    print("COUNTERS:", node_counters)
    dumped = self.stat.dump(node_counters)
    print("/sys/node", dumped)
    route_to = self.stat["/net/route/to"]
    print("/net/route/to", route_to)
548 def test_symlink(self):
550 print("/interface/local0/rx", self.stat["/interfaces/local0/rx"])
551 print("/sys/nodes/unix-epoll-input", self.stat["/nodes/unix-epoll-input/calls"])
554 if __name__ == "__main__":
556 from pstats import Stats