stats: python vpp_stats rewrite to access stat segment directly
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_stats.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 '''
18 This module implement Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
23
24 Counters are stored in VPP as a two dimensional array.
25 Index by thread and index (typically sw_if_index).
26 Simple counters count only packets, Combined counters count packets
27 and octets.
28
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34                                   for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37                                  interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39                                      interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41                                   interface 1 on all threads for simple counters
42 '''
43
44 import os
45 import socket
46 import array
47 import mmap
48 from struct import Struct
49 import time
50 import unittest
51 import re
52
53 def recv_fd(sock):
54     '''Get file descriptor for memory map'''
55     fds = array.array("i")   # Array of ints
56     _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
57     for cmsg_level, cmsg_type, cmsg_data in ancdata:
58         if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
59             fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
60     return list(fds)[0]
61
62 VEC_LEN_FMT = Struct('I')
63 def get_vec_len(stats, vector_offset):
64     '''Equivalent to VPP vec_len()'''
65     return VEC_LEN_FMT.unpack_from(stats.statseg, vector_offset - 8)[0]
66
67 def get_string(stats, ptr):
68     '''Get a string from a VPP vector'''
69     namevector = ptr - stats.base
70     namevectorlen = get_vec_len(stats, namevector)
71     if namevector + namevectorlen >= stats.size:
72         raise ValueError('String overruns stats segment')
73     return stats.statseg[namevector:namevector+namevectorlen-1].decode('ascii')
74
75
76 class StatsVector:
77     '''A class representing a VPP vector'''
78
79     def __init__(self, stats, ptr, fmt):
80         self.vec_start = ptr - stats.base
81         self.vec_len = get_vec_len(stats, ptr - stats.base)
82         self.struct = Struct(fmt)
83         self.fmtlen = len(fmt)
84         self.elementsize = self.struct.size
85         self.statseg = stats.statseg
86         self.stats = stats
87
88         if self.vec_start + self.vec_len * self.elementsize >= stats.size:
89             raise ValueError('Vector overruns stats segment')
90
91     def __iter__(self):
92         with self.stats.lock:
93             return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start +
94                                                         self.elementsize*self.vec_len])
95
96     def __getitem__(self, index):
97         if index > self.vec_len:
98             raise ValueError('Index beyond end of vector')
99         with self.stats.lock:
100             if self.fmtlen == 1:
101                 return self.struct.unpack_from(self.statseg, self.vec_start +
102                                                (index * self.elementsize))[0]
103             return self.struct.unpack_from(self.statseg, self.vec_start +
104                                            (index * self.elementsize))
105
106 class VPPStats():
107     '''Main class implementing Python access to the VPP statistics segment'''
108     # pylint: disable=too-many-instance-attributes
109     shared_headerfmt = Struct('QPQQPP')
110     default_socketname = '/run/vpp/stats.sock'
111
112     def __init__(self, socketname=default_socketname, timeout=10):
113         self.socketname = socketname
114         self.timeout = timeout
115         self.directory = {}
116         self.lock = StatsLock(self)
117         self.connected = False
118         self.size = 0
119         self.last_epoch = 0
120         self.error_vectors = 0
121         self.statseg = 0
122
123     def connect(self):
124         '''Connect to stats segment'''
125         if self.connected:
126             return
127         sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
128         sock.connect(self.socketname)
129
130         mfd = recv_fd(sock)
131         sock.close()
132
133         stat_result = os.fstat(mfd)
134         self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED)
135         socket.close(mfd)
136
137         self.size = stat_result.st_size
138         if self.version != 2:
139             raise Exception('Incompatbile stat segment version {}'
140                             .format(self.version))
141
142         self.refresh()
143         self.connected = True
144
145     def disconnect(self):
146         '''Disconnect from stats segment'''
147         if self.connected:
148             self.statseg.close()
149             self.connected = False
150
151     @property
152     def version(self):
153         '''Get version of stats segment'''
154         return self.shared_headerfmt.unpack_from(self.statseg)[0]
155
156     @property
157     def base(self):
158         '''Get base pointer of stats segment'''
159         return self.shared_headerfmt.unpack_from(self.statseg)[1]
160
161     @property
162     def epoch(self):
163         '''Get current epoch value from stats segment'''
164         return self.shared_headerfmt.unpack_from(self.statseg)[2]
165
166     @property
167     def in_progress(self):
168         '''Get value of in_progress from stats segment'''
169         return self.shared_headerfmt.unpack_from(self.statseg)[3]
170
171     @property
172     def directory_vector(self):
173         '''Get pointer of directory vector'''
174         return self.shared_headerfmt.unpack_from(self.statseg)[4]
175
176     @property
177     def error_vector(self):
178         '''Get pointer of error vector'''
179         return self.shared_headerfmt.unpack_from(self.statseg)[5]
180
181     elementfmt = 'IQ128s'
182
183     def refresh(self):
184         '''Refresh directory vector cache (epoch changed)'''
185         directory = {}
186         with self.lock:
187             for direntry in StatsVector(self, self.directory_vector, self.elementfmt):
188                 path_raw = direntry[2].find(b'\x00')
189                 path = direntry[2][:path_raw].decode('ascii')
190                 directory[path] = StatsEntry(direntry[0], direntry[1])
191             self.last_epoch = self.epoch
192             self.directory = directory
193
194             # Cache the error index vectors
195             self.error_vectors = []
196             for threads in StatsVector(self, self.error_vector, 'P'):
197                 self.error_vectors.append(StatsVector(self, threads[0], 'Q'))
198
199     def __getitem__(self, item):
200         if not self.connected:
201             self.connect()
202         if self.last_epoch != self.epoch:
203             self.refresh()
204         with self.lock:
205             return self.directory[item].get_counter(self)
206
207     def __iter__(self):
208         return iter(self.directory.items())
209
210     def set_errors(self):
211         '''Return dictionary of error counters > 0'''
212         if not self.connected:
213             self.connect()
214
215         errors = {k:v for k, v in self.directory.items() if k.startswith("/err/")}
216         result = {}
217         with self.lock:
218             for k, entry in errors.items():
219                 total = 0
220                 i = entry.value
221                 for per_thread in self.error_vectors:
222                     total += per_thread[i]
223                 if total:
224                     result[k] = total
225         return result
226
227     def set_errors_str(self):
228         '''Return all errors counters > 0 pretty printed'''
229         error_string = ['ERRORS:']
230         error_counters = self.set_errors()
231         for k in sorted(error_counters):
232             error_string.append('{:<60}{:>10}'.format(k, error_counters[k]))
233         return '%s\n' % '\n'.join(error_string)
234
235     def get_counter(self, name):
236         '''Alternative call to __getitem__'''
237         return self.__getitem__(name)
238
239     def get_err_counter(self, name):
240         '''Return a single value (sum of all threads)'''
241         if not self.connected:
242             self.connect()
243         return sum(self.directory[name].get_counter(self))
244
245     def ls(self, patterns):
246         '''Returns list of counters matching pattern'''
247         # pylint: disable=invalid-name
248         if not self.connected:
249             self.connect()
250         regex = [re.compile(i) for i in patterns]
251         return [k for k, v in self.directory.items()
252                 if any(re.match(pattern, k) for pattern in regex)]
253
254     def dump(self, counters):
255         '''Given a list of counters return a dictionary of results'''
256         if not self.connected:
257             self.connect()
258         result = {}
259         for cnt in counters:
260             result[cnt] = self.__getitem__(cnt)
261         return result
262
263 class StatsLock():
264     '''Stat segment optimistic locking'''
265
266     def __init__(self, stats):
267         self.stats = stats
268         self.epoch = 0
269
270     def __enter__(self):
271         acquired = self.acquire(blocking=True)
272         assert acquired, "Lock wasn't acquired, but blocking=True"
273         return self
274
275     def __exit__(self, exc_type=None, exc_value=None, traceback=None):
276         self.release()
277
278     def acquire(self, blocking=True, timeout=-1):
279         '''Acquire the lock. Await in progress to go false. Record epoch.'''
280         self.epoch = self.stats.epoch
281         if timeout > 0:
282             start = time.monotonic()
283         while self.stats.in_progress:
284             if not blocking:
285                 time.sleep(0.01)
286                 if timeout > 0:
287                     if start + time.monotonic() > timeout:
288                         return False
289         return True
290
291     def release(self):
292         '''Check if data read while locked is valid'''
293         if self.stats.in_progress or self.stats.epoch != self.epoch:
294             raise IOError('Optimistic lock failed, retry')
295
296     def locked(self):
297         '''Not used'''
298
299
300 class StatsCombinedList(list):
301     '''Column slicing for Combined counters list'''
302
303     def __getitem__(self, item):
304         '''Supports partial numpy style 2d support. Slice by column [:,1]'''
305         if isinstance(item, int):
306             return list.__getitem__(self, item)
307         return CombinedList([row[item[1]] for row in self])
308
309 class CombinedList(list):
310     '''Combined Counters 2-dimensional by thread by index of packets/octets'''
311
312     def packets(self):
313         '''Return column (2nd dimension). Packets for all threads'''
314         return [pair[0] for pair in self]
315
316     def octets(self):
317         '''Return column (2nd dimension). Octets for all threads'''
318         return [pair[1] for pair in self]
319
320     def sum_packets(self):
321         '''Return column (2nd dimension). Sum of all packets for all threads'''
322         return sum(self.packets())
323
324     def sum_octets(self):
325         '''Return column (2nd dimension). Sum of all octets for all threads'''
326         return sum(self.octets())
327
328 class StatsTuple(tuple):
329     '''A Combined vector tuple (packets, octets)'''
330     def __init__(self, data):
331         self.dictionary = {'packets': data[0], 'bytes': data[1]}
332         super().__init__()
333
334     def __repr__(self):
335         return dict.__repr__(self.dictionary)
336
337     def __getitem__(self, item):
338         if isinstance(item, int):
339             return tuple.__getitem__(self, item)
340         if item == 'packets':
341             return tuple.__getitem__(self, 0)
342         return tuple.__getitem__(self, 1)
343
344 class StatsSimpleList(list):
345     '''Simple Counters 2-dimensional by thread by index of packets'''
346
347     def __getitem__(self, item):
348         '''Supports partial numpy style 2d support. Slice by column [:,1]'''
349         if isinstance(item, int):
350             return list.__getitem__(self, item)
351         return SimpleList([row[item[1]] for row in self])
352
353 class SimpleList(list):
354     '''Simple counter'''
355
356     def sum(self):
357         '''Sum the vector'''
358         return sum(self)
359
360 class StatsEntry():
361     '''An individual stats entry'''
362     # pylint: disable=unused-argument,no-self-use
363
364     def __init__(self, stattype, statvalue):
365         self.type = stattype
366         self.value = statvalue
367
368         if stattype == 1:
369             self.function = self.scalar
370         elif stattype == 2:
371             self.function = self.simple
372         elif stattype == 3:
373             self.function = self.combined
374         elif stattype == 4:
375             self.function = self.error
376         elif stattype == 5:
377             self.function = self.name
378         else:
379             self.function = self.illegal
380
381     def illegal(self, stats):
382         '''Invalid or unknown counter type'''
383         return None
384
385     def scalar(self, stats):
386         '''Scalar counter'''
387         return self.value
388
389     def simple(self, stats):
390         '''Simple counter'''
391         counter = StatsSimpleList()
392         for threads in StatsVector(stats, self.value, 'P'):
393             clist = [v[0] for v in StatsVector(stats, threads[0], 'Q')]
394             counter.append(clist)
395         return counter
396
397     def combined(self, stats):
398         '''Combined counter'''
399         counter = StatsCombinedList()
400         for threads in StatsVector(stats, self.value, 'P'):
401             clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')]
402             counter.append(clist)
403         return counter
404
405     def error(self, stats):
406         '''Error counter'''
407         counter = SimpleList()
408         for clist in stats.error_vectors:
409             counter.append(clist[self.value])
410         return counter
411
412     def name(self, stats):
413         '''Name counter'''
414         counter = []
415         for name in StatsVector(stats, self.value, 'P'):
416             counter.append(get_string(stats, name[0]))
417         return counter
418
419     def get_counter(self, stats):
420         '''Return a list of counters'''
421         return self.function(stats)
422
423 class TestStats(unittest.TestCase):
424     '''Basic statseg tests'''
425
426     def setUp(self):
427         '''Connect to statseg'''
428         self.stat = VPPStats()
429         self.stat.connect()
430         self.profile = cProfile.Profile()
431         self.profile.enable()
432
433     def tearDown(self):
434         '''Disconnect from statseg'''
435         self.stat.disconnect()
436         profile = Stats(self.profile)
437         profile.strip_dirs()
438         profile.sort_stats('cumtime')
439         profile.print_stats()
440         print("\n--->>>")
441
442     def test_counters(self):
443         '''Test access to statseg'''
444
445         print('/err/abf-input-ip4/missed', self.stat['/err/abf-input-ip4/missed'])
446         print('/sys/heartbeat', self.stat['/sys/heartbeat'])
447         print('/if/names', self.stat['/if/names'])
448         print('/if/rx-miss', self.stat['/if/rx-miss'])
449         print('/if/rx-miss', self.stat['/if/rx-miss'][1])
450         print('/nat44-ed/out2in/slowpath/drops', self.stat['/nat44-ed/out2in/slowpath/drops'])
451         print('Set Errors', self.stat.set_errors())
452         with self.assertRaises(KeyError):
453             print('NO SUCH COUNTER', self.stat['foobar'])
454         print('/if/rx', self.stat.get_counter('/if/rx'))
455         print('/err/ethernet-input/no error',
456               self.stat.get_err_counter('/err/ethernet-input/no error'))
457
458     def test_column(self):
459         '''Test column slicing'''
460
461         print('/if/rx-miss', self.stat['/if/rx-miss'])
462         print('/if/rx', self.stat['/if/rx'])  # All interfaces for thread #1
463         print('/if/rx thread #1', self.stat['/if/rx'][0])  # All interfaces for thread #1
464         print('/if/rx thread #1, interface #1',
465               self.stat['/if/rx'][0][1])  # All interfaces for thread #1
466         print('/if/rx if_index #1', self.stat['/if/rx'][:, 1])
467         print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].packets())
468         print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].sum_packets())
469         print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].octets())
470         print('/if/rx-miss', self.stat['/if/rx-miss'])
471         print('/if/rx-miss if_index #1 packets', self.stat['/if/rx-miss'][:, 1].sum())
472         print('/if/rx if_index #1 packets', self.stat['/if/rx'][0][1]['packets'])
473
474     def test_error(self):
475         '''Test the error vector'''
476
477         print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'])
478         print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'])
479         print('/err/ethernet-input', self.stat.get_err_counter('/err/ethernet-input/no error'))
480         print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'].sum())
481
482     def test_nat44(self):
483         '''Test the nat counters'''
484
485         print('/nat44-ei/ha/del-event-recv', self.stat['/nat44-ei/ha/del-event-recv'])
486         print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'].sum())
487
488     def test_legacy(self):
489         '''Legacy interface'''
490         directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
491         data = self.stat.dump(directory)
492         print(data)
493         print('Looking up sys node')
494         directory = self.stat.ls(["^/sys/node"])
495         print('Dumping sys node')
496         data = self.stat.dump(directory)
497         print(data)
498         directory = self.stat.ls(["^/foobar"])
499         data = self.stat.dump(directory)
500         print(data)
501
502 if __name__ == '__main__':
503     import cProfile
504     from pstats import Stats
505
506     unittest.main()