stats: adding symlinks for nodes and interfaces in the stat segment
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_stats.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 '''
18 This module implement Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
23
24 Counters are stored in VPP as a two dimensional array.
25 Index by thread and index (typically sw_if_index).
26 Simple counters count only packets, Combined counters count packets
27 and octets.
28
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34                                   for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37                                  interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39                                      interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41                                   interface 1 on all threads for simple counters
42 '''
43
44 import os
45 import socket
46 import array
47 import mmap
48 from struct import Struct
49 import time
50 import unittest
51 import re
52
53 def recv_fd(sock):
54     '''Get file descriptor for memory map'''
55     fds = array.array("i")   # Array of ints
56     _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
57     for cmsg_level, cmsg_type, cmsg_data in ancdata:
58         if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
59             fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
60     return list(fds)[0]
61
62 VEC_LEN_FMT = Struct('I')
63 def get_vec_len(stats, vector_offset):
64     '''Equivalent to VPP vec_len()'''
65     return VEC_LEN_FMT.unpack_from(stats.statseg, vector_offset - 8)[0]
66
67 def get_string(stats, ptr):
68     '''Get a string from a VPP vector'''
69     namevector = ptr - stats.base
70     namevectorlen = get_vec_len(stats, namevector)
71     if namevector + namevectorlen >= stats.size:
72         raise ValueError('String overruns stats segment')
73     return stats.statseg[namevector:namevector+namevectorlen-1].decode('ascii')
74
75
76 class StatsVector:
77     '''A class representing a VPP vector'''
78
79     def __init__(self, stats, ptr, fmt):
80         self.vec_start = ptr - stats.base
81         self.vec_len = get_vec_len(stats, ptr - stats.base)
82         self.struct = Struct(fmt)
83         self.fmtlen = len(fmt)
84         self.elementsize = self.struct.size
85         self.statseg = stats.statseg
86         self.stats = stats
87
88         if self.vec_start + self.vec_len * self.elementsize >= stats.size:
89             raise ValueError('Vector overruns stats segment')
90
91     def __iter__(self):
92         with self.stats.lock:
93             return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start +
94                                                         self.elementsize*self.vec_len])
95
96     def __getitem__(self, index):
97         if index > self.vec_len:
98             raise ValueError('Index beyond end of vector')
99         with self.stats.lock:
100             if self.fmtlen == 1:
101                 return self.struct.unpack_from(self.statseg, self.vec_start +
102                                                (index * self.elementsize))[0]
103             return self.struct.unpack_from(self.statseg, self.vec_start +
104                                            (index * self.elementsize))
105
106 class VPPStats():
107     '''Main class implementing Python access to the VPP statistics segment'''
108     # pylint: disable=too-many-instance-attributes
109     shared_headerfmt = Struct('QPQQPP')
110     default_socketname = '/run/vpp/stats.sock'
111
112     def __init__(self, socketname=default_socketname, timeout=10):
113         self.socketname = socketname
114         self.timeout = timeout
115         self.directory = {}
116         self.lock = StatsLock(self)
117         self.connected = False
118         self.size = 0
119         self.last_epoch = 0
120         self.error_vectors = 0
121         self.statseg = 0
122
123     def connect(self):
124         '''Connect to stats segment'''
125         if self.connected:
126             return
127         sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
128         sock.connect(self.socketname)
129
130         mfd = recv_fd(sock)
131         sock.close()
132
133         stat_result = os.fstat(mfd)
134         self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED)
135         os.close(mfd)
136
137         self.size = stat_result.st_size
138         if self.version != 2:
139             raise Exception('Incompatbile stat segment version {}'
140                             .format(self.version))
141
142         self.refresh()
143         self.connected = True
144
145     def disconnect(self):
146         '''Disconnect from stats segment'''
147         if self.connected:
148             self.statseg.close()
149             self.connected = False
150
151     @property
152     def version(self):
153         '''Get version of stats segment'''
154         return self.shared_headerfmt.unpack_from(self.statseg)[0]
155
156     @property
157     def base(self):
158         '''Get base pointer of stats segment'''
159         return self.shared_headerfmt.unpack_from(self.statseg)[1]
160
161     @property
162     def epoch(self):
163         '''Get current epoch value from stats segment'''
164         return self.shared_headerfmt.unpack_from(self.statseg)[2]
165
166     @property
167     def in_progress(self):
168         '''Get value of in_progress from stats segment'''
169         return self.shared_headerfmt.unpack_from(self.statseg)[3]
170
171     @property
172     def directory_vector(self):
173         '''Get pointer of directory vector'''
174         return self.shared_headerfmt.unpack_from(self.statseg)[4]
175
176     @property
177     def error_vector(self):
178         '''Get pointer of error vector'''
179         return self.shared_headerfmt.unpack_from(self.statseg)[5]
180
181     elementfmt = 'IQ128s'
182
183     def refresh(self, blocking=True):
184         '''Refresh directory vector cache (epoch changed)'''
185         directory = {}
186         directory_by_idx = {}
187         while True:
188             try:
189                 with self.lock:
190                     for i, direntry in enumerate(StatsVector(self, self.directory_vector, self.elementfmt)):
191                         path_raw = direntry[2].find(b'\x00')
192                         path = direntry[2][:path_raw].decode('ascii')
193                         directory[path] = StatsEntry(direntry[0], direntry[1])
194                         directory_by_idx[i] = path
195                     self.last_epoch = self.epoch
196                     self.directory = directory
197                     self.directory_by_idx = directory_by_idx
198
199                     # Cache the error index vectors
200                     self.error_vectors = []
201                     for threads in StatsVector(self, self.error_vector, 'P'):
202                         self.error_vectors.append(StatsVector(self, threads[0], 'Q'))
203                     return
204             except IOError:
205                 if not blocking:
206                     raise
207
208     def __getitem__(self, item, blocking=True):
209         if not self.connected:
210             self.connect()
211         while True:
212             try:
213                 if self.last_epoch != self.epoch:
214                     self.refresh(blocking)
215                 with self.lock:
216                     return self.directory[item].get_counter(self)
217             except IOError:
218                 if not blocking:
219                     raise
220
221     def __iter__(self):
222         return iter(self.directory.items())
223
224     def set_errors(self, blocking=True):
225         '''Return dictionary of error counters > 0'''
226         if not self.connected:
227             self.connect()
228
229         errors = {k:v for k, v in self.directory.items() if k.startswith("/err/")}
230         result = {}
231         while True:
232             try:
233                 if self.last_epoch != self.epoch:
234                     self.refresh(blocking)
235                 with self.lock:
236                     for k, entry in errors.items():
237                         total = 0
238                         i = entry.value
239                         for per_thread in self.error_vectors:
240                             total += per_thread[i]
241                         if total:
242                             result[k] = total
243                 return result
244             except IOError:
245                 if not blocking:
246                     raise
247
248     def set_errors_str(self, blocking=True):
249         '''Return all errors counters > 0 pretty printed'''
250         error_string = ['ERRORS:']
251         error_counters = self.set_errors(blocking)
252         for k in sorted(error_counters):
253             error_string.append('{:<60}{:>10}'.format(k, error_counters[k]))
254         return '%s\n' % '\n'.join(error_string)
255
256     def get_counter(self, name, blocking=True):
257         '''Alternative call to __getitem__'''
258         return self.__getitem__(name, blocking)
259
260     def get_err_counter(self, name, blocking=True):
261         '''Return a single value (sum of all threads)'''
262         if not self.connected:
263             self.connect()
264         if name.startswith("/err/"):
265             while True:
266                 try:
267                     if self.last_epoch != self.epoch:
268                         self.refresh(blocking)
269                     with self.lock:
270                         return sum(self.directory[name].get_counter(self))
271                 except IOError:
272                     if not blocking:
273                         raise
274
275     def ls(self, patterns):
276         '''Returns list of counters matching pattern'''
277         # pylint: disable=invalid-name
278         if not self.connected:
279             self.connect()
280         if not isinstance(patterns, list):
281             patterns = [patterns]
282         regex = [re.compile(i) for i in patterns]
283         return [k for k, v in self.directory.items()
284                 if any(re.match(pattern, k) for pattern in regex)]
285
286     def dump(self, counters, blocking=True):
287         '''Given a list of counters return a dictionary of results'''
288         if not self.connected:
289             self.connect()
290         result = {}
291         for cnt in counters:
292             result[cnt] = self.__getitem__(cnt,blocking)
293         return result
294
295 class StatsLock():
296     '''Stat segment optimistic locking'''
297
298     def __init__(self, stats):
299         self.stats = stats
300         self.epoch = 0
301
302     def __enter__(self):
303         acquired = self.acquire(blocking=True)
304         assert acquired, "Lock wasn't acquired, but blocking=True"
305         return self
306
307     def __exit__(self, exc_type=None, exc_value=None, traceback=None):
308         self.release()
309
310     def acquire(self, blocking=True, timeout=-1):
311         '''Acquire the lock. Await in progress to go false. Record epoch.'''
312         self.epoch = self.stats.epoch
313         if timeout > 0:
314             start = time.monotonic()
315         while self.stats.in_progress:
316             if not blocking:
317                 time.sleep(0.01)
318                 if timeout > 0:
319                     if start + time.monotonic() > timeout:
320                         return False
321         return True
322
323     def release(self):
324         '''Check if data read while locked is valid'''
325         if self.stats.in_progress or self.stats.epoch != self.epoch:
326             raise IOError('Optimistic lock failed, retry')
327
328     def locked(self):
329         '''Not used'''
330
331
332 class StatsCombinedList(list):
333     '''Column slicing for Combined counters list'''
334
335     def __getitem__(self, item):
336         '''Supports partial numpy style 2d support. Slice by column [:,1]'''
337         if isinstance(item, int):
338             return list.__getitem__(self, item)
339         return CombinedList([row[item[1]] for row in self])
340
341 class CombinedList(list):
342     '''Combined Counters 2-dimensional by thread by index of packets/octets'''
343
344     def packets(self):
345         '''Return column (2nd dimension). Packets for all threads'''
346         return [pair[0] for pair in self]
347
348     def octets(self):
349         '''Return column (2nd dimension). Octets for all threads'''
350         return [pair[1] for pair in self]
351
352     def sum_packets(self):
353         '''Return column (2nd dimension). Sum of all packets for all threads'''
354         return sum(self.packets())
355
356     def sum_octets(self):
357         '''Return column (2nd dimension). Sum of all octets for all threads'''
358         return sum(self.octets())
359
360 class StatsTuple(tuple):
361     '''A Combined vector tuple (packets, octets)'''
362     def __init__(self, data):
363         self.dictionary = {'packets': data[0], 'bytes': data[1]}
364         super().__init__()
365
366     def __repr__(self):
367         return dict.__repr__(self.dictionary)
368
369     def __getitem__(self, item):
370         if isinstance(item, int):
371             return tuple.__getitem__(self, item)
372         if item == 'packets':
373             return tuple.__getitem__(self, 0)
374         return tuple.__getitem__(self, 1)
375
376 class StatsSimpleList(list):
377     '''Simple Counters 2-dimensional by thread by index of packets'''
378
379     def __getitem__(self, item):
380         '''Supports partial numpy style 2d support. Slice by column [:,1]'''
381         if isinstance(item, int):
382             return list.__getitem__(self, item)
383         return SimpleList([row[item[1]] for row in self])
384
385 class SimpleList(list):
386     '''Simple counter'''
387
388     def sum(self):
389         '''Sum the vector'''
390         return sum(self)
391
392 class StatsEntry():
393     '''An individual stats entry'''
394     # pylint: disable=unused-argument,no-self-use
395
396     def __init__(self, stattype, statvalue):
397         self.type = stattype
398         self.value = statvalue
399
400         if stattype == 1:
401             self.function = self.scalar
402         elif stattype == 2:
403             self.function = self.simple
404         elif stattype == 3:
405             self.function = self.combined
406         elif stattype == 4:
407             self.function = self.error
408         elif stattype == 5:
409             self.function = self.name
410         elif stattype == 7:
411             self.function = self.symlink
412         else:
413             self.function = self.illegal
414
415     def illegal(self, stats):
416         '''Invalid or unknown counter type'''
417         return None
418
419     def scalar(self, stats):
420         '''Scalar counter'''
421         return self.value
422
423     def simple(self, stats):
424         '''Simple counter'''
425         counter = StatsSimpleList()
426         for threads in StatsVector(stats, self.value, 'P'):
427             clist = [v[0] for v in StatsVector(stats, threads[0], 'Q')]
428             counter.append(clist)
429         return counter
430
431     def combined(self, stats):
432         '''Combined counter'''
433         counter = StatsCombinedList()
434         for threads in StatsVector(stats, self.value, 'P'):
435             clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')]
436             counter.append(clist)
437         return counter
438
439     def error(self, stats):
440         '''Error counter'''
441         counter = SimpleList()
442         for clist in stats.error_vectors:
443             counter.append(clist[self.value])
444         return counter
445
446     def name(self, stats):
447         '''Name counter'''
448         counter = []
449         for name in StatsVector(stats, self.value, 'P'):
450             if name[0]:
451                 counter.append(get_string(stats, name[0]))
452         return counter
453
454     SYMLINK_FMT1 = Struct('II')
455     SYMLINK_FMT2 = Struct('Q')
456     def symlink(self, stats):
457         '''Symlink counter'''
458         b = self.SYMLINK_FMT2.pack(self.value)
459         index1, index2 = self.SYMLINK_FMT1.unpack(b)
460         name = stats.directory_by_idx[index1]
461         return stats[name][:,index2]
462
463     def get_counter(self, stats):
464         '''Return a list of counters'''
465         if stats:
466             return self.function(stats)
467
468 class TestStats(unittest.TestCase):
469     '''Basic statseg tests'''
470
471     def setUp(self):
472         '''Connect to statseg'''
473         self.stat = VPPStats()
474         self.stat.connect()
475         self.profile = cProfile.Profile()
476         self.profile.enable()
477
478     def tearDown(self):
479         '''Disconnect from statseg'''
480         self.stat.disconnect()
481         profile = Stats(self.profile)
482         profile.strip_dirs()
483         profile.sort_stats('cumtime')
484         profile.print_stats()
485         print("\n--->>>")
486
487     def test_counters(self):
488         '''Test access to statseg'''
489
490         print('/err/abf-input-ip4/missed', self.stat['/err/abf-input-ip4/missed'])
491         print('/sys/heartbeat', self.stat['/sys/heartbeat'])
492         print('/if/names', self.stat['/if/names'])
493         print('/if/rx-miss', self.stat['/if/rx-miss'])
494         print('/if/rx-miss', self.stat['/if/rx-miss'][1])
495         print('/nat44-ed/out2in/slowpath/drops', self.stat['/nat44-ed/out2in/slowpath/drops'])
496         print('Set Errors', self.stat.set_errors())
497         with self.assertRaises(KeyError):
498             print('NO SUCH COUNTER', self.stat['foobar'])
499         print('/if/rx', self.stat.get_counter('/if/rx'))
500         print('/err/ethernet-input/no error',
501               self.stat.get_err_counter('/err/ethernet-input/no error'))
502
503     def test_column(self):
504         '''Test column slicing'''
505
506         print('/if/rx-miss', self.stat['/if/rx-miss'])
507         print('/if/rx', self.stat['/if/rx'])  # All interfaces for thread #1
508         print('/if/rx thread #1', self.stat['/if/rx'][0])  # All interfaces for thread #1
509         print('/if/rx thread #1, interface #1',
510               self.stat['/if/rx'][0][1])  # All interfaces for thread #1
511         print('/if/rx if_index #1', self.stat['/if/rx'][:, 1])
512         print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].packets())
513         print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].sum_packets())
514         print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].octets())
515         print('/if/rx-miss', self.stat['/if/rx-miss'])
516         print('/if/rx-miss if_index #1 packets', self.stat['/if/rx-miss'][:, 1].sum())
517         print('/if/rx if_index #1 packets', self.stat['/if/rx'][0][1]['packets'])
518
519     def test_error(self):
520         '''Test the error vector'''
521
522         print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'])
523         print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'])
524         print('/err/ethernet-input', self.stat.get_err_counter('/err/ethernet-input/no error'))
525         print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'].sum())
526
527     def test_nat44(self):
528         '''Test the nat counters'''
529
530         print('/nat44-ei/ha/del-event-recv', self.stat['/nat44-ei/ha/del-event-recv'])
531         print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'].sum())
532
533     def test_legacy(self):
534         '''Legacy interface'''
535         directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
536         data = self.stat.dump(directory)
537         print(data)
538         print('Looking up sys node')
539         directory = self.stat.ls(["^/sys/node"])
540         print('Dumping sys node')
541         data = self.stat.dump(directory)
542         print(data)
543         directory = self.stat.ls(["^/foobar"])
544         data = self.stat.dump(directory)
545         print(data)
546
547     def test_sys_nodes(self):
548         '''Test /sys/nodes'''
549         counters = self.stat.ls('^/sys/node')
550         print('COUNTERS:', counters)
551         print('/sys/node', self.stat.dump(counters))
552         print('/net/route/to', self.stat['/net/route/to'])
553
554     def test_symlink(self):
555         '''Symbolic links'''
556         print('/interface/local0/rx', self.stat['/interfaces/local0/rx'])
557         print('/sys/nodes/unix-epoll-input', self.stat['/nodes/unix-epoll-input/calls'])
558
559 if __name__ == '__main__':
560     import cProfile
561     from pstats import Stats
562
563     unittest.main()