3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
This module implements Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
24 Counters are stored in VPP as a two dimensional array.
25 Index by thread and index (typically sw_if_index).
26 Simple counters count only packets, Combined counters count packets
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34 for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37 interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39 interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41 interface 1 on all threads for simple counters
48 from struct import Struct
54 '''Get file descriptor for memory map'''
55 fds = array.array("i") # Array of ints
56 _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
57 for cmsg_level, cmsg_type, cmsg_data in ancdata:
58 if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
59 fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
VEC_LEN_FMT = Struct('I')


def get_vec_len(stats, vector_offset):
    '''Return the length stored in front of a VPP vector (cf. VPP vec_len()).

    One native unsigned int is read from stats.statseg at the position
    8 bytes before vector_offset, where vector_offset is the offset of the
    vector inside the stats segment.
    '''
    header_offset = vector_offset - 8
    (length,) = VEC_LEN_FMT.unpack_from(stats.statseg, header_offset)
    return length
def get_string(stats, ptr):
    '''Decode the ASCII string held in the VPP vector that ptr points to.

    ptr is translated into an offset inside the segment by subtracting
    stats.base. The vector length is looked up with get_vec_len() and the
    last byte of the vector is left out of the result (presumably the
    terminating NUL).

    Raises IOError when offset + length reaches or exceeds stats.size.
    '''
    start = ptr - stats.base
    length = get_vec_len(stats, start)
    end = start + length
    if not end < stats.size:
        raise IOError('String overruns stats segment')
    raw = stats.statseg[start:end - 1]
    return raw.decode('ascii')
77 '''A class representing a VPP vector'''
79 def __init__(self, stats, ptr, fmt):
80 self.vec_start = ptr - stats.base
81 self.vec_len = get_vec_len(stats, ptr - stats.base)
82 self.struct = Struct(fmt)
83 self.fmtlen = len(fmt)
84 self.elementsize = self.struct.size
85 self.statseg = stats.statseg
88 if self.vec_start + self.vec_len * self.elementsize >= stats.size:
89 raise IOError('Vector overruns stats segment')
93 return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start +
94 self.elementsize*self.vec_len])
96 def __getitem__(self, index):
97 if index > self.vec_len:
98 raise IOError('Index beyond end of vector')
101 return self.struct.unpack_from(self.statseg, self.vec_start +
102 (index * self.elementsize))[0]
103 return self.struct.unpack_from(self.statseg, self.vec_start +
104 (index * self.elementsize))
107 '''Main class implementing Python access to the VPP statistics segment'''
108 # pylint: disable=too-many-instance-attributes
109 shared_headerfmt = Struct('QPQQPP')
110 default_socketname = '/run/vpp/stats.sock'
112 def __init__(self, socketname=default_socketname, timeout=10):
113 self.socketname = socketname
114 self.timeout = timeout
116 self.lock = StatsLock(self)
117 self.connected = False
123 '''Connect to stats segment'''
126 sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
127 sock.connect(self.socketname)
132 stat_result = os.fstat(mfd)
133 self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED)
136 self.size = stat_result.st_size
137 if self.version != 2:
138 raise Exception('Incompatbile stat segment version {}'
139 .format(self.version))
142 self.connected = True
144 def disconnect(self):
145 '''Disconnect from stats segment'''
148 self.connected = False
152 '''Get version of stats segment'''
153 return self.shared_headerfmt.unpack_from(self.statseg)[0]
157 '''Get base pointer of stats segment'''
158 return self.shared_headerfmt.unpack_from(self.statseg)[1]
162 '''Get current epoch value from stats segment'''
163 return self.shared_headerfmt.unpack_from(self.statseg)[2]
def in_progress(self):
    '''Return the in_progress field (index 3) of the shared segment header'''
    header = self.shared_headerfmt.unpack_from(self.statseg)
    return header[3]
def directory_vector(self):
    '''Return the directory vector pointer (index 4) of the shared segment header'''
    header = self.shared_headerfmt.unpack_from(self.statseg)
    return header[4]
175 elementfmt = 'IQ128s'
177 def refresh(self, blocking=True):
178 '''Refresh directory vector cache (epoch changed)'''
180 directory_by_idx = {}
184 self.last_epoch = self.epoch
185 for i, direntry in enumerate(StatsVector(self, self.directory_vector, self.elementfmt)):
186 path_raw = direntry[2].find(b'\x00')
187 path = direntry[2][:path_raw].decode('ascii')
188 directory[path] = StatsEntry(direntry[0], direntry[1])
189 directory_by_idx[i] = path
190 self.directory = directory
191 self.directory_by_idx = directory_by_idx
197 def __getitem__(self, item, blocking=True):
198 if not self.connected:
202 if self.last_epoch != self.epoch:
203 self.refresh(blocking)
205 return self.directory[item].get_counter(self)
211 return iter(self.directory.items())
214 def set_errors(self, blocking=True):
215 '''Return dictionary of error counters > 0'''
216 if not self.connected:
219 errors = {k: v for k, v in self.directory.items()
220 if k.startswith("/err/")}
224 total = self[k].sum()
def set_errors_str(self, blocking=True):
    '''Return the counters from self.set_errors(blocking) as printable text.

    The text starts with an 'ERRORS:' line, followed by one line per counter
    in sorted name order (name left-aligned in 60 columns, value
    right-aligned in 10), and ends with a newline.
    '''
    counters = self.set_errors(blocking)
    lines = ['ERRORS:']
    lines.extend('{:<60}{:>10}'.format(name, counters[name])
                 for name in sorted(counters))
    return '%s\n' % '\n'.join(lines)
def get_counter(self, name, blocking=True):
    '''Look up counter name via __getitem__, forwarding the blocking flag'''
    lookup = self.__getitem__
    return lookup(name, blocking)
def get_err_counter(self, name, blocking=True):
    '''Look up counter name via __getitem__ and return the result of its sum()'''
    counter = self.__getitem__(name, blocking)
    return counter.sum()
247 def ls(self, patterns):
248 '''Returns list of counters matching pattern'''
249 # pylint: disable=invalid-name
250 if not self.connected:
252 if not isinstance(patterns, list):
253 patterns = [patterns]
254 regex = [re.compile(i) for i in patterns]
255 if self.last_epoch != self.epoch:
258 return [k for k, v in self.directory.items()
259 if any(re.match(pattern, k) for pattern in regex)]
261 def dump(self, counters, blocking=True):
262 '''Given a list of counters return a dictionary of results'''
263 if not self.connected:
267 result[cnt] = self.__getitem__(cnt,blocking)
271 '''Stat segment optimistic locking'''
273 def __init__(self, stats):
278 acquired = self.acquire(blocking=True)
279 assert acquired, "Lock wasn't acquired, but blocking=True"
282 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
285 def acquire(self, blocking=True, timeout=-1):
286 '''Acquire the lock. Await in progress to go false. Record epoch.'''
287 self.epoch = self.stats.epoch
289 start = time.monotonic()
290 while self.stats.in_progress:
294 if start + time.monotonic() > timeout:
299 '''Check if data read while locked is valid'''
300 if self.stats.in_progress or self.stats.epoch != self.epoch:
301 raise IOError('Optimistic lock failed, retry')
class StatsCombinedList(list):
    '''Column slicing for Combined counters list'''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1]

        item may be:
          * an int   - returns that row, as a plain list lookup would
          * a slice  - returns the selected rows as a StatsCombinedList, so
                       ordinary list slicing such as rows[0:2] works
          * a tuple such as (slice, column) - returns a CombinedList holding
            element `column` of every row; the first tuple member is ignored
        '''
        if isinstance(item, int):
            return list.__getitem__(self, item)
        if isinstance(item, slice):
            # A bare slice used to fall through to item[1] below and fail
            # with "'slice' object is not subscriptable".
            return StatsCombinedList(list.__getitem__(self, item))
        return CombinedList([row[item[1]] for row in self])
316 class CombinedList(list):
317 '''Combined Counters 2-dimensional by thread by index of packets/octets'''
320 '''Return column (2nd dimension). Packets for all threads'''
321 return [pair[0] for pair in self]
324 '''Return column (2nd dimension). Octets for all threads'''
325 return [pair[1] for pair in self]
def sum_packets(self):
    '''Return the sum of the packet counters returned by self.packets()'''
    packet_counts = self.packets()
    return sum(packet_counts)
def sum_octets(self):
    '''Return the sum of the octet counters returned by self.octets()'''
    octet_counts = self.octets()
    return sum(octet_counts)
335 class StatsTuple(tuple):
336 '''A Combined vector tuple (packets, octets)'''
337 def __init__(self, data):
338 self.dictionary = {'packets': data[0], 'bytes': data[1]}
342 return dict.__repr__(self.dictionary)
344 def __getitem__(self, item):
345 if isinstance(item, int):
346 return tuple.__getitem__(self, item)
347 if item == 'packets':
348 return tuple.__getitem__(self, 0)
349 return tuple.__getitem__(self, 1)
class StatsSimpleList(list):
    '''Simple Counters 2-dimensional by thread by index of packets'''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1]

        item may be:
          * an int   - returns that row, as a plain list lookup would
          * a slice  - returns the selected rows as a StatsSimpleList, so
                       ordinary list slicing such as rows[0:2] works
          * a tuple such as (slice, column) - returns a SimpleList holding
            element `column` of every row; the first tuple member is ignored
        '''
        if isinstance(item, int):
            return list.__getitem__(self, item)
        if isinstance(item, slice):
            # A bare slice used to fall through to item[1] below and fail
            # with "'slice' object is not subscriptable".
            return StatsSimpleList(list.__getitem__(self, item))
        return SimpleList([row[item[1]] for row in self])
360 class SimpleList(list):
368 '''An individual stats entry'''
369 # pylint: disable=unused-argument,no-self-use
371 def __init__(self, stattype, statvalue):
373 self.value = statvalue
376 self.function = self.scalar
378 self.function = self.simple
380 self.function = self.combined
382 self.function = self.name
384 self.function = self.symlink
386 self.function = self.illegal
388 def illegal(self, stats):
389 '''Invalid or unknown counter type'''
392 def scalar(self, stats):
396 def simple(self, stats):
398 counter = StatsSimpleList()
399 for threads in StatsVector(stats, self.value, 'P'):
400 clist = [v[0] for v in StatsVector(stats, threads[0], 'Q')]
401 counter.append(clist)
404 def combined(self, stats):
405 '''Combined counter'''
406 counter = StatsCombinedList()
407 for threads in StatsVector(stats, self.value, 'P'):
408 clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')]
409 counter.append(clist)
412 def name(self, stats):
415 for name in StatsVector(stats, self.value, 'P'):
417 counter.append(get_string(stats, name[0]))
420 SYMLINK_FMT1 = Struct('II')
421 SYMLINK_FMT2 = Struct('Q')
def symlink(self, stats):
    '''Resolve a symlink entry to the column it refers to.

    self.value is packed as one 64-bit integer and re-read as two 32-bit
    integers. The first is used as a key into stats.directory_by_idx to find
    the target counter name; the second selects the column ([:, index]) of
    that counter, which is returned.
    '''
    packed = self.SYMLINK_FMT2.pack(self.value)
    target_index, column = self.SYMLINK_FMT1.unpack(packed)
    target_name = stats.directory_by_idx[target_index]
    target = stats[target_name]
    return target[:, column]
429 def get_counter(self, stats):
430 '''Return a list of counters'''
432 return self.function(stats)
434 class TestStats(unittest.TestCase):
435 '''Basic statseg tests'''
438 '''Connect to statseg'''
439 self.stat = VPPStats()
441 self.profile = cProfile.Profile()
442 self.profile.enable()
445 '''Disconnect from statseg'''
446 self.stat.disconnect()
447 profile = Stats(self.profile)
449 profile.sort_stats('cumtime')
450 profile.print_stats()
def test_counters(self):
    '''Print a selection of counters and check that an unknown name raises KeyError'''
    stat = self.stat

    # Plain subscript lookups, printed under their own path.
    for path in ('/err/abf-input-ip4/missed', '/sys/heartbeat',
                 '/if/names', '/if/rx-miss'):
        print(path, stat[path])
    print('/if/rx-miss', stat['/if/rx-miss'][1])
    print('/nat44-ed/out2in/slowpath/drops', stat['/nat44-ed/out2in/slowpath/drops'])

    # A path that is not in the directory must raise KeyError.
    with self.assertRaises(KeyError):
        print('NO SUCH COUNTER', stat['foobar'])

    # Same kind of lookup through the get_counter() method.
    for path in ('/if/rx', '/err/ethernet-input/no_error'):
        print(path, stat.get_counter(path))
def test_column(self):
    '''Test column slicing'''

    # Exercises the access patterns listed in the module docstring; the
    # results are only printed, nothing is asserted.
    print('/if/rx-miss', self.stat['/if/rx-miss'])
    print('/if/rx', self.stat['/if/rx']) # Whole 2D table (by thread, by interface)
    print('/if/rx thread #1', self.stat['/if/rx'][0]) # All interfaces for the first thread (index 0)
    print('/if/rx thread #1, interface #1',
          self.stat['/if/rx'][0][1]) # Interface index 1 on the first thread (index 0)
    # Column slice: interface index 1 on all threads
    print('/if/rx if_index #1', self.stat['/if/rx'][:, 1])
    # Packet counters of that column
    print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].packets())
    # Sum of the packet counters of that column
    print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].sum_packets())
    # NOTE: the printed label says "packets" but the value is octets()
    print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].octets())
    print('/if/rx-miss', self.stat['/if/rx-miss'])
    # Simple counter: sum of the column for interface index 1
    print('/if/rx-miss if_index #1 packets', self.stat['/if/rx-miss'][:, 1].sum())
    # Lookup by name inside a single (thread 0, interface 1) entry
    print('/if/rx if_index #1 packets', self.stat['/if/rx'][0][1]['packets'])
def test_nat44(self):
    '''Print two NAT44 related counters (nothing is asserted)'''
    stat = self.stat

    del_events = stat['/nat44-ei/ha/del-event-recv']
    print('/nat44-ei/ha/del-event-recv', del_events)

    processed = stat['/err/nat44-ei-ha/pkts-processed'].sum()
    print('/err/nat44-ei-ha/pkts-processed', processed)
490 def test_legacy(self):
491 '''Legacy interface'''
492 directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
493 data = self.stat.dump(directory)
495 print('Looking up sys node')
496 directory = self.stat.ls(["^/sys/node"])
497 print('Dumping sys node')
498 data = self.stat.dump(directory)
500 directory = self.stat.ls(["^/foobar"])
501 data = self.stat.dump(directory)
def test_sys_nodes(self):
    '''List the counters matching ^/sys/node, dump them, then print /net/route/to'''
    stat = self.stat

    node_counters = stat.ls('^/sys/node')
    print('COUNTERS:', node_counters)

    dumped = stat.dump(node_counters)
    print('/sys/node', dumped)

    routes = stat['/net/route/to']
    print('/net/route/to', routes)
511 def test_symlink(self):
513 print('/interface/local0/rx', self.stat['/interfaces/local0/rx'])
514 print('/sys/nodes/unix-epoll-input', self.stat['/nodes/unix-epoll-input/calls'])
516 if __name__ == '__main__':
518 from pstats import Stats