3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 This module implements Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
24 Counters are stored in VPP as a two dimensional array.
25 Index by thread and index (typically sw_if_index).
26 Simple counters count only packets; Combined counters count packets and octets.
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34 for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37 interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39 interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41 interface 1 on all threads for simple counters
48 from struct import Struct
# recv_fd(sock) fragment: receives a file descriptor passed over a UNIX
# socket as SCM_RIGHTS ancillary data.
# NOTE(review): the enclosing `def` line and the final return statement are
# not visible here; presumably the first entry of `fds` is returned --
# confirm against the full file.
54 '''Get file descriptor for memory map'''
55 fds = array.array("i") # Array of ints
# No payload bytes are read; the ancillary buffer has room for 4 bytes of
# descriptor data.
56 _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
57 for cmsg_level, cmsg_type, cmsg_data in ancdata:
58 if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
# Truncate to a whole number of C ints before decoding the descriptors.
59 fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
VEC_LEN_FMT = Struct('I')


def get_vec_len(stats, vector_offset):
    '''Equivalent to VPP vec_len(): read the element count of a vector.

    The count is a native unsigned int stored 8 bytes before the first
    element of the vector that starts at ``vector_offset`` in
    ``stats.statseg``.
    '''
    (length,) = VEC_LEN_FMT.unpack_from(stats.statseg, vector_offset - 8)
    return length


def get_string(stats, ptr):
    '''Get a string from a VPP vector.

    ``ptr`` is an address in VPP's view of the segment and is rebased
    against ``stats.base``. The last byte counted by the vector length
    (presumably the NUL terminator) is dropped before ASCII decoding.

    Raises IOError if the vector end reaches ``stats.size``.
    '''
    start = ptr - stats.base
    end = start + get_vec_len(stats, start)
    if end >= stats.size:
        raise IOError('String overruns stats segment')
    return stats.statseg[start:end - 1].decode('ascii')
# StatsVector (fragment; the `class` line and several body lines are not
# visible here): a read-only view of a VPP vector inside the mapped stats
# segment, whose elements are decoded with the struct format `fmt`.
77 '''A class representing a VPP vector'''
79 def __init__(self, stats, ptr, fmt):
# `ptr` is a VPP-side address; subtracting stats.base turns it into an
# offset into the local mapping.
80 self.vec_start = ptr - stats.base
81 self.vec_len = get_vec_len(stats, ptr - stats.base)
82 self.struct = Struct(fmt)
# Number of format characters; presumably used (on a line not visible
# here) to choose between returning a bare value or a tuple.
83 self.fmtlen = len(fmt)
84 self.elementsize = self.struct.size
85 self.statseg = stats.statseg
# Refuse vectors whose element data would reach past the end of the segment.
88 if self.vec_start + self.vec_len * self.elementsize >= stats.size:
89 raise IOError('Vector overruns stats segment')
# (__iter__ body; its `def` line is not visible here.) Decodes all vec_len
# elements from one copied slice of the segment.
93 return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start +
94 self.elementsize*self.vec_len])
96 def __getitem__(self, index):
# NOTE(review): valid indices are 0..vec_len-1, so `index == self.vec_len`
# passes this check and reads one element past the end of the vector.
# This probably should be `index >= self.vec_len`. Negative indices are
# not rejected either.
97 if index > self.vec_len:
98 raise IOError('Index beyond end of vector')
# Two return paths: a bare value (first) or the whole tuple (second). The
# selecting condition is not visible here -- presumably `fmtlen == 1`.
101 return self.struct.unpack_from(self.statseg, self.vec_start +
102 (index * self.elementsize))[0]
103 return self.struct.unpack_from(self.statseg, self.vec_start +
104 (index * self.elementsize))
# VPPStats (fragment; the `class` line is not visible here).
107 '''Main class implementing Python access to the VPP statistics segment'''
108 # pylint: disable=too-many-instance-attributes
# Shared header at offset 0 of the segment, in field order: version, base
# pointer, epoch, in_progress, directory vector pointer, error vector
# pointer (the header accessors index into this Struct).
109 shared_headerfmt = Struct('QPQQPP')
110 default_socketname = '/run/vpp/stats.sock'
112 def __init__(self, socketname=default_socketname, timeout=10):
113 self.socketname = socketname
114 self.timeout = timeout
# Optimistic lock object bound to this instance.
116 self.lock = StatsLock(self)
117 self.connected = False
# NOTE(review): initialised to 0 here, but refresh() replaces it with a
# list of StatsVector objects.
120 self.error_vectors = 0
# connect() body (fragment; its `def` line and the lines that obtain `mfd`
# -- presumably via recv_fd(sock) -- are not visible here).
124 '''Connect to stats segment'''
127 sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
128 sock.connect(self.socketname)
# Map the whole segment read-only and shared, so VPP's updates are visible.
133 stat_result = os.fstat(mfd)
134 self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED)
137 self.size = stat_result.st_size
# Only version 2 of the segment layout is accepted.
# NOTE(review): no cleanup of the mapping is visible before this raise, and
# the message misspells "Incompatible".
138 if self.version != 2:
139 raise Exception('Incompatbile stat segment version {}'
140 .format(self.version))
143 self.connected = True
145 def disconnect(self):
146 '''Disconnect from stats segment'''
# NOTE(review): the lines that release the mapping are not visible here.
149 self.connected = False
# Bodies of the version/base/epoch header accessors. Their decorator and
# `def` lines are not visible here. Each one unpacks the shared header and
# returns a single field.
153 '''Get version of stats segment'''
154 return self.shared_headerfmt.unpack_from(self.statseg)[0]
158 '''Get base pointer of stats segment'''
159 return self.shared_headerfmt.unpack_from(self.statseg)[1]
163 '''Get current epoch value from stats segment'''
164 return self.shared_headerfmt.unpack_from(self.statseg)[2]
def in_progress(self):
    '''Get value of in_progress from stats segment.

    Field 3 of the shared header; StatsLock waits while it is set.
    '''
    _, _, _, busy, _, _ = self.shared_headerfmt.unpack_from(self.statseg)
    return busy


def directory_vector(self):
    '''Get pointer of directory vector.

    Field 4 of the shared header: a VPP-side address that readers rebase
    against ``base`` before using it as an offset.
    '''
    _, _, _, _, dir_ptr, _ = self.shared_headerfmt.unpack_from(self.statseg)
    return dir_ptr


def error_vector(self):
    '''Get pointer of error vector (last field of the shared header).'''
    *_, err_ptr = self.shared_headerfmt.unpack_from(self.statseg)
    return err_ptr
# One directory entry: stat type (I), value or pointer (Q), and a 128-byte
# NUL-padded name.
181 elementfmt = 'IQ128s'
183 def refresh(self, blocking=True):
184 '''Refresh directory vector cache (epoch changed)'''
# NOTE(review): `directory` and the lock handling are on lines not visible here.
186 directory_by_idx = {}
190 self.last_epoch = self.epoch
191 for i, direntry in enumerate(StatsVector(self, self.directory_vector, self.elementfmt)):
# NOTE(review): if the 128-byte name has no NUL, find() returns -1 and the
# slice below drops the name's last character.
192 path_raw = direntry[2].find(b'\x00')
193 path = direntry[2][:path_raw].decode('ascii')
194 directory[path] = StatsEntry(direntry[0], direntry[1])
# Position in the directory vector; symlink entries look targets up by it.
195 directory_by_idx[i] = path
196 self.directory = directory
197 self.directory_by_idx = directory_by_idx
199 # Cache the error index vectors
# One vector of u64 ('Q') counters per pointer in the error vector.
# set_errors() and StatsEntry.error() treat each one as a thread's counters.
200 self.error_vectors = []
201 for threads in StatsVector(self, self.error_vector, 'P'):
202 self.error_vectors.append(StatsVector(self, threads[0], 'Q'))
203 # Return statement must be outside the lock block to be sure
204 # lock.release is executed
210 def __getitem__(self, item, blocking=True):
211 if not self.connected:
# (lines not visible here)
# Re-read the directory if the epoch moved since the last refresh.
215 if self.last_epoch != self.epoch:
216 self.refresh(blocking)
# Unknown names raise KeyError (relied on by test_counters).
218 result = self.directory[item].get_counter(self)
219 # Return statement must be outside the lock block to be sure
220 # lock.release is executed
# (__iter__ body; its `def` line is not visible here.) Yields
# (name, StatsEntry) pairs from the cached directory.
227 return iter(self.directory.items())
229 def set_errors(self, blocking=True):
230 '''Return dictionary of error counters > 0'''
231 if not self.connected:
# NOTE(review): `errors` is built from self.directory before the epoch
# check below. After a refresh it may therefore still hold entries from
# the stale directory.
234 errors = {k:v for k, v in self.directory.items() if k.startswith("/err/")}
238 if self.last_epoch != self.epoch:
239 self.refresh(blocking)
241 for k, entry in errors.items():
# Sum this error's slot across every cached error vector. `total` and `i`
# are set on lines not visible here (presumably 0 and entry.value).
244 for per_thread in self.error_vectors:
245 total += per_thread[i]
def set_errors_str(self, blocking=True):
    '''Return all error counters > 0 pretty printed.

    The first line is ``ERRORS:``. Each counter follows on its own line,
    sorted by name: the name is left-aligned in 60 columns and the value
    right-aligned in 10. The text ends with a newline.
    '''
    counters = self.set_errors(blocking)
    rows = ['{:<60}{:>10}'.format(name, counters[name])
            for name in sorted(counters)]
    return '\n'.join(['ERRORS:'] + rows) + '\n'


def get_counter(self, name, blocking=True):
    '''Alternative call to __getitem__: look up the counter ``name``.'''
    return self.__getitem__(name, blocking=blocking)
265 def get_err_counter(self, name, blocking=True):
266 '''Return a single value (sum of all threads)'''
267 if not self.connected:
# (the body of this branch is not visible here)
269 if name.startswith("/err/"):
272 if self.last_epoch != self.epoch:
273 self.refresh(blocking)
# Assumes this entry's get_counter() returns an iterable of per-thread
# values -- TODO confirm for non-error entries.
275 result = sum(self.directory[name].get_counter(self))
276 # Return statement must be outside the lock block to be sure
277 # lock.release is executed
283 def ls(self, patterns):
284 '''Returns list of counters matching pattern'''
285 # pylint: disable=invalid-name
286 if not self.connected:
# A single pattern is accepted as well as a list of patterns.
288 if not isinstance(patterns, list):
289 patterns = [patterns]
290 regex = [re.compile(i) for i in patterns]
# NOTE(review): no epoch check is visible here, so this reads the cached
# directory as-is. re.match anchors at the start of the name only.
291 return [k for k, v in self.directory.items()
292 if any(re.match(pattern, k) for pattern in regex)]
294 def dump(self, counters, blocking=True):
295 '''Given a list of counters return a dictionary of results'''
296 if not self.connected:
# Each counter is fetched separately through __getitem__. The loop header
# and `result` initialisation are on lines not visible here.
300 result[cnt] = self.__getitem__(cnt,blocking)
# StatsLock (fragment; the `class` line and several body lines are not
# visible here). It records the epoch when acquired and, when checked,
# fails if in_progress is set or the epoch has changed.
304 '''Stat segment optimistic locking'''
306 def __init__(self, stats):
# (body of a method whose `def` line is not visible here -- presumably __enter__)
311 acquired = self.acquire(blocking=True)
312 assert acquired, "Lock wasn't acquired, but blocking=True"
315 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
318 def acquire(self, blocking=True, timeout=-1):
319 '''Acquire the lock. Await in progress to go false. Record epoch.'''
320 self.epoch = self.stats.epoch
322 start = time.monotonic()
323 while self.stats.in_progress:
# NOTE(review): elapsed time is `time.monotonic() - start`. Adding the two
# timestamps, as below, gives a value that is typically far larger than any
# small timeout, so a timed wait gives up on the first check. Likely
# intended: `if time.monotonic() - start > timeout:`.
327 if start + time.monotonic() > timeout:
# (body of the validity check; its `def` line is not visible here)
332 '''Check if data read while locked is valid'''
333 if self.stats.in_progress or self.stats.epoch != self.epoch:
334 raise IOError('Optimistic lock failed, retry')
class StatsCombinedList(list):
    '''Column slicing for Combined counters list.

    Rows are threads; each row is a list of (packets, octets) tuples, one
    per index (typically sw_if_index).
    '''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1].

        ``m[i]`` returns row ``i``. ``m[a:b]`` returns a StatsCombinedList
        of the selected rows. ``m[:, j]`` returns a CombinedList with
        column ``j`` of every row; the row selector of the tuple is
        ignored and all rows are used.
        '''
        if isinstance(item, int):
            return list.__getitem__(self, item)
        if isinstance(item, slice):
            # Row slices keep the 2-D type so they can be column-sliced again.
            return type(self)(list.__getitem__(self, item))
        return CombinedList([row[item[1]] for row in self])
# CombinedList: holds one column of a combined counter, i.e. one
# (packets, octets) pair per thread, as produced by the column slice in
# StatsCombinedList.
349 class CombinedList(list):
350 '''Combined Counters 2-dimensional by thread by index of packets/octets'''
# (packets() body; its `def` line is not visible here) -- element 0 of each pair.
353 '''Return column (2nd dimension). Packets for all threads'''
354 return [pair[0] for pair in self]
# (octets() body; its `def` line is not visible here) -- element 1 of each pair.
357 '''Return column (2nd dimension). Octets for all threads'''
358 return [pair[1] for pair in self]
360 def sum_packets(self):
361 '''Return column (2nd dimension). Sum of all packets for all threads'''
362 return sum(self.packets())
364 def sum_octets(self):
365 '''Return column (2nd dimension). Sum of all octets for all threads'''
366 return sum(self.octets())
368 class StatsTuple(tuple):
369 '''A Combined vector tuple (packets, octets)'''
370 def __init__(self, data):
# Named copy of the pair, used for the dict-style repr. The tuple contents
# themselves presumably come from tuple.__new__(data).
371 self.dictionary = {'packets': data[0], 'bytes': data[1]}
# (__repr__ body; its `def` line is not visible here)
375 return dict.__repr__(self.dictionary)
377 def __getitem__(self, item):
378 if isinstance(item, int):
379 return tuple.__getitem__(self, item)
380 if item == 'packets':
381 return tuple.__getitem__(self, 0)
# NOTE(review): every other key -- 'bytes', but also typos and slices --
# falls through to the second element.
382 return tuple.__getitem__(self, 1)
class StatsSimpleList(list):
    '''Simple Counters 2-dimensional by thread by index of packets.

    Rows are threads; each row is a list of u64 counters, one per index
    (typically sw_if_index).
    '''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1].

        ``m[i]`` returns row ``i``. ``m[a:b]`` returns a StatsSimpleList of
        the selected rows. ``m[:, j]`` returns a SimpleList with column
        ``j`` of every row; the row selector of the tuple is ignored and
        all rows are used.
        '''
        if isinstance(item, int):
            return list.__getitem__(self, item)
        if isinstance(item, slice):
            # Row slices keep the 2-D type so they can be column-sliced again.
            return type(self)(list.__getitem__(self, item))
        return SimpleList([row[item[1]] for row in self])
# SimpleList: one value per thread. Built by the column slice in
# StatsSimpleList and by StatsEntry.error. Its body (including the sum()
# used by the tests) is not visible here.
393 class SimpleList(list):
# StatsEntry (fragment; the `class` line and the type-dispatch conditions
# in __init__ are not visible here). One directory entry; `function` is the
# decoder chosen for its stat type.
401 '''An individual stats entry'''
402 # pylint: disable=unused-argument,no-self-use
404 def __init__(self, stattype, statvalue):
# Depending on the type, `value` is used as a pointer to a vector
# (simple/combined/name), an index into the error vectors (error), or two
# packed u32 indices (symlink).
406 self.value = statvalue
409 self.function = self.scalar
411 self.function = self.simple
413 self.function = self.combined
415 self.function = self.error
417 self.function = self.name
419 self.function = self.symlink
421 self.function = self.illegal
423 def illegal(self, stats):
424 '''Invalid or unknown counter type'''
427 def scalar(self, stats):
431 def simple(self, stats):
# Outer vector: one pointer per thread. Inner vector: u64 counters.
433 counter = StatsSimpleList()
434 for threads in StatsVector(stats, self.value, 'P'):
435 clist = [v[0] for v in StatsVector(stats, threads[0], 'Q')]
436 counter.append(clist)
439 def combined(self, stats):
440 '''Combined counter'''
# Same layout as simple(), but each element is a (packets, bytes) pair.
441 counter = StatsCombinedList()
442 for threads in StatsVector(stats, self.value, 'P'):
443 clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')]
444 counter.append(clist)
447 def error(self, stats):
# Read this entry's slot from every cached error vector.
449 counter = SimpleList()
450 for clist in stats.error_vectors:
451 counter.append(clist[self.value])
454 def name(self, stats):
# Vector of pointers to VPP string vectors (used for e.g. /if/names).
457 for name in StatsVector(stats, self.value, 'P'):
459 counter.append(get_string(stats, name[0]))
SYMLINK_FMT1 = Struct('II')
SYMLINK_FMT2 = Struct('Q')


def symlink(self, stats):
    '''Symlink counter: resolve to one column of another counter.

    The 64-bit entry value holds two native u32 indices. The first is the
    target's position in ``stats.directory_by_idx``; the second is the
    column (typically an interface index) within the target counter.
    Returns ``stats[target][:, column]``.
    '''
    target_idx, column = self.SYMLINK_FMT1.unpack(self.SYMLINK_FMT2.pack(self.value))
    target_name = stats.directory_by_idx[target_idx]
    return stats[target_name][:, column]
471 def get_counter(self, stats):
472 '''Return a list of counters'''
# Dispatch to the decoder chosen in __init__. One line before this is not
# visible here.
474 return self.function(stats)
# Tests that read live counters, so they need a running VPP exposing the
# stats socket (VPPStats() defaults to /run/vpp/stats.sock). Most tests
# only print values.
476 class TestStats(unittest.TestCase):
477 '''Basic statseg tests'''
# (setUp body; its `def` line is not visible here)
480 '''Connect to statseg'''
481 self.stat = VPPStats()
# Each test runs under cProfile; the profile is printed in tearDown.
483 self.profile = cProfile.Profile()
484 self.profile.enable()
# (tearDown body; its `def` line is not visible here)
487 '''Disconnect from statseg'''
488 self.stat.disconnect()
# NOTE(review): `Stats` is imported only under the __main__ guard at the
# bottom of the file.
489 profile = Stats(self.profile)
491 profile.sort_stats('cumtime')
492 profile.print_stats()
def test_counters(self):
    '''Test access to statseg.

    Prints a sample of counters, checks that an unknown name raises
    KeyError, and exercises get_counter() and get_err_counter().
    '''
    stat = self.stat
    for path in ('/err/abf-input-ip4/missed', '/sys/heartbeat',
                 '/if/names', '/if/rx-miss'):
        print(path, stat[path])
    print('/if/rx-miss', stat['/if/rx-miss'][1])
    nat_path = '/nat44-ed/out2in/slowpath/drops'
    print(nat_path, stat[nat_path])
    print('Set Errors', stat.set_errors())
    with self.assertRaises(KeyError):
        print('NO SUCH COUNTER', stat['foobar'])
    print('/if/rx', stat.get_counter('/if/rx'))
    err_path = '/err/ethernet-input/no error'
    print(err_path, stat.get_err_counter(err_path))
def test_column(self):
    '''Test column slicing of a combined and a simple counter.

    Each counter is read once, so every printed slice and the consistency
    checks at the end describe the same snapshot. Index 0 is the first
    thread, matching the module docstring (stat['/if/rx'][0] is thread 0).
    '''
    rx = self.stat['/if/rx']
    rx_miss = self.stat['/if/rx-miss']
    print('/if/rx-miss', rx_miss)
    print('/if/rx', rx)  # All interfaces on all threads
    print('/if/rx thread #0', rx[0])  # All interfaces for thread #0
    print('/if/rx thread #0, interface #1', rx[0][1])
    column = rx[:, 1]  # Interface #1 on every thread
    print('/if/rx if_index #1', column)
    print('/if/rx if_index #1 packets', column.packets())
    print('/if/rx if_index #1 sum of packets', column.sum_packets())
    print('/if/rx if_index #1 octets', column.octets())
    print('/if/rx-miss if_index #1 sum of packets', rx_miss[:, 1].sum())
    print('/if/rx thread #0, if_index #1 packets', rx[0][1]['packets'])

    # The column has one entry per thread, and the accessors agree.
    self.assertEqual(len(column), len(rx))
    self.assertEqual(column.sum_packets(), sum(column.packets()))
    self.assertEqual(rx[0][1]['packets'], rx[0][1][0])
def test_error(self):
    '''Test the error vector via indexing, get_err_counter() and sum().'''
    stat = self.stat
    no_error = '/err/ethernet-input/no error'
    print('/err/ethernet-input', stat[no_error])
    processed = '/err/nat44-ei-ha/pkts-processed'
    print(processed, stat[processed])
    print('/err/ethernet-input', stat.get_err_counter(no_error))
    print('/err/ethernet-input', stat[no_error].sum())
def test_nat44(self):
    '''Test the nat counters: one NAT44-EI HA counter and a summed error.'''
    del_events = self.stat['/nat44-ei/ha/del-event-recv']
    print('/nat44-ei/ha/del-event-recv', del_events)
    processed = self.stat['/err/nat44-ei-ha/pkts-processed'].sum()
    print('/err/nat44-ei-ha/pkts-processed', processed)
def test_legacy(self):
    '''Legacy interface: select counters with ls(), then fetch with dump().'''
    stat = self.stat
    wanted = stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
    stat.dump(wanted)

    print('Looking up sys node')
    sys_nodes = stat.ls(["^/sys/node"])
    print('Dumping sys node')
    stat.dump(sys_nodes)

    nothing = stat.ls(["^/foobar"])
    stat.dump(nothing)
def test_sys_nodes(self):
    '''Test /sys/nodes: list and dump every /sys/node counter.'''
    stat = self.stat
    node_counters = stat.ls('^/sys/node')
    print('COUNTERS:', node_counters)
    dumped = stat.dump(node_counters)
    print('/sys/node', dumped)
    print('/net/route/to', stat['/net/route/to'])
562 def test_symlink(self):
# Presumably reads counters exposed through symlink directory entries.
# NOTE(review): the printed labels ('/interface/local0/rx',
# '/sys/nodes/unix-epoll-input') differ from the paths actually read.
564 print('/interface/local0/rx', self.stat['/interfaces/local0/rx'])
565 print('/sys/nodes/unix-epoll-input', self.stat['/nodes/unix-epoll-input/calls'])
567 if __name__ == '__main__':
# NOTE(review): `Stats` is imported only under this guard, yet
# TestStats.tearDown uses it. Running the tests through another runner
# (e.g. unittest discovery) would hit a NameError. The cProfile import is
# not visible anywhere in this view either.
569 from pstats import Stats