1e5d23e59b724676129b14da8444fe5bfed32e82
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import queue
25 import logging
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 import time
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40
41     class V:
42         """placeholder for VppTransport as the implementation is dependent on
43         VPPAPIClient's initialization values
44         """
45
46     VppTransport = V
47
48 from .vpp_transport_socket import VppTransport
49
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
52
53 __all__ = (
54     "FuncWrapper",
55     "VppApiDynamicMethodHolder",
56     "VppEnum",
57     "VppEnumType",
58     "VppEnumFlag",
59     "VPPIOError",
60     "VPPRuntimeError",
61     "VPPValueError",
62     "VPPApiClient",
63 )
64
65
66 def metaclass(metaclass):
67     @functools.wraps(metaclass)
68     def wrapper(cls):
69         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
70
71     return wrapper
72
73
74 class VppEnumType(type):
75     def __getattr__(cls, name):
76         t = vpp_get_type(name)
77         return t.enum
78
79
80 @metaclass(VppEnumType)
81 class VppEnum:
82     pass
83
84
85 @metaclass(VppEnumType)
86 class VppEnumFlag:
87     pass
88
89
90 def vpp_atexit(vpp_weakref):
91     """Clean up VPP connection on shutdown."""
92     vpp_instance = vpp_weakref()
93     if vpp_instance and vpp_instance.transport.connected:
94         logger.debug("Cleaning up VPP on exit")
95         vpp_instance.disconnect()
96
97
98 def add_convenience_methods():
99     # provide convenience methods to IP[46]Address.vapi_af
100     def _vapi_af(self):
101         if 6 == self._version:
102             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103         if 4 == self._version:
104             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105         raise ValueError("Invalid _version.")
106
107     def _vapi_af_name(self):
108         if 6 == self._version:
109             return "ip6"
110         if 4 == self._version:
111             return "ip4"
112         raise ValueError("Invalid _version.")
113
114     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
116
117
118 class VppApiDynamicMethodHolder:
119     pass
120
121
122 class FuncWrapper:
123     def __init__(self, func):
124         self._func = func
125         self.__name__ = func.__name__
126         self.__doc__ = func.__doc__
127
128     def __call__(self, **kwargs):
129         return self._func(**kwargs)
130
131     def __repr__(self):
132         return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
133
134
135 class VPPApiError(Exception):
136     pass
137
138
139 class VPPNotImplementedError(NotImplementedError):
140     pass
141
142
143 class VPPIOError(IOError):
144     pass
145
146
147 class VPPRuntimeError(RuntimeError):
148     pass
149
150
151 class VPPValueError(ValueError):
152     pass
153
154
155 class VPPApiJSONFiles:
156     @classmethod
157     def find_api_dir(cls, dirs):
158         """Attempt to find the best directory in which API definition
159         files may reside. If the value VPP_API_DIR exists in the environment
160         then it is first on the search list. If we're inside a recognized
161         location in a VPP source tree (src/scripts and src/vpp-api/python)
162         then entries from there to the likely locations in build-root are
163         added. Finally the location used by system packages is added.
164
165         :returns: A single directory name, or None if no such directory
166             could be found.
167         """
168
169         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170         # in which case, plot a course to likely places in the src tree
171         import __main__ as main
172
173         if hasattr(main, "__file__"):
174             # get the path of the calling script
175             localdir = os.path.dirname(os.path.realpath(main.__file__))
176         else:
177             # use cwd if there is no calling script
178             localdir = os.getcwd()
179         localdir_s = localdir.split(os.path.sep)
180
181         def dmatch(dir):
182             """Match dir against right-hand components of the script dir"""
183             d = dir.split("/")  # param 'dir' assumes a / separator
184             length = len(d)
185             return len(localdir_s) > length and localdir_s[-length:] == d
186
187         def sdir(srcdir, variant):
188             """Build a path from srcdir to the staged API files of
189             'variant'  (typically '' or '_debug')"""
190             # Since 'core' and 'plugin' files are staged
191             # in separate directories, we target the parent dir.
192             return os.path.sep.join(
193                 (
194                     srcdir,
195                     "build-root",
196                     "install-vpp%s-native" % variant,
197                     "vpp",
198                     "share",
199                     "vpp",
200                     "api",
201                 )
202             )
203
204         srcdir = None
205         if dmatch("src/scripts"):
206             srcdir = os.path.sep.join(localdir_s[:-2])
207         elif dmatch("src/vpp-api/python"):
208             srcdir = os.path.sep.join(localdir_s[:-3])
209         elif dmatch("test"):
210             # we're apparently running tests
211             srcdir = os.path.sep.join(localdir_s[:-1])
212
213         if srcdir:
214             # we're in the source tree, try both the debug and release
215             # variants.
216             dirs.append(sdir(srcdir, "_debug"))
217             dirs.append(sdir(srcdir, ""))
218
219         # Test for staged copies of the scripts
220         # For these, since we explicitly know if we're running a debug versus
221         # release variant, target only the relevant directory
222         if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
223             srcdir = os.path.sep.join(localdir_s[:-4])
224             dirs.append(sdir(srcdir, "_debug"))
225         if dmatch("build-root/install-vpp-native/vpp/bin"):
226             srcdir = os.path.sep.join(localdir_s[:-4])
227             dirs.append(sdir(srcdir, ""))
228
229         # finally, try the location system packages typically install into
230         dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
231
232         # check the directories for existence; first one wins
233         for dir in dirs:
234             if os.path.isdir(dir):
235                 return dir
236
237         return None
238
239     @classmethod
240     def find_api_files(cls, api_dir=None, patterns="*"):  # -> list
241         """Find API definition files from the given directory tree with the
242         given pattern. If no directory is given then find_api_dir() is used
243         to locate one. If no pattern is given then all definition files found
244         in the directory tree are used.
245
246         :param api_dir: A directory tree in which to locate API definition
247             files; subdirectories are descended into.
248             If this is None then find_api_dir() is called to discover it.
249         :param patterns: A list of patterns to use in each visited directory
250             when looking for files.
251             This can be a list/tuple object or a comma-separated string of
252             patterns. Each value in the list will have leading/trialing
253             whitespace stripped.
254             The pattern specifies the first part of the filename, '.api.json'
255             is appended.
256             The results are de-duplicated, thus overlapping patterns are fine.
257             If this is None it defaults to '*' meaning "all API files".
258         :returns: A list of file paths for the API files found.
259         """
260         if api_dir is None:
261             api_dir = cls.find_api_dir([])
262             if api_dir is None:
263                 raise VPPApiError("api_dir cannot be located")
264
265         if isinstance(patterns, list) or isinstance(patterns, tuple):
266             patterns = [p.strip() + ".api.json" for p in patterns]
267         else:
268             patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
269
270         api_files = []
271         for root, dirnames, files in os.walk(api_dir):
272             # iterate all given patterns and de-dup the result
273             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
274             for filename in files:
275                 api_files.append(os.path.join(root, filename))
276
277         return api_files
278
279     @classmethod
280     def process_json_file(self, apidef_file):
281         api = json.load(apidef_file)
282         return self._process_json(api)
283
284     @classmethod
285     def process_json_str(self, json_str):
286         api = json.loads(json_str)
287         return self._process_json(api)
288
289     @staticmethod
290     def _process_json(api):  # -> Tuple[Dict, Dict]
291         types = {}
292         services = {}
293         messages = {}
294         try:
295             for t in api["enums"]:
296                 t[0] = "vl_api_" + t[0] + "_t"
297                 types[t[0]] = {"type": "enum", "data": t}
298         except KeyError:
299             pass
300         try:
301             for t in api["enumflags"]:
302                 t[0] = "vl_api_" + t[0] + "_t"
303                 types[t[0]] = {"type": "enum", "data": t}
304         except KeyError:
305             pass
306         try:
307             for t in api["unions"]:
308                 t[0] = "vl_api_" + t[0] + "_t"
309                 types[t[0]] = {"type": "union", "data": t}
310         except KeyError:
311             pass
312
313         try:
314             for t in api["types"]:
315                 t[0] = "vl_api_" + t[0] + "_t"
316                 types[t[0]] = {"type": "type", "data": t}
317         except KeyError:
318             pass
319
320         try:
321             for t, v in api["aliases"].items():
322                 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
323         except KeyError:
324             pass
325
326         try:
327             services.update(api["services"])
328         except KeyError:
329             pass
330
331         i = 0
332         while True:
333             unresolved = {}
334             for k, v in types.items():
335                 t = v["data"]
336                 if not vpp_get_type(k):
337                     if v["type"] == "enum":
338                         try:
339                             VPPEnumType(t[0], t[1:])
340                         except ValueError:
341                             unresolved[k] = v
342                 if not vpp_get_type(k):
343                     if v["type"] == "enumflag":
344                         try:
345                             VPPEnumFlagType(t[0], t[1:])
346                         except ValueError:
347                             unresolved[k] = v
348                     elif v["type"] == "union":
349                         try:
350                             VPPUnionType(t[0], t[1:])
351                         except ValueError:
352                             unresolved[k] = v
353                     elif v["type"] == "type":
354                         try:
355                             VPPType(t[0], t[1:])
356                         except ValueError:
357                             unresolved[k] = v
358                     elif v["type"] == "alias":
359                         try:
360                             VPPTypeAlias(k, t)
361                         except ValueError:
362                             unresolved[k] = v
363             if len(unresolved) == 0:
364                 break
365             if i > 3:
366                 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
367             types = unresolved
368             i += 1
369         try:
370             for m in api["messages"]:
371                 try:
372                     messages[m[0]] = VPPMessage(m[0], m[1:])
373                 except VPPNotImplementedError:
374                     ### OLE FIXME
375                     logger.error("Not implemented error for {}".format(m[0]))
376         except KeyError:
377             pass
378         return messages, services
379
380
381 class VPPApiClient:
382     """VPP interface.
383
384     This class provides the APIs to VPP.  The APIs are loaded
385     from provided .api.json files and makes functions accordingly.
386     These functions are documented in the VPP .api files, as they
387     are dynamically created.
388
389     Additionally, VPP can send callback messages; this class
390     provides a means to register a callback function to receive
391     these messages in a background thread.
392     """
393
394     apidir = None
395     VPPApiError = VPPApiError
396     VPPRuntimeError = VPPRuntimeError
397     VPPValueError = VPPValueError
398     VPPNotImplementedError = VPPNotImplementedError
399     VPPIOError = VPPIOError
400
401     def __init__(
402         self,
403         *,
404         apifiles=None,
405         testmode=False,
406         async_thread=True,
407         logger=None,
408         loglevel=None,
409         read_timeout=5,
410         use_socket=True,
411         server_address="/run/vpp/api.sock",
412     ):
413         """Create a VPP API object.
414
415         apifiles is a list of files containing API
416         descriptions that will be loaded - methods will be
417         dynamically created reflecting these APIs.  If not
418         provided this will load the API files from VPP's
419         default install location.
420
421         logger, if supplied, is the logging logger object to log to.
422         loglevel, if supplied, is the log level this logger is set
423         to report at (from the loglevels in the logging module).
424         """
425         if logger is None:
426             logger = logging.getLogger(
427                 "{}.{}".format(__name__, self.__class__.__name__)
428             )
429             if loglevel is not None:
430                 logger.setLevel(loglevel)
431         self.logger = logger
432
433         self.messages = {}
434         self.services = {}
435         self.id_names = []
436         self.id_msgdef = []
437         self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
438         self.apifiles = []
439         self.event_callback = None
440         self.message_queue = queue.Queue()
441         self.read_timeout = read_timeout
442         self.async_thread = async_thread
443         self.event_thread = None
444         self.testmode = testmode
445         self.server_address = server_address
446         self._apifiles = apifiles
447         self.stats = {}
448
449         if not apifiles:
450             # Pick up API definitions from default directory
451             try:
452                 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
453             except (RuntimeError, VPPApiError):
454                 # In test mode we don't care that we can't find the API files
455                 if testmode:
456                     apifiles = []
457                 else:
458                     raise VPPRuntimeError
459
460         for file in apifiles:
461             with open(file) as apidef_file:
462                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
463                 self.messages.update(m)
464                 self.services.update(s)
465
466         self.apifiles = apifiles
467
468         # Basic sanity check
469         if len(self.messages) == 0 and not testmode:
470             raise VPPValueError(1, "Missing JSON message definitions")
471         if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
472             raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
473
474         self.transport = VppTransport(
475             self, read_timeout=read_timeout, server_address=server_address
476         )
477         # Make sure we allow VPP to clean up the message rings.
478         atexit.register(vpp_atexit, weakref.ref(self))
479
480         add_convenience_methods()
481
482     def get_function(self, name):
483         return getattr(self._api, name)
484
485     class ContextId:
486         """Multiprocessing-safe provider of unique context IDs."""
487
488         def __init__(self):
489             self.context = mp.Value(ctypes.c_uint, 0)
490             self.lock = mp.Lock()
491
492         def __call__(self):
493             """Get a new unique (or, at least, not recently used) context."""
494             with self.lock:
495                 self.context.value += 1
496                 return self.context.value
497
498     get_context = ContextId()
499
500     def get_type(self, name):
501         return vpp_get_type(name)
502
503     @property
504     def api(self):
505         if not hasattr(self, "_api"):
506             raise VPPApiError("Not connected, api definitions not available")
507         return self._api
508
509     def make_function(self, msg, i, multipart, do_async):
510         if do_async:
511
512             def f(**kwargs):
513                 return self._call_vpp_async(i, msg, **kwargs)
514
515         else:
516
517             def f(**kwargs):
518                 return self._call_vpp(i, msg, multipart, **kwargs)
519
520         f.__name__ = str(msg.name)
521         f.__doc__ = ", ".join(
522             ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
523         )
524         f.msg = msg
525
526         return f
527
528     def _register_functions(self, do_async=False):
529         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
530         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
531         self._api = VppApiDynamicMethodHolder()
532         for name, msg in self.messages.items():
533             n = name + "_" + msg.crc[2:]
534             i = self.transport.get_msg_index(n)
535             if i > 0:
536                 self.id_msgdef[i] = msg
537                 self.id_names[i] = name
538
539                 # Create function for client side messages.
540                 if name in self.services:
541                     f = self.make_function(msg, i, self.services[name], do_async)
542                     setattr(self._api, name, FuncWrapper(f))
543             else:
544                 self.logger.debug("No such message type or failed CRC checksum: %s", n)
545
546     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
547         pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
548
549         rv = self.transport.connect(name, pfx, msg_handler, rx_qlen)
550         if rv != 0:
551             raise VPPIOError(2, "Connect failed")
552         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
553         self._register_functions(do_async=do_async)
554
555         # Initialise control ping
556         crc = self.messages["control_ping"].crc
557         self.control_ping_index = self.transport.get_msg_index(
558             ("control_ping" + "_" + crc[2:])
559         )
560         self.control_ping_msgdef = self.messages["control_ping"]
561         if self.async_thread:
562             self.event_thread = threading.Thread(target=self.thread_msg_handler)
563             self.event_thread.daemon = True
564             self.event_thread.start()
565         else:
566             self.event_thread = None
567         return rv
568
569     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
570         """Attach to VPP.
571
572         name - the name of the client.
573         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
574         do_async - if true, messages are sent without waiting for a reply
575         rx_qlen - the length of the VPP message receive queue between
576         client and server.
577         """
578         msg_handler = self.transport.get_callback(do_async)
579         return self.connect_internal(
580             name, msg_handler, chroot_prefix, rx_qlen, do_async
581         )
582
583     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
584         """Attach to VPP in synchronous mode. Application must poll for events.
585
586         name - the name of the client.
587         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
588         rx_qlen - the length of the VPP message receive queue between
589         client and server.
590         """
591
592         return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
593
594     def disconnect(self):
595         """Detach from VPP."""
596         rv = self.transport.disconnect()
597         if self.event_thread is not None:
598             self.message_queue.put("terminate event thread")
599         return rv
600
601     def msg_handler_sync(self, msg):
602         """Process an incoming message from VPP in sync mode.
603
604         The message may be a reply or it may be an async notification.
605         """
606         r = self.decode_incoming_msg(msg)
607         if r is None:
608             return
609
610         # If we have a context, then use the context to find any
611         # request waiting for a reply
612         context = 0
613         if hasattr(r, "context") and r.context > 0:
614             context = r.context
615
616         if context == 0:
617             # No context -> async notification that we feed to the callback
618             self.message_queue.put_nowait(r)
619         else:
620             raise VPPIOError(2, "RPC reply message received in event handler")
621
622     def has_context(self, msg):
623         if len(msg) < 10:
624             return False
625
626         header = VPPType(
627             "header_with_context",
628             [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
629         )
630
631         (i, ci, context), size = header.unpack(msg, 0)
632         if self.id_names[i] == "rx_thread_exit":
633             return
634
635         #
636         # Decode message and returns a tuple.
637         #
638         msgobj = self.id_msgdef[i]
639         if "context" in msgobj.field_by_name and context >= 0:
640             return True
641         return False
642
643     def decode_incoming_msg(self, msg, no_type_conversion=False):
644         if not msg:
645             logger.warning("vpp_api.read failed")
646             return
647
648         (i, ci), size = self.header.unpack(msg, 0)
649         if self.id_names[i] == "rx_thread_exit":
650             return
651
652         #
653         # Decode message and returns a tuple.
654         #
655         msgobj = self.id_msgdef[i]
656         if not msgobj:
657             raise VPPIOError(2, "Reply message undefined")
658
659         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
660         return r
661
662     def msg_handler_async(self, msg):
663         """Process a message from VPP in async mode.
664
665         In async mode, all messages are returned to the callback.
666         """
667         r = self.decode_incoming_msg(msg)
668         if r is None:
669             return
670
671         msgname = type(r).__name__
672
673         if self.event_callback:
674             self.event_callback(msgname, r)
675
676     def _control_ping(self, context):
677         """Send a ping command."""
678         self._call_vpp_async(
679             self.control_ping_index, self.control_ping_msgdef, context=context
680         )
681
682     def validate_args(self, msg, kwargs):
683         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
684         if d:
685             raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
686
687     def _add_stat(self, name, ms):
688         if not name in self.stats:
689             self.stats[name] = {"max": ms, "count": 1, "avg": ms}
690         else:
691             if ms > self.stats[name]["max"]:
692                 self.stats[name]["max"] = ms
693             self.stats[name]["count"] += 1
694             n = self.stats[name]["count"]
695             self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
696
697     def get_stats(self):
698         s = "\n=== API PAPI STATISTICS ===\n"
699         s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
700         for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
701             s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
702                 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
703             )
704         return s
705
706     def get_field_options(self, msg, fld_name):
707         # when there is an option, the msgdef has 3 elements.
708         # ['u32', 'ring_size', {'default': 1024}]
709         for _def in self.messages[msg].msgdef:
710             if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
711                 return _def[2]
712
713     def _call_vpp(self, i, msgdef, service, **kwargs):
714         """Given a message, send the message and await a reply.
715
716         msgdef - the message packing definition
717         i - the message type index
718         multipart - True if the message returns multiple
719         messages in return.
720         context - context number - chosen at random if not
721         supplied.
722         The remainder of the kwargs are the arguments to the API call.
723
724         The return value is the message or message array containing
725         the response.  It will raise an IOError exception if there was
726         no response within the timeout window.
727         """
728         ts = time.time()
729         if "context" not in kwargs:
730             context = self.get_context()
731             kwargs["context"] = context
732         else:
733             context = kwargs["context"]
734         kwargs["_vl_msg_id"] = i
735
736         no_type_conversion = kwargs.pop("_no_type_conversion", False)
737         timeout = kwargs.pop("_timeout", None)
738
739         try:
740             if self.transport.socket_index:
741                 kwargs["client_index"] = self.transport.socket_index
742         except AttributeError:
743             pass
744         self.validate_args(msgdef, kwargs)
745
746         s = "Calling {}({})".format(
747             msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
748         )
749         self.logger.debug(s)
750
751         b = msgdef.pack(kwargs)
752         self.transport.suspend()
753
754         self.transport.write(b)
755
756         msgreply = service["reply"]
757         stream = True if "stream" in service else False
758         if stream:
759             if "stream_msg" in service:
760                 # New service['reply'] = _reply and service['stream_message'] = _details
761                 stream_message = service["stream_msg"]
762                 modern = True
763             else:
764                 # Old  service['reply'] = _details
765                 stream_message = msgreply
766                 msgreply = "control_ping_reply"
767                 modern = False
768                 # Send a ping after the request - we use its response
769                 # to detect that we have seen all results.
770                 self._control_ping(context)
771
772         # Block until we get a reply.
773         rl = []
774         while True:
775             r = self.read_blocking(no_type_conversion, timeout)
776             if r is None:
777                 raise VPPIOError(2, "VPP API client: read failed")
778             msgname = type(r).__name__
779             if context not in r or r.context == 0 or context != r.context:
780                 # Message being queued
781                 self.message_queue.put_nowait(r)
782                 continue
783             if msgname != msgreply and (stream and (msgname != stream_message)):
784                 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
785             if not stream:
786                 rl = r
787                 break
788             if msgname == msgreply:
789                 if modern:  # Return both reply and list
790                     rl = r, rl
791                 break
792
793             rl.append(r)
794
795         self.transport.resume()
796
797         s = "Return value: {!r}".format(r)
798         if len(s) > 80:
799             s = s[:80] + "..."
800         self.logger.debug(s)
801         te = time.time()
802         self._add_stat(msgdef.name, (te - ts) * 1000)
803         return rl
804
805     def _call_vpp_async(self, i, msg, **kwargs):
806         """Given a message, send the message and return the context.
807
808         msgdef - the message packing definition
809         i - the message type index
810         context - context number - chosen at random if not
811         supplied.
812         The remainder of the kwargs are the arguments to the API call.
813
814         The reply message(s) will be delivered later to the registered callback.
815         The returned context will help with assigning which call
816         the reply belongs to.
817         """
818         if "context" not in kwargs:
819             context = self.get_context()
820             kwargs["context"] = context
821         else:
822             context = kwargs["context"]
823         try:
824             if self.transport.socket_index:
825                 kwargs["client_index"] = self.transport.socket_index
826         except AttributeError:
827             kwargs["client_index"] = 0
828         kwargs["_vl_msg_id"] = i
829         b = msg.pack(kwargs)
830
831         self.transport.write(b)
832         return context
833
834     def read_blocking(self, no_type_conversion=False, timeout=None):
835         """Get next received message from transport within timeout, decoded.
836
837         Note that notifications have context zero
838         and are not put into receive queue (at least for socket transport),
839         use async_thread with registered callback for processing them.
840
841         If no message appears in the queue within timeout, return None.
842
843         Optionally, type conversion can be skipped,
844         as some of conversions are into less precise types.
845
846         When r is the return value of this, the caller can get message name as:
847             msgname = type(r).__name__
848         and context number (type long) as:
849             context = r.context
850
851         :param no_type_conversion: If false, type conversions are applied.
852         :type no_type_conversion: bool
853         :returns: Decoded message, or None if no message (within timeout).
854         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
855         :raises VppTransportShmemIOError if timed out.
856         """
857         msg = self.transport.read(timeout=timeout)
858         if not msg:
859             return None
860         return self.decode_incoming_msg(msg, no_type_conversion)
861
862     def register_event_callback(self, callback):
863         """Register a callback for async messages.
864
865         This will be called for async notifications in sync mode,
866         and all messages in async mode.  In sync mode, replies to
867         requests will not come here.
868
869         callback is a fn(msg_type_name, msg_type) that will be
870         called when a message comes in.  While this function is
871         executing, note that (a) you are in a background thread and
872         may wish to use threading.Lock to protect your datastructures,
873         and (b) message processing from VPP will stop (so if you take
874         a long while about it you may provoke reply timeouts or cause
875         VPP to fill the RX buffer).  Passing None will disable the
876         callback.
877         """
878         self.event_callback = callback
879
880     def thread_msg_handler(self):
881         """Python thread calling the user registered message handler.
882
883         This is to emulate the old style event callback scheme. Modern
884         clients should provide their own thread to poll the event
885         queue.
886         """
887         while True:
888             r = self.message_queue.get()
889             if r == "terminate event thread":
890                 break
891             msgname = type(r).__name__
892             if self.event_callback:
893                 self.event_callback(msgname, r)
894
895     def validate_message_table(self, namecrctable):
896         """Take a dictionary of name_crc message names
897         and returns an array of missing messages"""
898
899         missing_table = []
900         for name_crc in namecrctable:
901             i = self.transport.get_msg_index(name_crc)
902             if i <= 0:
903                 missing_table.append(name_crc)
904         return missing_table
905
906     def dump_message_table(self):
907         """Return VPPs API message table as name_crc dictionary"""
908         return self.transport.message_table
909
910     def dump_message_table_filtered(self, msglist):
911         """Return VPPs API message table as name_crc dictionary,
912         filtered by message name list."""
913
914         replies = [self.services[n]["reply"] for n in msglist]
915         message_table_filtered = {}
916         for name in msglist + replies:
917             for k, v in self.transport.message_table.items():
918                 if k.startswith(name):
919                     message_table_filtered[k] = v
920                     break
921         return message_table_filtered
922
923     def __repr__(self):
924         return (
925             "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
926             "logger=%s, read_timeout=%s, "
927             "server_address='%s'>"
928             % (
929                 self._apifiles,
930                 self.testmode,
931                 self.async_thread,
932                 self.logger,
933                 self.read_timeout,
934                 self.server_address,
935             )
936         )
937
938     def details_iter(self, f, **kwargs):
939         cursor = 0
940         while True:
941             kwargs["cursor"] = cursor
942             rv, details = f(**kwargs)
943             for d in details:
944                 yield d
945             if rv.retval == 0 or rv.retval != -165:
946                 break
947             cursor = rv.cursor