tests: Add a socket timeout
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_stats.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 """
18 This module implement Python access to the VPP statistics segment. It
19 accesses the data structures directly in shared memory.
20 VPP uses optimistic locking, so data structures may change underneath
21 us while we are reading. Data is copied out and it's important to
22 spend as little time as possible "holding the lock".
23
24 Counters are stored in VPP as a two dimensional array.
25 Index by thread and index (typically sw_if_index).
26 Simple counters count only packets, Combined counters count packets
27 and octets.
28
29 Counters can be accessed in either dimension.
30 stat['/if/rx'] - returns 2D lists
31 stat['/if/rx'][0] - returns counters for all interfaces for thread 0
32 stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
33 stat['/if/rx'][0][1]['packets'] - returns the packet counter
34                                   for interface 1 on thread 0
35 stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
36 stat['/if/rx'][:, 1].packets() - returns the packet counters for
37                                  interface 1 on all threads
38 stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
39                                      interface 1 on all threads
40 stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
41                                   interface 1 on all threads for simple counters
42 """
43
44 import os
45 import socket
46 import array
47 import mmap
48 from struct import Struct
49 import time
50 import unittest
51 import re
52
53
54 def recv_fd(sock):
55     """Get file descriptor for memory map"""
56     fds = array.array("i")  # Array of ints
57     _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_SPACE(4))
58     for cmsg_level, cmsg_type, cmsg_data in ancdata:
59         if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
60             fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
61     return list(fds)[0]
62
63
64 VEC_LEN_FMT = Struct("I")
65
66
67 def get_vec_len(stats, vector_offset):
68     """Equivalent to VPP vec_len()"""
69     return VEC_LEN_FMT.unpack_from(stats.statseg, vector_offset - 8)[0]
70
71
72 def get_string(stats, ptr):
73     """Get a string from a VPP vector"""
74     namevector = ptr - stats.base
75     namevectorlen = get_vec_len(stats, namevector)
76     if namevector + namevectorlen >= stats.size:
77         raise IOError("String overruns stats segment")
78     return stats.statseg[namevector : namevector + namevectorlen - 1].decode("ascii")
79
80
81 class StatsVector:
82     """A class representing a VPP vector"""
83
84     def __init__(self, stats, ptr, fmt):
85         self.vec_start = ptr - stats.base
86         self.vec_len = get_vec_len(stats, ptr - stats.base)
87         self.struct = Struct(fmt)
88         self.fmtlen = len(fmt)
89         self.elementsize = self.struct.size
90         self.statseg = stats.statseg
91         self.stats = stats
92
93         if self.vec_start + self.vec_len * self.elementsize >= stats.size:
94             raise IOError("Vector overruns stats segment")
95
96     def __iter__(self):
97         with self.stats.lock:
98             return self.struct.iter_unpack(
99                 self.statseg[
100                     self.vec_start : self.vec_start + self.elementsize * self.vec_len
101                 ]
102             )
103
104     def __getitem__(self, index):
105         if index > self.vec_len:
106             raise IOError("Index beyond end of vector")
107         with self.stats.lock:
108             if self.fmtlen == 1:
109                 return self.struct.unpack_from(
110                     self.statseg, self.vec_start + (index * self.elementsize)
111                 )[0]
112             return self.struct.unpack_from(
113                 self.statseg, self.vec_start + (index * self.elementsize)
114             )
115
116
117 class VPPStats:
118     """Main class implementing Python access to the VPP statistics segment"""
119
120     # pylint: disable=too-many-instance-attributes
121     shared_headerfmt = Struct("QPQQPP")
122     default_socketname = "/run/vpp/stats.sock"
123
124     def __init__(self, socketname=default_socketname, timeout=10):
125         self.socketname = socketname
126         self.timeout = timeout
127         self.directory = {}
128         self.lock = StatsLock(self)
129         self.connected = False
130         self.size = 0
131         self.last_epoch = 0
132         self.statseg = 0
133
134     def connect(self):
135         """Connect to stats segment"""
136         if self.connected:
137             return
138         sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
139
140         # Our connect races the corresponding recv_fds call in VPP, if we beat
141         # VPP then we will try (unsuccessfully) to receive file descriptors and
142         # will have gone away before VPP can respond to our connect.  A short
143         # timeout here stops this error occurring.
144         sock.settimeout(1)
145         sock.connect(self.socketname)
146
147         mfd = recv_fd(sock)
148         sock.close()
149
150         stat_result = os.fstat(mfd)
151         self.statseg = mmap.mmap(
152             mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED
153         )
154         os.close(mfd)
155
156         self.size = stat_result.st_size
157         if self.version != 2:
158             raise Exception("Incompatbile stat segment version {}".format(self.version))
159
160         self.refresh()
161         self.connected = True
162
163     def disconnect(self):
164         """Disconnect from stats segment"""
165         if self.connected:
166             self.statseg.close()
167             self.connected = False
168
169     @property
170     def version(self):
171         """Get version of stats segment"""
172         return self.shared_headerfmt.unpack_from(self.statseg)[0]
173
174     @property
175     def base(self):
176         """Get base pointer of stats segment"""
177         return self.shared_headerfmt.unpack_from(self.statseg)[1]
178
179     @property
180     def epoch(self):
181         """Get current epoch value from stats segment"""
182         return self.shared_headerfmt.unpack_from(self.statseg)[2]
183
184     @property
185     def in_progress(self):
186         """Get value of in_progress from stats segment"""
187         return self.shared_headerfmt.unpack_from(self.statseg)[3]
188
189     @property
190     def directory_vector(self):
191         """Get pointer of directory vector"""
192         return self.shared_headerfmt.unpack_from(self.statseg)[4]
193
194     elementfmt = "IQ128s"
195
196     def refresh(self, blocking=True):
197         """Refresh directory vector cache (epoch changed)"""
198         directory = {}
199         directory_by_idx = {}
200         while True:
201             try:
202                 with self.lock:
203                     self.last_epoch = self.epoch
204                     for i, direntry in enumerate(
205                         StatsVector(self, self.directory_vector, self.elementfmt)
206                     ):
207                         path_raw = direntry[2].find(b"\x00")
208                         path = direntry[2][:path_raw].decode("ascii")
209                         directory[path] = StatsEntry(direntry[0], direntry[1])
210                         directory_by_idx[i] = path
211                     self.directory = directory
212                     self.directory_by_idx = directory_by_idx
213                     return
214             except IOError:
215                 if not blocking:
216                     raise
217
218     def __getitem__(self, item, blocking=True):
219         if not self.connected:
220             self.connect()
221         while True:
222             try:
223                 if self.last_epoch != self.epoch:
224                     self.refresh(blocking)
225                 with self.lock:
226                     return self.directory[item].get_counter(self)
227             except IOError:
228                 if not blocking:
229                     raise
230
231     def __iter__(self):
232         return iter(self.directory.items())
233
234     def set_errors(self, blocking=True):
235         """Return dictionary of error counters > 0"""
236         if not self.connected:
237             self.connect()
238
239         errors = {k: v for k, v in self.directory.items() if k.startswith("/err/")}
240         result = {}
241         for k in errors:
242             try:
243                 total = self[k].sum()
244                 if total:
245                     result[k] = total
246             except KeyError:
247                 pass
248         return result
249
250     def set_errors_str(self, blocking=True):
251         """Return all errors counters > 0 pretty printed"""
252         error_string = ["ERRORS:"]
253         error_counters = self.set_errors(blocking)
254         for k in sorted(error_counters):
255             error_string.append("{:<60}{:>10}".format(k, error_counters[k]))
256         return "%s\n" % "\n".join(error_string)
257
258     def get_counter(self, name, blocking=True):
259         """Alternative call to __getitem__"""
260         return self.__getitem__(name, blocking)
261
262     def get_err_counter(self, name, blocking=True):
263         """Alternative call to __getitem__"""
264         return self.__getitem__(name, blocking).sum()
265
266     def ls(self, patterns):
267         """Returns list of counters matching pattern"""
268         # pylint: disable=invalid-name
269         if not self.connected:
270             self.connect()
271         if not isinstance(patterns, list):
272             patterns = [patterns]
273         regex = [re.compile(i) for i in patterns]
274         if self.last_epoch != self.epoch:
275             self.refresh()
276
277         return [
278             k
279             for k, v in self.directory.items()
280             if any(re.match(pattern, k) for pattern in regex)
281         ]
282
283     def dump(self, counters, blocking=True):
284         """Given a list of counters return a dictionary of results"""
285         if not self.connected:
286             self.connect()
287         result = {}
288         for cnt in counters:
289             result[cnt] = self.__getitem__(cnt, blocking)
290         return result
291
292
293 class StatsLock:
294     """Stat segment optimistic locking"""
295
296     def __init__(self, stats):
297         self.stats = stats
298         self.epoch = 0
299
300     def __enter__(self):
301         acquired = self.acquire(blocking=True)
302         assert acquired, "Lock wasn't acquired, but blocking=True"
303         return self
304
305     def __exit__(self, exc_type=None, exc_value=None, traceback=None):
306         self.release()
307
308     def acquire(self, blocking=True, timeout=-1):
309         """Acquire the lock. Await in progress to go false. Record epoch."""
310         self.epoch = self.stats.epoch
311         if timeout > 0:
312             start = time.monotonic()
313         while self.stats.in_progress:
314             if not blocking:
315                 time.sleep(0.01)
316                 if timeout > 0:
317                     if start + time.monotonic() > timeout:
318                         return False
319         return True
320
321     def release(self):
322         """Check if data read while locked is valid"""
323         if self.stats.in_progress or self.stats.epoch != self.epoch:
324             raise IOError("Optimistic lock failed, retry")
325
326     def locked(self):
327         """Not used"""
328
329
330 class StatsCombinedList(list):
331     """Column slicing for Combined counters list"""
332
333     def __getitem__(self, item):
334         """Supports partial numpy style 2d support. Slice by column [:,1]"""
335         if isinstance(item, int):
336             return list.__getitem__(self, item)
337         return CombinedList([row[item[1]] for row in self])
338
339
340 class CombinedList(list):
341     """Combined Counters 2-dimensional by thread by index of packets/octets"""
342
343     def packets(self):
344         """Return column (2nd dimension). Packets for all threads"""
345         return [pair[0] for pair in self]
346
347     def octets(self):
348         """Return column (2nd dimension). Octets for all threads"""
349         return [pair[1] for pair in self]
350
351     def sum_packets(self):
352         """Return column (2nd dimension). Sum of all packets for all threads"""
353         return sum(self.packets())
354
355     def sum_octets(self):
356         """Return column (2nd dimension). Sum of all octets for all threads"""
357         return sum(self.octets())
358
359
360 class StatsTuple(tuple):
361     """A Combined vector tuple (packets, octets)"""
362
363     def __init__(self, data):
364         self.dictionary = {"packets": data[0], "bytes": data[1]}
365         super().__init__()
366
367     def __repr__(self):
368         return dict.__repr__(self.dictionary)
369
370     def __getitem__(self, item):
371         if isinstance(item, int):
372             return tuple.__getitem__(self, item)
373         if item == "packets":
374             return tuple.__getitem__(self, 0)
375         return tuple.__getitem__(self, 1)
376
377
378 class StatsSimpleList(list):
379     """Simple Counters 2-dimensional by thread by index of packets"""
380
381     def __getitem__(self, item):
382         """Supports partial numpy style 2d support. Slice by column [:,1]"""
383         if isinstance(item, int):
384             return list.__getitem__(self, item)
385         return SimpleList([row[item[1]] for row in self])
386
387
388 class SimpleList(list):
389     """Simple counter"""
390
391     def sum(self):
392         """Sum the vector"""
393         return sum(self)
394
395
396 class StatsEntry:
397     """An individual stats entry"""
398
399     # pylint: disable=unused-argument,no-self-use
400
401     def __init__(self, stattype, statvalue):
402         self.type = stattype
403         self.value = statvalue
404
405         if stattype == 1:
406             self.function = self.scalar
407         elif stattype == 2:
408             self.function = self.simple
409         elif stattype == 3:
410             self.function = self.combined
411         elif stattype == 4:
412             self.function = self.name
413         elif stattype == 6:
414             self.function = self.symlink
415         else:
416             self.function = self.illegal
417
418     def illegal(self, stats):
419         """Invalid or unknown counter type"""
420         return None
421
422     def scalar(self, stats):
423         """Scalar counter"""
424         return self.value
425
426     def simple(self, stats):
427         """Simple counter"""
428         counter = StatsSimpleList()
429         for threads in StatsVector(stats, self.value, "P"):
430             clist = [v[0] for v in StatsVector(stats, threads[0], "Q")]
431             counter.append(clist)
432         return counter
433
434     def combined(self, stats):
435         """Combined counter"""
436         counter = StatsCombinedList()
437         for threads in StatsVector(stats, self.value, "P"):
438             clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], "QQ")]
439             counter.append(clist)
440         return counter
441
442     def name(self, stats):
443         """Name counter"""
444         counter = []
445         for name in StatsVector(stats, self.value, "P"):
446             if name[0]:
447                 counter.append(get_string(stats, name[0]))
448         return counter
449
450     SYMLINK_FMT1 = Struct("II")
451     SYMLINK_FMT2 = Struct("Q")
452
453     def symlink(self, stats):
454         """Symlink counter"""
455         b = self.SYMLINK_FMT2.pack(self.value)
456         index1, index2 = self.SYMLINK_FMT1.unpack(b)
457         name = stats.directory_by_idx[index1]
458         return stats[name][:, index2]
459
460     def get_counter(self, stats):
461         """Return a list of counters"""
462         if stats:
463             return self.function(stats)
464
465
466 class TestStats(unittest.TestCase):
467     """Basic statseg tests"""
468
469     def setUp(self):
470         """Connect to statseg"""
471         self.stat = VPPStats()
472         self.stat.connect()
473         self.profile = cProfile.Profile()
474         self.profile.enable()
475
476     def tearDown(self):
477         """Disconnect from statseg"""
478         self.stat.disconnect()
479         profile = Stats(self.profile)
480         profile.strip_dirs()
481         profile.sort_stats("cumtime")
482         profile.print_stats()
483         print("\n--->>>")
484
485     def test_counters(self):
486         """Test access to statseg"""
487
488         print("/err/abf-input-ip4/missed", self.stat["/err/abf-input-ip4/missed"])
489         print("/sys/heartbeat", self.stat["/sys/heartbeat"])
490         print("/if/names", self.stat["/if/names"])
491         print("/if/rx-miss", self.stat["/if/rx-miss"])
492         print("/if/rx-miss", self.stat["/if/rx-miss"][1])
493         print(
494             "/nat44-ed/out2in/slowpath/drops",
495             self.stat["/nat44-ed/out2in/slowpath/drops"],
496         )
497         with self.assertRaises(KeyError):
498             print("NO SUCH COUNTER", self.stat["foobar"])
499         print("/if/rx", self.stat.get_counter("/if/rx"))
500         print(
501             "/err/ethernet-input/no_error",
502             self.stat.get_counter("/err/ethernet-input/no_error"),
503         )
504
505     def test_column(self):
506         """Test column slicing"""
507
508         print("/if/rx-miss", self.stat["/if/rx-miss"])
509         print("/if/rx", self.stat["/if/rx"])  # All interfaces for thread #1
510         print(
511             "/if/rx thread #1", self.stat["/if/rx"][0]
512         )  # All interfaces for thread #1
513         print(
514             "/if/rx thread #1, interface #1", self.stat["/if/rx"][0][1]
515         )  # All interfaces for thread #1
516         print("/if/rx if_index #1", self.stat["/if/rx"][:, 1])
517         print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].packets())
518         print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].sum_packets())
519         print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].octets())
520         print("/if/rx-miss", self.stat["/if/rx-miss"])
521         print("/if/rx-miss if_index #1 packets", self.stat["/if/rx-miss"][:, 1].sum())
522         print("/if/rx if_index #1 packets", self.stat["/if/rx"][0][1]["packets"])
523
524     def test_nat44(self):
525         """Test the nat counters"""
526
527         print("/nat44-ei/ha/del-event-recv", self.stat["/nat44-ei/ha/del-event-recv"])
528         print(
529             "/err/nat44-ei-ha/pkts-processed",
530             self.stat["/err/nat44-ei-ha/pkts-processed"].sum(),
531         )
532
533     def test_legacy(self):
534         """Legacy interface"""
535         directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"])
536         data = self.stat.dump(directory)
537         print(data)
538         print("Looking up sys node")
539         directory = self.stat.ls(["^/sys/node"])
540         print("Dumping sys node")
541         data = self.stat.dump(directory)
542         print(data)
543         directory = self.stat.ls(["^/foobar"])
544         data = self.stat.dump(directory)
545         print(data)
546
547     def test_sys_nodes(self):
548         """Test /sys/nodes"""
549         counters = self.stat.ls("^/sys/node")
550         print("COUNTERS:", counters)
551         print("/sys/node", self.stat.dump(counters))
552         print("/net/route/to", self.stat["/net/route/to"])
553
554     def test_symlink(self):
555         """Symbolic links"""
556         print("/interface/local0/rx", self.stat["/interfaces/local0/rx"])
557         print("/sys/nodes/unix-epoll-input", self.stat["/nodes/unix-epoll-input/calls"])
558
559
560 if __name__ == "__main__":
561     import cProfile
562     from pstats import Stats
563
564     unittest.main()