3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 This module implements Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
24 Counters are stored in VPP as a two dimensional array.
25 Index by thread and index (typically sw_if_index).
26 Simple counters count only packets, Combined counters count packets
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34 for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37 interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39 interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41 interface 1 on all threads for simple counters
48 from struct import Struct
54 '''Get file descriptor for memory map'''
55 fds = array.array("i") # Array of ints
56 _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
57 for cmsg_level, cmsg_type, cmsg_data in ancdata:
58 if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
59 fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
# Layout of the length field read by get_vec_len(): one native unsigned int.
VEC_LEN_FMT = Struct('I')


def get_vec_len(stats, vector_offset):
    '''Return the element count of a vector (equivalent to VPP vec_len()).

    stats         -- object exposing the mapped segment as ``stats.statseg``
    vector_offset -- offset of the first vector element inside the segment

    The count is read as a native unsigned int located 8 bytes before the
    first element.
    '''
    length_offset = vector_offset - 8
    (length,) = VEC_LEN_FMT.unpack_from(stats.statseg, length_offset)
    return length
def get_string(stats, ptr):
    '''Decode the string held in a VPP vector.

    stats -- object exposing ``base``, ``size`` and ``statseg``
    ptr   -- pointer value of the vector; ``stats.base`` is subtracted to
             turn it into an offset into ``stats.statseg``

    Returns the vector contents minus its final byte (presumably the NUL
    terminator), decoded as ASCII.
    Raises IOError when the vector would reach the end of the segment.
    '''
    start = ptr - stats.base
    length = get_vec_len(stats, start)
    end = start + length
    if end >= stats.size:
        raise IOError('String overruns stats segment')
    raw = stats.statseg[start:end - 1]
    return raw.decode('ascii')
77 '''A class representing a VPP vector'''
79 def __init__(self, stats, ptr, fmt):
80 self.vec_start = ptr - stats.base
81 self.vec_len = get_vec_len(stats, ptr - stats.base)
82 self.struct = Struct(fmt)
83 self.fmtlen = len(fmt)
84 self.elementsize = self.struct.size
85 self.statseg = stats.statseg
88 if self.vec_start + self.vec_len * self.elementsize >= stats.size:
89 raise IOError('Vector overruns stats segment')
93 return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start +
94 self.elementsize*self.vec_len])
96 def __getitem__(self, index):
97 if index > self.vec_len:
98 raise IOError('Index beyond end of vector')
101 return self.struct.unpack_from(self.statseg, self.vec_start +
102 (index * self.elementsize))[0]
103 return self.struct.unpack_from(self.statseg, self.vec_start +
104 (index * self.elementsize))
107 '''Main class implementing Python access to the VPP statistics segment'''
108 # pylint: disable=too-many-instance-attributes
109 shared_headerfmt = Struct('QPQQPP')
110 default_socketname = '/run/vpp/stats.sock'
112 def __init__(self, socketname=default_socketname, timeout=10):
113 self.socketname = socketname
114 self.timeout = timeout
116 self.lock = StatsLock(self)
117 self.connected = False
120 self.error_vectors = 0
124 '''Connect to stats segment'''
127 sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
128 sock.connect(self.socketname)
133 stat_result = os.fstat(mfd)
134 self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED)
137 self.size = stat_result.st_size
138 if self.version != 2:
139 raise Exception('Incompatbile stat segment version {}'
140 .format(self.version))
143 self.connected = True
145 def disconnect(self):
146 '''Disconnect from stats segment'''
149 self.connected = False
153 '''Get version of stats segment'''
154 return self.shared_headerfmt.unpack_from(self.statseg)[0]
158 '''Get base pointer of stats segment'''
159 return self.shared_headerfmt.unpack_from(self.statseg)[1]
163 '''Get current epoch value from stats segment'''
164 return self.shared_headerfmt.unpack_from(self.statseg)[2]
def in_progress(self):
    '''Return the in_progress field of the shared header.

    The whole header is unpacked from the start of the segment on every
    call; in_progress is the field at index 3.
    '''
    header = self.shared_headerfmt.unpack_from(self.statseg)
    return header[3]
def directory_vector(self):
    '''Return the directory vector pointer from the shared header.

    The whole header is unpacked from the start of the segment on every
    call; the directory pointer is the field at index 4.
    '''
    header = self.shared_headerfmt.unpack_from(self.statseg)
    return header[4]
def error_vector(self):
    '''Return the error vector pointer from the shared header.

    The whole header is unpacked from the start of the segment on every
    call; the error pointer is the field at index 5.
    '''
    header = self.shared_headerfmt.unpack_from(self.statseg)
    return header[5]
181 elementfmt = 'IQ128s'
183 def refresh(self, blocking=True):
184 '''Refresh directory vector cache (epoch changed)'''
186 directory_by_idx = {}
190 self.last_epoch = self.epoch
191 for i, direntry in enumerate(StatsVector(self, self.directory_vector, self.elementfmt)):
192 path_raw = direntry[2].find(b'\x00')
193 path = direntry[2][:path_raw].decode('ascii')
194 directory[path] = StatsEntry(direntry[0], direntry[1])
195 directory_by_idx[i] = path
196 self.directory = directory
197 self.directory_by_idx = directory_by_idx
199 # Cache the error index vectors
200 self.error_vectors = []
201 for threads in StatsVector(self, self.error_vector, 'P'):
202 self.error_vectors.append(StatsVector(self, threads[0], 'Q'))
208 def __getitem__(self, item, blocking=True):
209 if not self.connected:
213 if self.last_epoch != self.epoch:
214 self.refresh(blocking)
216 return self.directory[item].get_counter(self)
222 return iter(self.directory.items())
224 def set_errors(self, blocking=True):
225 '''Return dictionary of error counters > 0'''
226 if not self.connected:
229 errors = {k:v for k, v in self.directory.items() if k.startswith("/err/")}
233 if self.last_epoch != self.epoch:
234 self.refresh(blocking)
236 for k, entry in errors.items():
239 for per_thread in self.error_vectors:
240 total += per_thread[i]
def set_errors_str(self, blocking=True):
    '''Return the counters reported by set_errors() as printable text.

    blocking -- passed through unchanged to set_errors()

    The result starts with an 'ERRORS:' line, followed by one line per
    counter in sorted name order (name left-aligned in 60 columns, value
    right-aligned in 10), and ends with a newline.
    '''
    counters = self.set_errors(blocking)
    lines = ['ERRORS:']
    lines.extend('{:<60}{:>10}'.format(name, counters[name])
                 for name in sorted(counters))
    return '\n'.join(lines) + '\n'
def get_counter(self, name, blocking=True):
    '''Look up a counter by name; method-call spelling of ``self[name]``.

    Unlike the subscript form, this lets the caller choose ``blocking``,
    which is forwarded to __getitem__ together with ``name``.
    '''
    return self.__getitem__(name, blocking=blocking)
260 def get_err_counter(self, name, blocking=True):
261 '''Return a single value (sum of all threads)'''
262 if not self.connected:
264 if name.startswith("/err/"):
267 if self.last_epoch != self.epoch:
268 self.refresh(blocking)
270 return sum(self.directory[name].get_counter(self))
275 def ls(self, patterns):
276 '''Returns list of counters matching pattern'''
277 # pylint: disable=invalid-name
278 if not self.connected:
280 if not isinstance(patterns, list):
281 patterns = [patterns]
282 regex = [re.compile(i) for i in patterns]
283 if self.last_epoch != self.epoch:
286 return [k for k, v in self.directory.items()
287 if any(re.match(pattern, k) for pattern in regex)]
289 def dump(self, counters, blocking=True):
290 '''Given a list of counters return a dictionary of results'''
291 if not self.connected:
295 result[cnt] = self.__getitem__(cnt,blocking)
299 '''Stat segment optimistic locking'''
301 def __init__(self, stats):
306 acquired = self.acquire(blocking=True)
307 assert acquired, "Lock wasn't acquired, but blocking=True"
310 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
313 def acquire(self, blocking=True, timeout=-1):
314 '''Acquire the lock. Await in progress to go false. Record epoch.'''
315 self.epoch = self.stats.epoch
317 start = time.monotonic()
318 while self.stats.in_progress:
322 if start + time.monotonic() > timeout:
327 '''Check if data read while locked is valid'''
328 if self.stats.in_progress or self.stats.epoch != self.epoch:
329 raise IOError('Optimistic lock failed, retry')
class StatsCombinedList(list):
    '''Column slicing for Combined counters list'''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1]

        item -- an int or a plain slice selects rows exactly as for a normal
                list; a tuple such as ``[:, 1]`` selects a column.

        For a tuple only the second element (the column selector) is used;
        the first element is ignored and every row contributes one entry.
        Returns a CombinedList for the tuple form, otherwise whatever
        list.__getitem__ returns.
        '''
        if isinstance(item, tuple):
            column = item[1]
            return CombinedList([row[column] for row in self])
        # Anything that is not a 2d index (int, slice, ...) keeps plain list
        # semantics; previously a slice crashed on item[1].
        return list.__getitem__(self, item)
class CombinedList(list):
    '''List of combined counters; every entry is a (packets, octets) pair.

    StatsCombinedList builds one of these when sliced by column, taking one
    entry from each of its rows.
    '''

    def _column(self, position):
        '''Collect element ``position`` of every entry, in list order'''
        return [pair[position] for pair in self]

    def packets(self):
        '''Return column (2nd dimension). Packets for all threads'''
        return self._column(0)

    def octets(self):
        '''Return column (2nd dimension). Octets for all threads'''
        return self._column(1)

    def sum_packets(self):
        '''Return column (2nd dimension). Sum of all packets for all threads'''
        return sum(self.packets())

    def sum_octets(self):
        '''Return column (2nd dimension). Sum of all octets for all threads'''
        return sum(self.octets())
363 class StatsTuple(tuple):
364 '''A Combined vector tuple (packets, octets)'''
365 def __init__(self, data):
366 self.dictionary = {'packets': data[0], 'bytes': data[1]}
370 return dict.__repr__(self.dictionary)
372 def __getitem__(self, item):
373 if isinstance(item, int):
374 return tuple.__getitem__(self, item)
375 if item == 'packets':
376 return tuple.__getitem__(self, 0)
377 return tuple.__getitem__(self, 1)
class StatsSimpleList(list):
    '''Simple Counters 2-dimensional by thread by index of packets'''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1]

        item -- an int or a plain slice selects rows exactly as for a normal
                list; a tuple such as ``[:, 1]`` selects a column.

        For a tuple only the second element (the column selector) is used;
        the first element is ignored and every row contributes one entry.
        Returns a SimpleList for the tuple form, otherwise whatever
        list.__getitem__ returns.
        '''
        if isinstance(item, tuple):
            column = item[1]
            return SimpleList([row[column] for row in self])
        # Anything that is not a 2d index (int, slice, ...) keeps plain list
        # semantics; previously a slice crashed on item[1].
        return list.__getitem__(self, item)
388 class SimpleList(list):
396 '''An individual stats entry'''
397 # pylint: disable=unused-argument,no-self-use
399 def __init__(self, stattype, statvalue):
401 self.value = statvalue
404 self.function = self.scalar
406 self.function = self.simple
408 self.function = self.combined
410 self.function = self.error
412 self.function = self.name
414 self.function = self.symlink
416 self.function = self.illegal
418 def illegal(self, stats):
419 '''Invalid or unknown counter type'''
422 def scalar(self, stats):
426 def simple(self, stats):
428 counter = StatsSimpleList()
429 for threads in StatsVector(stats, self.value, 'P'):
430 clist = [v[0] for v in StatsVector(stats, threads[0], 'Q')]
431 counter.append(clist)
434 def combined(self, stats):
435 '''Combined counter'''
436 counter = StatsCombinedList()
437 for threads in StatsVector(stats, self.value, 'P'):
438 clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')]
439 counter.append(clist)
442 def error(self, stats):
444 counter = SimpleList()
445 for clist in stats.error_vectors:
446 counter.append(clist[self.value])
449 def name(self, stats):
452 for name in StatsVector(stats, self.value, 'P'):
454 counter.append(get_string(stats, name[0]))
457 SYMLINK_FMT1 = Struct('II')
458 SYMLINK_FMT2 = Struct('Q')
def symlink(self, stats):
    '''Resolve a symlink entry to the counter column it points at.

    self.value is packed with SYMLINK_FMT2 and unpacked again with
    SYMLINK_FMT1, which splits it into two native 32-bit fields: a
    directory index and a column index. The directory index is translated
    to a counter name through stats.directory_by_idx, and that counter is
    column-sliced with the second field.
    '''
    packed = self.SYMLINK_FMT2.pack(self.value)
    directory_index, column = self.SYMLINK_FMT1.unpack(packed)
    target = stats.directory_by_idx[directory_index]
    return stats[target][:, column]
466 def get_counter(self, stats):
467 '''Return a list of counters'''
469 return self.function(stats)
471 class TestStats(unittest.TestCase):
472 '''Basic statseg tests'''
475 '''Connect to statseg'''
476 self.stat = VPPStats()
478 self.profile = cProfile.Profile()
479 self.profile.enable()
482 '''Disconnect from statseg'''
483 self.stat.disconnect()
484 profile = Stats(self.profile)
486 profile.sort_stats('cumtime')
487 profile.print_stats()
def test_counters(self):
    '''Print a sample of counters and check that unknown names fail'''
    stat = self.stat

    # Counters whose printed label is the lookup path itself.
    for path in ('/err/abf-input-ip4/missed', '/sys/heartbeat',
                 '/if/names', '/if/rx-miss'):
        print(path, stat[path])
    print('/if/rx-miss', stat['/if/rx-miss'][1])

    drops = '/nat44-ed/out2in/slowpath/drops'
    print(drops, stat[drops])
    print('Set Errors', stat.set_errors())

    # A name that is not in the directory must surface as KeyError.
    with self.assertRaises(KeyError):
        print('NO SUCH COUNTER', stat['foobar'])

    print('/if/rx', stat.get_counter('/if/rx'))
    no_error = '/err/ethernet-input/no error'
    print(no_error, stat.get_err_counter(no_error))
def test_column(self):
    '''Test column slicing

    Prints the same counters through row indexing, column slicing and the
    column helper methods. The printed labels name the index and the helper
    actually used (row 0 is thread 0, column 1 is interface 1).
    '''
    stat = self.stat

    print('/if/rx-miss', stat['/if/rx-miss'])
    print('/if/rx', stat['/if/rx'])  # All threads, all interfaces
    print('/if/rx thread #0', stat['/if/rx'][0])  # All interfaces for thread 0
    print('/if/rx thread #0, interface #1',
          stat['/if/rx'][0][1])  # Interface 1 on thread 0
    print('/if/rx if_index #1', stat['/if/rx'][:, 1])  # Interface 1, all threads
    print('/if/rx if_index #1 packets', stat['/if/rx'][:, 1].packets())
    print('/if/rx if_index #1 sum of packets', stat['/if/rx'][:, 1].sum_packets())
    print('/if/rx if_index #1 octets', stat['/if/rx'][:, 1].octets())
    print('/if/rx-miss', stat['/if/rx-miss'])
    print('/if/rx-miss if_index #1 sum', stat['/if/rx-miss'][:, 1].sum())
    print('/if/rx thread #0, interface #1 packets', stat['/if/rx'][0][1]['packets'])
def test_error(self):
    '''Print error counters via subscript, get_err_counter() and sum()'''
    stat = self.stat
    label = '/err/ethernet-input'
    no_error = '/err/ethernet-input/no error'
    processed = '/err/nat44-ei-ha/pkts-processed'

    print(label, stat[no_error])
    print(processed, stat[processed])
    print(label, stat.get_err_counter(no_error))
    print(label, stat[no_error].sum())
def test_nat44(self):
    '''Print two nat44-ei counters: one read as-is, one summed'''
    stat = self.stat

    del_events = '/nat44-ei/ha/del-event-recv'
    print(del_events, stat[del_events])

    processed = '/err/nat44-ei-ha/pkts-processed'
    print(processed, stat[processed].sum())
536 def test_legacy(self):
537 '''Legacy interface'''
538 directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
539 data = self.stat.dump(directory)
541 print('Looking up sys node')
542 directory = self.stat.ls(["^/sys/node"])
543 print('Dumping sys node')
544 data = self.stat.dump(directory)
546 directory = self.stat.ls(["^/foobar"])
547 data = self.stat.dump(directory)
def test_sys_nodes(self):
    '''List and dump everything matching ^/sys/node, then print a route counter'''
    stat = self.stat

    node_counters = stat.ls('^/sys/node')
    print('COUNTERS:', node_counters)

    node_values = stat.dump(node_counters)
    print('/sys/node', node_values)

    route = '/net/route/to'
    print(route, stat[route])
557 def test_symlink(self):
559 print('/interface/local0/rx', self.stat['/interfaces/local0/rx'])
560 print('/sys/nodes/unix-epoll-input', self.stat['/nodes/unix-epoll-input/calls'])
562 if __name__ == '__main__':
564 from pstats import Stats