3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 This module implements Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
24 Counters are stored in VPP as a two-dimensional array,
25 indexed by thread and by index (typically sw_if_index).
26 Simple counters count only packets; Combined counters count packets and octets.
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34 for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37 interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39 interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41 interface 1 on all threads for simple counters
48 from struct import Struct
54 '''Get file descriptor for memory map'''
55 fds = array.array("i") # Array of ints
56 _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
57 for cmsg_level, cmsg_type, cmsg_data in ancdata:
58 if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
59 fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
# Format of the unsigned int length field read from a vector's header.
VEC_LEN_FMT = Struct('I')


def get_vec_len(stats, vector_offset):
    '''Equivalent to VPP vec_len()

    Reads the unsigned int stored 8 bytes before vector_offset in
    stats.statseg.

    :param stats: object exposing the stats segment buffer as ``statseg``
    :param vector_offset: offset of the vector within ``stats.statseg``
    :returns: the length value recorded in front of the vector
    :raises ValueError: if fewer than 8 bytes precede vector_offset
    '''
    length_offset = vector_offset - 8
    if length_offset < 0:
        # unpack_from() interprets a negative offset relative to the end of
        # the buffer, which would silently return unrelated data.
        raise ValueError('Vector offset precedes stats segment')
    return VEC_LEN_FMT.unpack_from(stats.statseg, length_offset)[0]
def get_string(stats, ptr):
    '''Decode the string held in the VPP vector that ptr refers to

    :param stats: object providing ``base``, ``size`` and ``statseg``
    :param ptr: pointer value; converted to an offset by subtracting
        ``stats.base``
    :returns: the vector contents, minus the final byte, decoded as ASCII
    :raises ValueError: if the vector would reach the end of the segment
    '''
    start = ptr - stats.base
    length = get_vec_len(stats, start)
    end = start + length
    if not end < stats.size:
        raise ValueError('String overruns stats segment')
    # The last byte of the vector is left out of the result
    # (presumably a NUL terminator -- TODO confirm against the writer).
    return stats.statseg[start:end - 1].decode('ascii')
77 '''A class representing a VPP vector'''
79 def __init__(self, stats, ptr, fmt):
80 self.vec_start = ptr - stats.base
81 self.vec_len = get_vec_len(stats, ptr - stats.base)
82 self.struct = Struct(fmt)
83 self.fmtlen = len(fmt)
84 self.elementsize = self.struct.size
85 self.statseg = stats.statseg
88 if self.vec_start + self.vec_len * self.elementsize >= stats.size:
89 raise ValueError('Vector overruns stats segment')
93 return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start +
94 self.elementsize*self.vec_len])
96 def __getitem__(self, index):
97 if index > self.vec_len:
98 raise ValueError('Index beyond end of vector')
101 return self.struct.unpack_from(self.statseg, self.vec_start +
102 (index * self.elementsize))[0]
103 return self.struct.unpack_from(self.statseg, self.vec_start +
104 (index * self.elementsize))
107 '''Main class implementing Python access to the VPP statistics segment'''
108 # pylint: disable=too-many-instance-attributes
109 shared_headerfmt = Struct('QPQQPP')
110 default_socketname = '/run/vpp/stats.sock'
112 def __init__(self, socketname=default_socketname, timeout=10):
113 self.socketname = socketname
114 self.timeout = timeout
116 self.lock = StatsLock(self)
117 self.connected = False
120 self.error_vectors = 0
124 '''Connect to stats segment'''
127 sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
128 sock.connect(self.socketname)
133 stat_result = os.fstat(mfd)
134 self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED)
137 self.size = stat_result.st_size
138 if self.version != 2:
139 raise Exception('Incompatbile stat segment version {}'
140 .format(self.version))
143 self.connected = True
145 def disconnect(self):
146 '''Disconnect from stats segment'''
149 self.connected = False
153 '''Get version of stats segment'''
154 return self.shared_headerfmt.unpack_from(self.statseg)[0]
158 '''Get base pointer of stats segment'''
159 return self.shared_headerfmt.unpack_from(self.statseg)[1]
163 '''Get current epoch value from stats segment'''
164 return self.shared_headerfmt.unpack_from(self.statseg)[2]
def in_progress(self):
    '''Return the in_progress field (fourth entry) of the shared header'''
    header = self.shared_headerfmt.unpack_from(self.statseg)
    return header[3]
def directory_vector(self):
    '''Return the directory vector pointer (fifth entry) of the shared header'''
    fields = self.shared_headerfmt.unpack_from(self.statseg)
    pointer = fields[4]
    return pointer
def error_vector(self):
    '''Return the error vector pointer (sixth entry) of the shared header'''
    unpacked = self.shared_headerfmt.unpack_from(self.statseg)
    pointer = unpacked[5]
    return pointer
181 elementfmt = 'IQ128s'
183 def refresh(self, blocking=True):
184 '''Refresh directory vector cache (epoch changed)'''
186 directory_by_idx = {}
190 for i, direntry in enumerate(StatsVector(self, self.directory_vector, self.elementfmt)):
191 path_raw = direntry[2].find(b'\x00')
192 path = direntry[2][:path_raw].decode('ascii')
193 directory[path] = StatsEntry(direntry[0], direntry[1])
194 directory_by_idx[i] = path
195 self.last_epoch = self.epoch
196 self.directory = directory
197 self.directory_by_idx = directory_by_idx
199 # Cache the error index vectors
200 self.error_vectors = []
201 for threads in StatsVector(self, self.error_vector, 'P'):
202 self.error_vectors.append(StatsVector(self, threads[0], 'Q'))
208 def __getitem__(self, item, blocking=True):
209 if not self.connected:
213 if self.last_epoch != self.epoch:
214 self.refresh(blocking)
216 return self.directory[item].get_counter(self)
222 return iter(self.directory.items())
224 def set_errors(self, blocking=True):
225 '''Return dictionary of error counters > 0'''
226 if not self.connected:
229 errors = {k:v for k, v in self.directory.items() if k.startswith("/err/")}
233 if self.last_epoch != self.epoch:
234 self.refresh(blocking)
236 for k, entry in errors.items():
239 for per_thread in self.error_vectors:
240 total += per_thread[i]
def set_errors_str(self, blocking=True):
    '''Format the result of set_errors() as a text report

    The report starts with an 'ERRORS:' heading followed by one line per
    counter, sorted by name, and ends with a newline.
    '''
    counters = self.set_errors(blocking)
    lines = ['ERRORS:']
    lines.extend('{:<60}{:>10}'.format(name, counters[name])
                 for name in sorted(counters))
    return '%s\n' % '\n'.join(lines)
def get_counter(self, name, blocking=True):
    '''Method form of item lookup that also forwards blocking to __getitem__'''
    lookup = self.__getitem__
    return lookup(name, blocking)
260 def get_err_counter(self, name, blocking=True):
261 '''Return a single value (sum of all threads)'''
262 if not self.connected:
264 if name.startswith("/err/"):
267 if self.last_epoch != self.epoch:
268 self.refresh(blocking)
270 return sum(self.directory[name].get_counter(self))
275 def ls(self, patterns):
276 '''Returns list of counters matching pattern'''
277 # pylint: disable=invalid-name
278 if not self.connected:
280 if not isinstance(patterns, list):
281 patterns = [patterns]
282 regex = [re.compile(i) for i in patterns]
283 return [k for k, v in self.directory.items()
284 if any(re.match(pattern, k) for pattern in regex)]
286 def dump(self, counters, blocking=True):
287 '''Given a list of counters return a dictionary of results'''
288 if not self.connected:
292 result[cnt] = self.__getitem__(cnt,blocking)
296 '''Stat segment optimistic locking'''
298 def __init__(self, stats):
303 acquired = self.acquire(blocking=True)
304 assert acquired, "Lock wasn't acquired, but blocking=True"
307 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
310 def acquire(self, blocking=True, timeout=-1):
311 '''Acquire the lock. Await in progress to go false. Record epoch.'''
312 self.epoch = self.stats.epoch
314 start = time.monotonic()
315 while self.stats.in_progress:
319 if start + time.monotonic() > timeout:
324 '''Check if data read while locked is valid'''
325 if self.stats.in_progress or self.stats.epoch != self.epoch:
326 raise IOError('Optimistic lock failed, retry')
class StatsCombinedList(list):
    '''Column slicing for Combined counters list'''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1]

        :param item: an int selects one row, a slice selects a range of
            rows, and a 2-tuple such as ``[:, 1]`` selects element
            ``item[1]`` from every row (``item[0]`` is not consulted)
        :returns: the row, a StatsCombinedList of the selected rows, or a
            CombinedList holding the selected column
        '''
        if isinstance(item, int):
            return list.__getitem__(self, item)
        if isinstance(item, slice):
            # A plain row slice is not a tuple; subscripting it below would
            # raise TypeError, so let list handle it and keep the list type.
            return StatsCombinedList(list.__getitem__(self, item))
        return CombinedList([row[item[1]] for row in self])
341 class CombinedList(list):
342 '''Combined Counters 2-dimensional by thread by index of packets/octets'''
345 '''Return column (2nd dimension). Packets for all threads'''
346 return [pair[0] for pair in self]
349 '''Return column (2nd dimension). Octets for all threads'''
350 return [pair[1] for pair in self]
def sum_packets(self):
    '''Add up the values returned by packets() (packet counts for all threads)'''
    per_thread = self.packets()
    return sum(per_thread)
def sum_octets(self):
    '''Add up the values returned by octets() (octet counts for all threads)'''
    per_thread = self.octets()
    return sum(per_thread)
360 class StatsTuple(tuple):
361 '''A Combined vector tuple (packets, octets)'''
362 def __init__(self, data):
363 self.dictionary = {'packets': data[0], 'bytes': data[1]}
367 return dict.__repr__(self.dictionary)
def __getitem__(self, item):
    '''Look up a field of the (packets, octets) pair by position or name

    :param item: an int or slice (plain tuple indexing), or one of the
        names 'packets', 'bytes' or 'octets'
    :returns: the selected counter value, or a tuple for a slice
    :raises KeyError: if item is a name other than the ones listed
    '''
    if isinstance(item, (int, slice)):
        return tuple.__getitem__(self, item)
    if item == 'packets':
        return tuple.__getitem__(self, 0)
    if item in ('bytes', 'octets'):
        return tuple.__getitem__(self, 1)
    # Every other key used to fall through to the byte count, which hid
    # misspelt names such as 'packet' behind a plausible-looking value.
    raise KeyError(item)
class StatsSimpleList(list):
    '''Simple Counters 2-dimensional by thread by index of packets'''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1]

        :param item: an int selects one row, a slice selects a range of
            rows, and a 2-tuple such as ``[:, 1]`` selects element
            ``item[1]`` from every row (``item[0]`` is not consulted)
        :returns: the row, a StatsSimpleList of the selected rows, or a
            SimpleList holding the selected column
        '''
        if isinstance(item, int):
            return list.__getitem__(self, item)
        if isinstance(item, slice):
            # A plain row slice is not a tuple; subscripting it below would
            # raise TypeError, so let list handle it and keep the list type.
            return StatsSimpleList(list.__getitem__(self, item))
        return SimpleList([row[item[1]] for row in self])
385 class SimpleList(list):
393 '''An individual stats entry'''
394 # pylint: disable=unused-argument,no-self-use
396 def __init__(self, stattype, statvalue):
398 self.value = statvalue
401 self.function = self.scalar
403 self.function = self.simple
405 self.function = self.combined
407 self.function = self.error
409 self.function = self.name
411 self.function = self.symlink
413 self.function = self.illegal
415 def illegal(self, stats):
416 '''Invalid or unknown counter type'''
419 def scalar(self, stats):
423 def simple(self, stats):
425 counter = StatsSimpleList()
426 for threads in StatsVector(stats, self.value, 'P'):
427 clist = [v[0] for v in StatsVector(stats, threads[0], 'Q')]
428 counter.append(clist)
431 def combined(self, stats):
432 '''Combined counter'''
433 counter = StatsCombinedList()
434 for threads in StatsVector(stats, self.value, 'P'):
435 clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')]
436 counter.append(clist)
439 def error(self, stats):
441 counter = SimpleList()
442 for clist in stats.error_vectors:
443 counter.append(clist[self.value])
446 def name(self, stats):
449 for name in StatsVector(stats, self.value, 'P'):
451 counter.append(get_string(stats, name[0]))
454 SYMLINK_FMT1 = Struct('II')
455 SYMLINK_FMT2 = Struct('Q')
def symlink(self, stats):
    '''Symlink counter

    self.value is re-read as two unsigned ints: the first is looked up in
    stats.directory_by_idx to find the target counter's name, the second
    picks the column returned from that counter.
    '''
    raw = self.SYMLINK_FMT2.pack(self.value)
    (dir_index, column) = self.SYMLINK_FMT1.unpack(raw)
    target_name = stats.directory_by_idx[dir_index]
    target = stats[target_name]
    return target[:, column]
463 def get_counter(self, stats):
464 '''Return a list of counters'''
466 return self.function(stats)
468 class TestStats(unittest.TestCase):
469 '''Basic statseg tests'''
472 '''Connect to statseg'''
473 self.stat = VPPStats()
475 self.profile = cProfile.Profile()
476 self.profile.enable()
479 '''Disconnect from statseg'''
480 self.stat.disconnect()
481 profile = Stats(self.profile)
483 profile.sort_stats('cumtime')
484 profile.print_stats()
487 def test_counters(self):
488 '''Test access to statseg'''
490 print('/err/abf-input-ip4/missed', self.stat['/err/abf-input-ip4/missed'])
491 print('/sys/heartbeat', self.stat['/sys/heartbeat'])
492 print('/if/names', self.stat['/if/names'])
493 print('/if/rx-miss', self.stat['/if/rx-miss'])
494 print('/if/rx-miss', self.stat['/if/rx-miss'][1])
495 print('/nat44-ed/out2in/slowpath/drops', self.stat['/nat44-ed/out2in/slowpath/drops'])
496 print('Set Errors', self.stat.set_errors())
497 with self.assertRaises(KeyError):
498 print('NO SUCH COUNTER', self.stat['foobar'])
499 print('/if/rx', self.stat.get_counter('/if/rx'))
500 print('/err/ethernet-input/no error',
501 self.stat.get_err_counter('/err/ethernet-input/no error'))
503 def test_column(self):
504 '''Test column slicing'''
506 print('/if/rx-miss', self.stat['/if/rx-miss'])
507 print('/if/rx', self.stat['/if/rx']) # All interfaces for thread #1
508 print('/if/rx thread #1', self.stat['/if/rx'][0]) # All interfaces for thread #1
509 print('/if/rx thread #1, interface #1',
510 self.stat['/if/rx'][0][1]) # All interfaces for thread #1
511 print('/if/rx if_index #1', self.stat['/if/rx'][:, 1])
512 print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].packets())
513 print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].sum_packets())
514 print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].octets())
515 print('/if/rx-miss', self.stat['/if/rx-miss'])
516 print('/if/rx-miss if_index #1 packets', self.stat['/if/rx-miss'][:, 1].sum())
517 print('/if/rx if_index #1 packets', self.stat['/if/rx'][0][1]['packets'])
519 def test_error(self):
520 '''Test the error vector'''
522 print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'])
523 print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'])
524 print('/err/ethernet-input', self.stat.get_err_counter('/err/ethernet-input/no error'))
525 print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'].sum())
527 def test_nat44(self):
528 '''Test the nat counters'''
530 print('/nat44-ei/ha/del-event-recv', self.stat['/nat44-ei/ha/del-event-recv'])
531 print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'].sum())
533 def test_legacy(self):
534 '''Legacy interface'''
535 directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
536 data = self.stat.dump(directory)
538 print('Looking up sys node')
539 directory = self.stat.ls(["^/sys/node"])
540 print('Dumping sys node')
541 data = self.stat.dump(directory)
543 directory = self.stat.ls(["^/foobar"])
544 data = self.stat.dump(directory)
def test_sys_nodes(self):
    '''List and dump the /sys/node counters, then read /net/route/to'''
    node_counters = self.stat.ls('^/sys/node')
    print('COUNTERS:', node_counters)
    dumped = self.stat.dump(node_counters)
    print('/sys/node', dumped)
    route_to = self.stat['/net/route/to']
    print('/net/route/to', route_to)
554 def test_symlink(self):
556 print('/interface/local0/rx', self.stat['/interfaces/local0/rx'])
557 print('/sys/nodes/unix-epoll-input', self.stat['/nodes/unix-epoll-input/calls'])
559 if __name__ == '__main__':
561 from pstats import Stats