stats: check epoch in python vpp_stats ls
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_stats.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 '''
18 This module implement Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
23
24 Counters are stored in VPP as a two dimensional array.
25 Index by thread and index (typically sw_if_index).
26 Simple counters count only packets, Combined counters count packets
27 and octets.
28
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34                                   for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37                                  interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39                                      interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41                                   interface 1 on all threads for simple counters
42 '''
43
44 import os
45 import socket
46 import array
47 import mmap
48 from struct import Struct
49 import time
50 import unittest
51 import re
52
53 def recv_fd(sock):
54     '''Get file descriptor for memory map'''
55     fds = array.array("i")   # Array of ints
56     _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
57     for cmsg_level, cmsg_type, cmsg_data in ancdata:
58         if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
59             fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
60     return list(fds)[0]
61
62 VEC_LEN_FMT = Struct('I')
63 def get_vec_len(stats, vector_offset):
64     '''Equivalent to VPP vec_len()'''
65     return VEC_LEN_FMT.unpack_from(stats.statseg, vector_offset - 8)[0]
66
67 def get_string(stats, ptr):
68     '''Get a string from a VPP vector'''
69     namevector = ptr - stats.base
70     namevectorlen = get_vec_len(stats, namevector)
71     if namevector + namevectorlen >= stats.size:
72         raise IOError('String overruns stats segment')
73     return stats.statseg[namevector:namevector+namevectorlen-1].decode('ascii')
74
75
76 class StatsVector:
77     '''A class representing a VPP vector'''
78
79     def __init__(self, stats, ptr, fmt):
80         self.vec_start = ptr - stats.base
81         self.vec_len = get_vec_len(stats, ptr - stats.base)
82         self.struct = Struct(fmt)
83         self.fmtlen = len(fmt)
84         self.elementsize = self.struct.size
85         self.statseg = stats.statseg
86         self.stats = stats
87
88         if self.vec_start + self.vec_len * self.elementsize >= stats.size:
89             raise IOError('Vector overruns stats segment')
90
91     def __iter__(self):
92         with self.stats.lock:
93             return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start +
94                                                         self.elementsize*self.vec_len])
95
96     def __getitem__(self, index):
97         if index > self.vec_len:
98             raise IOError('Index beyond end of vector')
99         with self.stats.lock:
100             if self.fmtlen == 1:
101                 return self.struct.unpack_from(self.statseg, self.vec_start +
102                                                (index * self.elementsize))[0]
103             return self.struct.unpack_from(self.statseg, self.vec_start +
104                                            (index * self.elementsize))
105
106 class VPPStats():
107     '''Main class implementing Python access to the VPP statistics segment'''
108     # pylint: disable=too-many-instance-attributes
109     shared_headerfmt = Struct('QPQQPP')
110     default_socketname = '/run/vpp/stats.sock'
111
112     def __init__(self, socketname=default_socketname, timeout=10):
113         self.socketname = socketname
114         self.timeout = timeout
115         self.directory = {}
116         self.lock = StatsLock(self)
117         self.connected = False
118         self.size = 0
119         self.last_epoch = 0
120         self.error_vectors = 0
121         self.statseg = 0
122
123     def connect(self):
124         '''Connect to stats segment'''
125         if self.connected:
126             return
127         sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
128         sock.connect(self.socketname)
129
130         mfd = recv_fd(sock)
131         sock.close()
132
133         stat_result = os.fstat(mfd)
134         self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED)
135         os.close(mfd)
136
137         self.size = stat_result.st_size
138         if self.version != 2:
139             raise Exception('Incompatbile stat segment version {}'
140                             .format(self.version))
141
142         self.refresh()
143         self.connected = True
144
145     def disconnect(self):
146         '''Disconnect from stats segment'''
147         if self.connected:
148             self.statseg.close()
149             self.connected = False
150
151     @property
152     def version(self):
153         '''Get version of stats segment'''
154         return self.shared_headerfmt.unpack_from(self.statseg)[0]
155
156     @property
157     def base(self):
158         '''Get base pointer of stats segment'''
159         return self.shared_headerfmt.unpack_from(self.statseg)[1]
160
161     @property
162     def epoch(self):
163         '''Get current epoch value from stats segment'''
164         return self.shared_headerfmt.unpack_from(self.statseg)[2]
165
166     @property
167     def in_progress(self):
168         '''Get value of in_progress from stats segment'''
169         return self.shared_headerfmt.unpack_from(self.statseg)[3]
170
171     @property
172     def directory_vector(self):
173         '''Get pointer of directory vector'''
174         return self.shared_headerfmt.unpack_from(self.statseg)[4]
175
176     @property
177     def error_vector(self):
178         '''Get pointer of error vector'''
179         return self.shared_headerfmt.unpack_from(self.statseg)[5]
180
181     elementfmt = 'IQ128s'
182
183     def refresh(self, blocking=True):
184         '''Refresh directory vector cache (epoch changed)'''
185         directory = {}
186         directory_by_idx = {}
187         while True:
188             try:
189                 with self.lock:
190                     self.last_epoch = self.epoch
191                     for i, direntry in enumerate(StatsVector(self, self.directory_vector, self.elementfmt)):
192                         path_raw = direntry[2].find(b'\x00')
193                         path = direntry[2][:path_raw].decode('ascii')
194                         directory[path] = StatsEntry(direntry[0], direntry[1])
195                         directory_by_idx[i] = path
196                     self.directory = directory
197                     self.directory_by_idx = directory_by_idx
198
199                     # Cache the error index vectors
200                     self.error_vectors = []
201                     for threads in StatsVector(self, self.error_vector, 'P'):
202                         self.error_vectors.append(StatsVector(self, threads[0], 'Q'))
203                     return
204             except IOError:
205                 if not blocking:
206                     raise
207
208     def __getitem__(self, item, blocking=True):
209         if not self.connected:
210             self.connect()
211         while True:
212             try:
213                 if self.last_epoch != self.epoch:
214                     self.refresh(blocking)
215                 with self.lock:
216                     return self.directory[item].get_counter(self)
217             except IOError:
218                 if not blocking:
219                     raise
220
221     def __iter__(self):
222         return iter(self.directory.items())
223
224     def set_errors(self, blocking=True):
225         '''Return dictionary of error counters > 0'''
226         if not self.connected:
227             self.connect()
228
229         errors = {k:v for k, v in self.directory.items() if k.startswith("/err/")}
230         result = {}
231         while True:
232             try:
233                 if self.last_epoch != self.epoch:
234                     self.refresh(blocking)
235                 with self.lock:
236                     for k, entry in errors.items():
237                         total = 0
238                         i = entry.value
239                         for per_thread in self.error_vectors:
240                             total += per_thread[i]
241                         if total:
242                             result[k] = total
243                     return result
244             except IOError:
245                 if not blocking:
246                     raise
247
248     def set_errors_str(self, blocking=True):
249         '''Return all errors counters > 0 pretty printed'''
250         error_string = ['ERRORS:']
251         error_counters = self.set_errors(blocking)
252         for k in sorted(error_counters):
253             error_string.append('{:<60}{:>10}'.format(k, error_counters[k]))
254         return '%s\n' % '\n'.join(error_string)
255
256     def get_counter(self, name, blocking=True):
257         '''Alternative call to __getitem__'''
258         return self.__getitem__(name, blocking)
259
260     def get_err_counter(self, name, blocking=True):
261         '''Return a single value (sum of all threads)'''
262         if not self.connected:
263             self.connect()
264         if name.startswith("/err/"):
265             while True:
266                 try:
267                     if self.last_epoch != self.epoch:
268                         self.refresh(blocking)
269                     with self.lock:
270                         return sum(self.directory[name].get_counter(self))
271                 except IOError:
272                     if not blocking:
273                         raise
274
275     def ls(self, patterns):
276         '''Returns list of counters matching pattern'''
277         # pylint: disable=invalid-name
278         if not self.connected:
279             self.connect()
280         if not isinstance(patterns, list):
281             patterns = [patterns]
282         regex = [re.compile(i) for i in patterns]
283         if self.last_epoch != self.epoch:
284             self.refresh()
285
286         return [k for k, v in self.directory.items()
287                 if any(re.match(pattern, k) for pattern in regex)]
288
289     def dump(self, counters, blocking=True):
290         '''Given a list of counters return a dictionary of results'''
291         if not self.connected:
292             self.connect()
293         result = {}
294         for cnt in counters:
295             result[cnt] = self.__getitem__(cnt,blocking)
296         return result
297
298 class StatsLock():
299     '''Stat segment optimistic locking'''
300
301     def __init__(self, stats):
302         self.stats = stats
303         self.epoch = 0
304
305     def __enter__(self):
306         acquired = self.acquire(blocking=True)
307         assert acquired, "Lock wasn't acquired, but blocking=True"
308         return self
309
310     def __exit__(self, exc_type=None, exc_value=None, traceback=None):
311         self.release()
312
313     def acquire(self, blocking=True, timeout=-1):
314         '''Acquire the lock. Await in progress to go false. Record epoch.'''
315         self.epoch = self.stats.epoch
316         if timeout > 0:
317             start = time.monotonic()
318         while self.stats.in_progress:
319             if not blocking:
320                 time.sleep(0.01)
321                 if timeout > 0:
322                     if start + time.monotonic() > timeout:
323                         return False
324         return True
325
326     def release(self):
327         '''Check if data read while locked is valid'''
328         if self.stats.in_progress or self.stats.epoch != self.epoch:
329             raise IOError('Optimistic lock failed, retry')
330
331     def locked(self):
332         '''Not used'''
333
334
335 class StatsCombinedList(list):
336     '''Column slicing for Combined counters list'''
337
338     def __getitem__(self, item):
339         '''Supports partial numpy style 2d support. Slice by column [:,1]'''
340         if isinstance(item, int):
341             return list.__getitem__(self, item)
342         return CombinedList([row[item[1]] for row in self])
343
344 class CombinedList(list):
345     '''Combined Counters 2-dimensional by thread by index of packets/octets'''
346
347     def packets(self):
348         '''Return column (2nd dimension). Packets for all threads'''
349         return [pair[0] for pair in self]
350
351     def octets(self):
352         '''Return column (2nd dimension). Octets for all threads'''
353         return [pair[1] for pair in self]
354
355     def sum_packets(self):
356         '''Return column (2nd dimension). Sum of all packets for all threads'''
357         return sum(self.packets())
358
359     def sum_octets(self):
360         '''Return column (2nd dimension). Sum of all octets for all threads'''
361         return sum(self.octets())
362
363 class StatsTuple(tuple):
364     '''A Combined vector tuple (packets, octets)'''
365     def __init__(self, data):
366         self.dictionary = {'packets': data[0], 'bytes': data[1]}
367         super().__init__()
368
369     def __repr__(self):
370         return dict.__repr__(self.dictionary)
371
372     def __getitem__(self, item):
373         if isinstance(item, int):
374             return tuple.__getitem__(self, item)
375         if item == 'packets':
376             return tuple.__getitem__(self, 0)
377         return tuple.__getitem__(self, 1)
378
379 class StatsSimpleList(list):
380     '''Simple Counters 2-dimensional by thread by index of packets'''
381
382     def __getitem__(self, item):
383         '''Supports partial numpy style 2d support. Slice by column [:,1]'''
384         if isinstance(item, int):
385             return list.__getitem__(self, item)
386         return SimpleList([row[item[1]] for row in self])
387
388 class SimpleList(list):
389     '''Simple counter'''
390
391     def sum(self):
392         '''Sum the vector'''
393         return sum(self)
394
395 class StatsEntry():
396     '''An individual stats entry'''
397     # pylint: disable=unused-argument,no-self-use
398
399     def __init__(self, stattype, statvalue):
400         self.type = stattype
401         self.value = statvalue
402
403         if stattype == 1:
404             self.function = self.scalar
405         elif stattype == 2:
406             self.function = self.simple
407         elif stattype == 3:
408             self.function = self.combined
409         elif stattype == 4:
410             self.function = self.error
411         elif stattype == 5:
412             self.function = self.name
413         elif stattype == 7:
414             self.function = self.symlink
415         else:
416             self.function = self.illegal
417
418     def illegal(self, stats):
419         '''Invalid or unknown counter type'''
420         return None
421
422     def scalar(self, stats):
423         '''Scalar counter'''
424         return self.value
425
426     def simple(self, stats):
427         '''Simple counter'''
428         counter = StatsSimpleList()
429         for threads in StatsVector(stats, self.value, 'P'):
430             clist = [v[0] for v in StatsVector(stats, threads[0], 'Q')]
431             counter.append(clist)
432         return counter
433
434     def combined(self, stats):
435         '''Combined counter'''
436         counter = StatsCombinedList()
437         for threads in StatsVector(stats, self.value, 'P'):
438             clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')]
439             counter.append(clist)
440         return counter
441
442     def error(self, stats):
443         '''Error counter'''
444         counter = SimpleList()
445         for clist in stats.error_vectors:
446             counter.append(clist[self.value])
447         return counter
448
449     def name(self, stats):
450         '''Name counter'''
451         counter = []
452         for name in StatsVector(stats, self.value, 'P'):
453             if name[0]:
454                 counter.append(get_string(stats, name[0]))
455         return counter
456
457     SYMLINK_FMT1 = Struct('II')
458     SYMLINK_FMT2 = Struct('Q')
459     def symlink(self, stats):
460         '''Symlink counter'''
461         b = self.SYMLINK_FMT2.pack(self.value)
462         index1, index2 = self.SYMLINK_FMT1.unpack(b)
463         name = stats.directory_by_idx[index1]
464         return stats[name][:,index2]
465
466     def get_counter(self, stats):
467         '''Return a list of counters'''
468         if stats:
469             return self.function(stats)
470
471 class TestStats(unittest.TestCase):
472     '''Basic statseg tests'''
473
474     def setUp(self):
475         '''Connect to statseg'''
476         self.stat = VPPStats()
477         self.stat.connect()
478         self.profile = cProfile.Profile()
479         self.profile.enable()
480
481     def tearDown(self):
482         '''Disconnect from statseg'''
483         self.stat.disconnect()
484         profile = Stats(self.profile)
485         profile.strip_dirs()
486         profile.sort_stats('cumtime')
487         profile.print_stats()
488         print("\n--->>>")
489
490     def test_counters(self):
491         '''Test access to statseg'''
492
493         print('/err/abf-input-ip4/missed', self.stat['/err/abf-input-ip4/missed'])
494         print('/sys/heartbeat', self.stat['/sys/heartbeat'])
495         print('/if/names', self.stat['/if/names'])
496         print('/if/rx-miss', self.stat['/if/rx-miss'])
497         print('/if/rx-miss', self.stat['/if/rx-miss'][1])
498         print('/nat44-ed/out2in/slowpath/drops', self.stat['/nat44-ed/out2in/slowpath/drops'])
499         print('Set Errors', self.stat.set_errors())
500         with self.assertRaises(KeyError):
501             print('NO SUCH COUNTER', self.stat['foobar'])
502         print('/if/rx', self.stat.get_counter('/if/rx'))
503         print('/err/ethernet-input/no error',
504               self.stat.get_err_counter('/err/ethernet-input/no error'))
505
506     def test_column(self):
507         '''Test column slicing'''
508
509         print('/if/rx-miss', self.stat['/if/rx-miss'])
510         print('/if/rx', self.stat['/if/rx'])  # All interfaces for thread #1
511         print('/if/rx thread #1', self.stat['/if/rx'][0])  # All interfaces for thread #1
512         print('/if/rx thread #1, interface #1',
513               self.stat['/if/rx'][0][1])  # All interfaces for thread #1
514         print('/if/rx if_index #1', self.stat['/if/rx'][:, 1])
515         print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].packets())
516         print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].sum_packets())
517         print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].octets())
518         print('/if/rx-miss', self.stat['/if/rx-miss'])
519         print('/if/rx-miss if_index #1 packets', self.stat['/if/rx-miss'][:, 1].sum())
520         print('/if/rx if_index #1 packets', self.stat['/if/rx'][0][1]['packets'])
521
522     def test_error(self):
523         '''Test the error vector'''
524
525         print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'])
526         print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'])
527         print('/err/ethernet-input', self.stat.get_err_counter('/err/ethernet-input/no error'))
528         print('/err/ethernet-input', self.stat['/err/ethernet-input/no error'].sum())
529
530     def test_nat44(self):
531         '''Test the nat counters'''
532
533         print('/nat44-ei/ha/del-event-recv', self.stat['/nat44-ei/ha/del-event-recv'])
534         print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'].sum())
535
536     def test_legacy(self):
537         '''Legacy interface'''
538         directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
539         data = self.stat.dump(directory)
540         print(data)
541         print('Looking up sys node')
542         directory = self.stat.ls(["^/sys/node"])
543         print('Dumping sys node')
544         data = self.stat.dump(directory)
545         print(data)
546         directory = self.stat.ls(["^/foobar"])
547         data = self.stat.dump(directory)
548         print(data)
549
550     def test_sys_nodes(self):
551         '''Test /sys/nodes'''
552         counters = self.stat.ls('^/sys/node')
553         print('COUNTERS:', counters)
554         print('/sys/node', self.stat.dump(counters))
555         print('/net/route/to', self.stat['/net/route/to'])
556
557     def test_symlink(self):
558         '''Symbolic links'''
559         print('/interface/local0/rx', self.stat['/interfaces/local0/rx'])
560         print('/sys/nodes/unix-epoll-input', self.stat['/nodes/unix-epoll-input/calls'])
561
562 if __name__ == '__main__':
563     import cProfile
564     from pstats import Stats
565
566     unittest.main()