3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 This module implements Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
24 Counters are stored in VPP as a two dimensional array.
25 Indexed by thread and index (typically sw_if_index).
26 Simple counters count only packets, Combined counters count packets
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34 for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37 interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39 interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41 interface 1 on all threads for simple counters
48 from struct import Struct
54 '''Get file descriptor for memory map'''
55 fds = array.array("i") # Array of ints
56 _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
57 for cmsg_level, cmsg_type, cmsg_data in ancdata:
58 if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
59 fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
# Pre-compiled layout of the unsigned int length field in a vector header.
VEC_LEN_FMT = Struct('I')


def get_vec_len(stats, vector_offset):
    '''Equivalent to VPP vec_len()

    Read the element count stored in front of a vector in the segment.

    :param stats: object exposing the mapped segment as ``statseg``
    :param vector_offset: offset of the vector's first element within
        ``stats.statseg``; the length field is read 8 bytes before it
    :returns: the stored length as an integer
    :raises ValueError: if there is no room for the header before
        ``vector_offset``
    '''
    header_offset = vector_offset - 8
    if header_offset < 0:
        # A negative offset is interpreted by struct as relative to the end
        # of the buffer, which would silently return unrelated data.
        raise ValueError('Vector header lies before start of stats segment')
    return VEC_LEN_FMT.unpack_from(stats.statseg, header_offset)[0]
def get_string(stats, ptr):
    '''Get a string from a VPP vector

    ``ptr`` is converted to a segment offset by subtracting ``stats.base``.
    The final byte of the vector is dropped before the remaining bytes are
    decoded as ASCII.

    :raises ValueError: if the vector would reach or pass ``stats.size``
    '''
    start = ptr - stats.base
    length = get_vec_len(stats, start)
    end = start + length
    if end >= stats.size:
        raise ValueError('String overruns stats segment')
    raw = stats.statseg[start:end - 1]
    return raw.decode('ascii')
77 '''A class representing a VPP vector'''
79 def __init__(self, stats, ptr, fmt):
80 self.vec_start = ptr - stats.base
81 self.vec_len = get_vec_len(stats, ptr - stats.base)
82 self.struct = Struct(fmt)
83 self.fmtlen = len(fmt)
84 self.elementsize = self.struct.size
85 self.statseg = stats.statseg
88 if self.vec_start + self.vec_len * self.elementsize >= stats.size:
89 raise ValueError('Vector overruns stats segment')
93 return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start +
94 self.elementsize*self.vec_len])
96 def __getitem__(self, index):
97 if index > self.vec_len:
98 raise ValueError('Index beyond end of vector')
101 return self.struct.unpack_from(self.statseg, self.vec_start +
102 (index * self.elementsize))[0]
103 return self.struct.unpack_from(self.statseg, self.vec_start +
104 (index * self.elementsize))
107 '''Main class implementing Python access to the VPP statistics segment'''
108 # pylint: disable=too-many-instance-attributes
109 shared_headerfmt = Struct('QPQQPP')
110 default_socketname = '/run/vpp/stats.sock'
112 def __init__(self, socketname=default_socketname, timeout=10):
113 self.socketname = socketname
114 self.timeout = timeout
116 self.lock = StatsLock(self)
117 self.connected = False
120 self.error_vectors = 0
124 '''Connect to stats segment'''
127 sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
128 sock.connect(self.socketname)
133 stat_result = os.fstat(mfd)
134 self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED)
137 self.size = stat_result.st_size
138 if self.version != 2:
139 raise Exception('Incompatbile stat segment version {}'
140 .format(self.version))
143 self.connected = True
145 def disconnect(self):
146 '''Disconnect from stats segment'''
149 self.connected = False
153 '''Get version of stats segment'''
154 return self.shared_headerfmt.unpack_from(self.statseg)[0]
158 '''Get base pointer of stats segment'''
159 return self.shared_headerfmt.unpack_from(self.statseg)[1]
163 '''Get current epoch value from stats segment'''
164 return self.shared_headerfmt.unpack_from(self.statseg)[2]
167 def in_progress(self):
168 '''Get value of in_progress from stats segment'''
169 return self.shared_headerfmt.unpack_from(self.statseg)[3]
172 def directory_vector(self):
173 '''Get pointer of directory vector'''
174 return self.shared_headerfmt.unpack_from(self.statseg)[4]
177 def error_vector(self):
178 '''Get pointer of error vector'''
179 return self.shared_headerfmt.unpack_from(self.statseg)[5]
181 elementfmt = 'IQ128s'
184 '''Refresh directory vector cache (epoch changed)'''
187 for direntry in StatsVector(self, self.directory_vector, self.elementfmt):
188 path_raw = direntry[2].find(b'\x00')
189 path = direntry[2][:path_raw].decode('ascii')
190 directory[path] = StatsEntry(direntry[0], direntry[1])
191 self.last_epoch = self.epoch
192 self.directory = directory
194 # Cache the error index vectors
195 self.error_vectors = []
196 for threads in StatsVector(self, self.error_vector, 'P'):
197 self.error_vectors.append(StatsVector(self, threads[0], 'Q'))
199 def __getitem__(self, item):
200 if not self.connected:
202 if self.last_epoch != self.epoch:
205 return self.directory[item].get_counter(self)
208 return iter(self.directory.items())
210 def set_errors(self):
211 '''Return dictionary of error counters > 0'''
212 if not self.connected:
215 errors = {k:v for k, v in self.directory.items() if k.startswith("/err/")}
218 for k, entry in errors.items():
221 for per_thread in self.error_vectors:
222 total += per_thread[i]
def set_errors_str(self):
    '''Return the counters from set_errors() pretty printed

    The result starts with an 'ERRORS:' heading, followed by one line per
    counter in sorted name order (name left-aligned in 60 columns, value
    right-aligned in 10), and ends with a newline.
    '''
    counters = self.set_errors()
    rows = ['{:<60}{:>10}'.format(name, counters[name])
            for name in sorted(counters)]
    return '%s\n' % '\n'.join(['ERRORS:'] + rows)
def get_counter(self, name):
    '''Named-method alias for indexing this object with ``name``'''
    lookup = self.__getitem__
    return lookup(name)
239 def get_err_counter(self, name):
240 '''Return a single value (sum of all threads)'''
241 if not self.connected:
243 return sum(self.directory[name].get_counter(self))
245 def ls(self, patterns):
246 '''Returns list of counters matching pattern'''
247 # pylint: disable=invalid-name
248 if not self.connected:
250 regex = [re.compile(i) for i in patterns]
251 return [k for k, v in self.directory.items()
252 if any(re.match(pattern, k) for pattern in regex)]
254 def dump(self, counters):
255 '''Given a list of counters return a dictionary of results'''
256 if not self.connected:
260 result[cnt] = self.__getitem__(cnt)
264 '''Stat segment optimistic locking'''
266 def __init__(self, stats):
271 acquired = self.acquire(blocking=True)
272 assert acquired, "Lock wasn't acquired, but blocking=True"
275 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
278 def acquire(self, blocking=True, timeout=-1):
279 '''Acquire the lock. Await in progress to go false. Record epoch.'''
280 self.epoch = self.stats.epoch
282 start = time.monotonic()
283 while self.stats.in_progress:
287 if start + time.monotonic() > timeout:
292 '''Check if data read while locked is valid'''
293 if self.stats.in_progress or self.stats.epoch != self.epoch:
294 raise IOError('Optimistic lock failed, retry')
class StatsCombinedList(list):
    '''Column slicing for Combined counters list

    A list of rows that also accepts a numpy-style two-element index such
    as ``[:, 1]`` to pick one column out of every row.
    '''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1]

        :param item: an ordinary list index or slice, or a 2-element
            sequence whose second element selects the column. The first
            element is not consulted; every row is always included.
        :returns: the row(s) for a plain index or slice, otherwise a
            CombinedList holding the selected column of each row
        '''
        if isinstance(item, (int, slice)):
            # Plain indexes and slices keep regular list semantics. A slice
            # used to be mistaken for a column selector and raised TypeError.
            return list.__getitem__(self, item)
        return CombinedList([row[item[1]] for row in self])
309 class CombinedList(list):
310 '''Combined Counters 2-dimensional by thread by index of packets/octets'''
313 '''Return column (2nd dimension). Packets for all threads'''
314 return [pair[0] for pair in self]
317 '''Return column (2nd dimension). Octets for all threads'''
318 return [pair[1] for pair in self]
def sum_packets(self):
    '''Return the total of the values produced by packets()'''
    per_thread = self.packets()
    return sum(per_thread)
def sum_octets(self):
    '''Return the total of the values produced by octets()'''
    per_thread = self.octets()
    return sum(per_thread)
328 class StatsTuple(tuple):
329 '''A Combined vector tuple (packets, octets)'''
330 def __init__(self, data):
331 self.dictionary = {'packets': data[0], 'bytes': data[1]}
335 return dict.__repr__(self.dictionary)
337 def __getitem__(self, item):
338 if isinstance(item, int):
339 return tuple.__getitem__(self, item)
340 if item == 'packets':
341 return tuple.__getitem__(self, 0)
342 return tuple.__getitem__(self, 1)
class StatsSimpleList(list):
    '''Simple Counters 2-dimensional by thread by index of packets

    A list of rows that also accepts a numpy-style two-element index such
    as ``[:, 1]`` to pick one column out of every row.
    '''

    def __getitem__(self, item):
        '''Supports partial numpy style 2d support. Slice by column [:,1]

        :param item: an ordinary list index or slice, or a 2-element
            sequence whose second element selects the column. The first
            element is not consulted; every row is always included.
        :returns: the row(s) for a plain index or slice, otherwise a
            SimpleList holding the selected column of each row
        '''
        if isinstance(item, (int, slice)):
            # Plain indexes and slices keep regular list semantics. A slice
            # used to be mistaken for a column selector and raised TypeError.
            return list.__getitem__(self, item)
        return SimpleList([row[item[1]] for row in self])
353 class SimpleList(list):
361 '''An individual stats entry'''
362 # pylint: disable=unused-argument,no-self-use
364 def __init__(self, stattype, statvalue):
366 self.value = statvalue
369 self.function = self.scalar
371 self.function = self.simple
373 self.function = self.combined
375 self.function = self.error
377 self.function = self.name
379 self.function = self.illegal
381 def illegal(self, stats):
382 '''Invalid or unknown counter type'''
385 def scalar(self, stats):
389 def simple(self, stats):
391 counter = StatsSimpleList()
392 for threads in StatsVector(stats, self.value, 'P'):
393 clist = [v[0] for v in StatsVector(stats, threads[0], 'Q')]
394 counter.append(clist)
397 def combined(self, stats):
398 '''Combined counter'''
399 counter = StatsCombinedList()
400 for threads in StatsVector(stats, self.value, 'P'):
401 clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')]
402 counter.append(clist)
405 def error(self, stats):
407 counter = SimpleList()
408 for clist in stats.error_vectors:
409 counter.append(clist[self.value])
412 def name(self, stats):
415 for name in StatsVector(stats, self.value, 'P'):
416 counter.append(get_string(stats, name[0]))
def get_counter(self, stats):
    '''Call the reader stored in ``self.function`` and return its result'''
    reader = self.function
    return reader(stats)
423 class TestStats(unittest.TestCase):
424 '''Basic statseg tests'''
427 '''Connect to statseg'''
428 self.stat = VPPStats()
430 self.profile = cProfile.Profile()
431 self.profile.enable()
434 '''Disconnect from statseg'''
435 self.stat.disconnect()
436 profile = Stats(self.profile)
438 profile.sort_stats('cumtime')
439 profile.print_stats()
442 def test_counters(self):
443 '''Test access to statseg'''
445 print('/err/abf-input-ip4/missed', self.stat['/err/abf-input-ip4/missed'])
446 print('/sys/heartbeat', self.stat['/sys/heartbeat'])
447 print('/if/names', self.stat['/if/names'])
448 print('/if/rx-miss', self.stat['/if/rx-miss'])
449 print('/if/rx-miss', self.stat['/if/rx-miss'][1])
450 print('/nat44-ed/out2in/slowpath/drops', self.stat['/nat44-ed/out2in/slowpath/drops'])
451 print('Set Errors', self.stat.set_errors())
452 with self.assertRaises(KeyError):
453 print('NO SUCH COUNTER', self.stat['foobar'])
454 print('/if/rx', self.stat.get_counter('/if/rx'))
455 print('/err/ethernet-input/no error',
456 self.stat.get_err_counter('/err/ethernet-input/no error'))
458 def test_column(self):
459 '''Test column slicing'''
461 print('/if/rx-miss', self.stat['/if/rx-miss'])
462 print('/if/rx', self.stat['/if/rx']) # All interfaces for thread #1
463 print('/if/rx thread #1', self.stat['/if/rx'][0]) # All interfaces for thread #1
464 print('/if/rx thread #1, interface #1',
465 self.stat['/if/rx'][0][1]) # All interfaces for thread #1
466 print('/if/rx if_index #1', self.stat['/if/rx'][:, 1])
467 print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].packets())
468 print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].sum_packets())
469 print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].octets())
470 print('/if/rx-miss', self.stat['/if/rx-miss'])
471 print('/if/rx-miss if_index #1 packets', self.stat['/if/rx-miss'][:, 1].sum())
472 print('/if/rx if_index #1 packets', self.stat['/if/rx'][0][1]['packets'])
474 def test_error(self):
475 '''Test the error vector'''
477 print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'])
478 print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'])
479 print('/err/ethernet-input', self.stat.get_err_counter('/err/ethernet-input/no error'))
480 print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'].sum())
482 def test_nat44(self):
483 '''Test the nat counters'''
485 print('/nat44-ei/ha/del-event-recv', self.stat['/nat44-ei/ha/del-event-recv'])
486 print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'].sum())
488 def test_legacy(self):
489 '''Legacy interface'''
490 directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
491 data = self.stat.dump(directory)
493 print('Looking up sys node')
494 directory = self.stat.ls(["^/sys/node"])
495 print('Dumping sys node')
496 data = self.stat.dump(directory)
498 directory = self.stat.ls(["^/foobar"])
499 data = self.stat.dump(directory)
502 if __name__ == '__main__':
504 from pstats import Stats