34d57232865b821d4db243615dec201f9ac3bea1
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import queue
25 import logging
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 import time
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40
41     class V:
42         """placeholder for VppTransport as the implementation is dependent on
43         VPPAPIClient's initialization values
44         """
45
46     VppTransport = V
47
48 from .vpp_transport_socket import VppTransport
49
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
52
53 __all__ = (
54     "FuncWrapper",
55     "VppApiDynamicMethodHolder",
56     "VppEnum",
57     "VppEnumType",
58     "VppEnumFlag",
59     "VPPIOError",
60     "VPPRuntimeError",
61     "VPPValueError",
62     "VPPApiClient",
63 )
64
65
66 def metaclass(metaclass):
67     @functools.wraps(metaclass)
68     def wrapper(cls):
69         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
70
71     return wrapper
72
73
74 class VppEnumType(type):
75     def __getattr__(cls, name):
76         t = vpp_get_type(name)
77         return t.enum
78
79
80 @metaclass(VppEnumType)
81 class VppEnum:
82     pass
83
84
85 @metaclass(VppEnumType)
86 class VppEnumFlag:
87     pass
88
89
90 def vpp_atexit(vpp_weakref):
91     """Clean up VPP connection on shutdown."""
92     vpp_instance = vpp_weakref()
93     if vpp_instance and vpp_instance.transport.connected:
94         logger.debug("Cleaning up VPP on exit")
95         vpp_instance.disconnect()
96
97
98 def add_convenience_methods():
99     # provide convenience methods to IP[46]Address.vapi_af
100     def _vapi_af(self):
101         if 6 == self._version:
102             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103         if 4 == self._version:
104             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105         raise ValueError("Invalid _version.")
106
107     def _vapi_af_name(self):
108         if 6 == self._version:
109             return "ip6"
110         if 4 == self._version:
111             return "ip4"
112         raise ValueError("Invalid _version.")
113
114     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
116
117
118 class VppApiDynamicMethodHolder:
119     pass
120
121
122 class FuncWrapper:
123     def __init__(self, func):
124         self._func = func
125         self.__name__ = func.__name__
126         self.__doc__ = func.__doc__
127
128     def __call__(self, **kwargs):
129         return self._func(**kwargs)
130
131     def __repr__(self):
132         return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
133
134
135 class VPPApiError(Exception):
136     pass
137
138
139 class VPPNotImplementedError(NotImplementedError):
140     pass
141
142
143 class VPPIOError(IOError):
144     pass
145
146
147 class VPPRuntimeError(RuntimeError):
148     pass
149
150
151 class VPPValueError(ValueError):
152     pass
153
154
155 class VPPApiJSONFiles:
156     @classmethod
157     def find_api_dir(cls, dirs):
158         """Attempt to find the best directory in which API definition
159         files may reside. If the value VPP_API_DIR exists in the environment
160         then it is first on the search list. If we're inside a recognized
161         location in a VPP source tree (src/scripts and src/vpp-api/python)
162         then entries from there to the likely locations in build-root are
163         added. Finally the location used by system packages is added.
164
165         :returns: A single directory name, or None if no such directory
166             could be found.
167         """
168
169         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170         # in which case, plot a course to likely places in the src tree
171         import __main__ as main
172
173         if hasattr(main, "__file__"):
174             # get the path of the calling script
175             localdir = os.path.dirname(os.path.realpath(main.__file__))
176         else:
177             # use cwd if there is no calling script
178             localdir = os.getcwd()
179         localdir_s = localdir.split(os.path.sep)
180
181         def dmatch(dir):
182             """Match dir against right-hand components of the script dir"""
183             d = dir.split("/")  # param 'dir' assumes a / separator
184             length = len(d)
185             return len(localdir_s) > length and localdir_s[-length:] == d
186
187         def sdir(srcdir, variant):
188             """Build a path from srcdir to the staged API files of
189             'variant'  (typically '' or '_debug')"""
190             # Since 'core' and 'plugin' files are staged
191             # in separate directories, we target the parent dir.
192             return os.path.sep.join(
193                 (
194                     srcdir,
195                     "build-root",
196                     "install-vpp%s-native" % variant,
197                     "vpp",
198                     "share",
199                     "vpp",
200                     "api",
201                 )
202             )
203
204         srcdir = None
205         if dmatch("src/scripts"):
206             srcdir = os.path.sep.join(localdir_s[:-2])
207         elif dmatch("src/vpp-api/python"):
208             srcdir = os.path.sep.join(localdir_s[:-3])
209         elif dmatch("test"):
210             # we're apparently running tests
211             srcdir = os.path.sep.join(localdir_s[:-1])
212
213         if srcdir:
214             # we're in the source tree, try both the debug and release
215             # variants.
216             dirs.append(sdir(srcdir, "_debug"))
217             dirs.append(sdir(srcdir, ""))
218
219         # Test for staged copies of the scripts
220         # For these, since we explicitly know if we're running a debug versus
221         # release variant, target only the relevant directory
222         if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
223             srcdir = os.path.sep.join(localdir_s[:-4])
224             dirs.append(sdir(srcdir, "_debug"))
225         if dmatch("build-root/install-vpp-native/vpp/bin"):
226             srcdir = os.path.sep.join(localdir_s[:-4])
227             dirs.append(sdir(srcdir, ""))
228
229         # finally, try the location system packages typically install into
230         dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
231
232         # check the directories for existence; first one wins
233         for dir in dirs:
234             if os.path.isdir(dir):
235                 return dir
236
237         return None
238
239     @classmethod
240     def find_api_files(cls, api_dir=None, patterns="*"):  # -> list
241         """Find API definition files from the given directory tree with the
242         given pattern. If no directory is given then find_api_dir() is used
243         to locate one. If no pattern is given then all definition files found
244         in the directory tree are used.
245
246         :param api_dir: A directory tree in which to locate API definition
247             files; subdirectories are descended into.
248             If this is None then find_api_dir() is called to discover it.
249         :param patterns: A list of patterns to use in each visited directory
250             when looking for files.
251             This can be a list/tuple object or a comma-separated string of
252             patterns. Each value in the list will have leading/trialing
253             whitespace stripped.
254             The pattern specifies the first part of the filename, '.api.json'
255             is appended.
256             The results are de-duplicated, thus overlapping patterns are fine.
257             If this is None it defaults to '*' meaning "all API files".
258         :returns: A list of file paths for the API files found.
259         """
260         if api_dir is None:
261             api_dir = cls.find_api_dir([])
262             if api_dir is None:
263                 raise VPPApiError("api_dir cannot be located")
264
265         if isinstance(patterns, list) or isinstance(patterns, tuple):
266             patterns = [p.strip() + ".api.json" for p in patterns]
267         else:
268             patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
269
270         api_files = []
271         for root, dirnames, files in os.walk(api_dir):
272             # iterate all given patterns and de-dup the result
273             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
274             for filename in files:
275                 api_files.append(os.path.join(root, filename))
276
277         return api_files
278
279     @classmethod
280     def process_json_file(self, apidef_file):
281         api = json.load(apidef_file)
282         return self._process_json(api)
283
284     @classmethod
285     def process_json_str(self, json_str):
286         api = json.loads(json_str)
287         return self._process_json(api)
288
289     @staticmethod
290     def _process_json(api):  # -> Tuple[Dict, Dict]
291         types = {}
292         services = {}
293         messages = {}
294         try:
295             for t in api["enums"]:
296                 t[0] = "vl_api_" + t[0] + "_t"
297                 types[t[0]] = {"type": "enum", "data": t}
298         except KeyError:
299             pass
300         try:
301             for t in api["enumflags"]:
302                 t[0] = "vl_api_" + t[0] + "_t"
303                 types[t[0]] = {"type": "enum", "data": t}
304         except KeyError:
305             pass
306         try:
307             for t in api["unions"]:
308                 t[0] = "vl_api_" + t[0] + "_t"
309                 types[t[0]] = {"type": "union", "data": t}
310         except KeyError:
311             pass
312
313         try:
314             for t in api["types"]:
315                 t[0] = "vl_api_" + t[0] + "_t"
316                 types[t[0]] = {"type": "type", "data": t}
317         except KeyError:
318             pass
319
320         try:
321             for t, v in api["aliases"].items():
322                 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
323         except KeyError:
324             pass
325
326         try:
327             services.update(api["services"])
328         except KeyError:
329             pass
330
331         i = 0
332         while True:
333             unresolved = {}
334             for k, v in types.items():
335                 t = v["data"]
336                 if not vpp_get_type(k):
337                     if v["type"] == "enum":
338                         try:
339                             VPPEnumType(t[0], t[1:])
340                         except ValueError:
341                             unresolved[k] = v
342                 if not vpp_get_type(k):
343                     if v["type"] == "enumflag":
344                         try:
345                             VPPEnumFlagType(t[0], t[1:])
346                         except ValueError:
347                             unresolved[k] = v
348                     elif v["type"] == "union":
349                         try:
350                             VPPUnionType(t[0], t[1:])
351                         except ValueError:
352                             unresolved[k] = v
353                     elif v["type"] == "type":
354                         try:
355                             VPPType(t[0], t[1:])
356                         except ValueError:
357                             unresolved[k] = v
358                     elif v["type"] == "alias":
359                         try:
360                             VPPTypeAlias(k, t)
361                         except ValueError:
362                             unresolved[k] = v
363             if len(unresolved) == 0:
364                 break
365             if i > 3:
366                 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
367             types = unresolved
368             i += 1
369         try:
370             for m in api["messages"]:
371                 try:
372                     messages[m[0]] = VPPMessage(m[0], m[1:])
373                 except VPPNotImplementedError:
374                     ### OLE FIXME
375                     logger.error("Not implemented error for {}".format(m[0]))
376         except KeyError:
377             pass
378         return messages, services
379
380
381 class VPPApiClient:
382     """VPP interface.
383
384     This class provides the APIs to VPP.  The APIs are loaded
385     from provided .api.json files and makes functions accordingly.
386     These functions are documented in the VPP .api files, as they
387     are dynamically created.
388
389     Additionally, VPP can send callback messages; this class
390     provides a means to register a callback function to receive
391     these messages in a background thread.
392     """
393
394     apidir = None
395     VPPApiError = VPPApiError
396     VPPRuntimeError = VPPRuntimeError
397     VPPValueError = VPPValueError
398     VPPNotImplementedError = VPPNotImplementedError
399     VPPIOError = VPPIOError
400
401     def __init__(
402         self,
403         *,
404         apifiles=None,
405         testmode=False,
406         async_thread=True,
407         logger=None,
408         loglevel=None,
409         read_timeout=5,
410         use_socket=True,
411         server_address="/run/vpp/api.sock",
412     ):
413         """Create a VPP API object.
414
415         apifiles is a list of files containing API
416         descriptions that will be loaded - methods will be
417         dynamically created reflecting these APIs.  If not
418         provided this will load the API files from VPP's
419         default install location.
420
421         logger, if supplied, is the logging logger object to log to.
422         loglevel, if supplied, is the log level this logger is set
423         to report at (from the loglevels in the logging module).
424         """
425         if logger is None:
426             logger = logging.getLogger(
427                 "{}.{}".format(__name__, self.__class__.__name__)
428             )
429             if loglevel is not None:
430                 logger.setLevel(loglevel)
431         self.logger = logger
432
433         self.messages = {}
434         self.services = {}
435         self.id_names = []
436         self.id_msgdef = []
437         self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
438         self.apifiles = []
439         self.event_callback = None
440         self.message_queue = queue.Queue()
441         self.read_timeout = read_timeout
442         self.async_thread = async_thread
443         self.event_thread = None
444         self.testmode = testmode
445         self.server_address = server_address
446         self._apifiles = apifiles
447         self.stats = {}
448
449         if not apifiles:
450             # Pick up API definitions from default directory
451             try:
452                 if isinstance(self.apidir, list):
453                     apifiles = []
454                     for d in self.apidir:
455                         apifiles += VPPApiJSONFiles.find_api_files(d)
456                 else:
457                     apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
458             except (RuntimeError, VPPApiError):
459                 # In test mode we don't care that we can't find the API files
460                 if testmode:
461                     apifiles = []
462                 else:
463                     raise VPPRuntimeError
464
465         for file in apifiles:
466             with open(file) as apidef_file:
467                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
468                 self.messages.update(m)
469                 self.services.update(s)
470
471         self.apifiles = apifiles
472
473         # Basic sanity check
474         if len(self.messages) == 0 and not testmode:
475             raise VPPValueError(1, "Missing JSON message definitions")
476         if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
477             raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
478
479         self.transport = VppTransport(
480             self, read_timeout=read_timeout, server_address=server_address
481         )
482         # Make sure we allow VPP to clean up the message rings.
483         atexit.register(vpp_atexit, weakref.ref(self))
484
485         add_convenience_methods()
486
487     def get_function(self, name):
488         return getattr(self._api, name)
489
490     class ContextId:
491         """Multiprocessing-safe provider of unique context IDs."""
492
493         def __init__(self):
494             self.context = mp.Value(ctypes.c_uint, 0)
495             self.lock = mp.Lock()
496
497         def __call__(self):
498             """Get a new unique (or, at least, not recently used) context."""
499             with self.lock:
500                 self.context.value += 1
501                 return self.context.value
502
503     get_context = ContextId()
504
505     def get_type(self, name):
506         return vpp_get_type(name)
507
508     @property
509     def api(self):
510         if not hasattr(self, "_api"):
511             raise VPPApiError("Not connected, api definitions not available")
512         return self._api
513
514     def make_function(self, msg, i, multipart, do_async):
515         if do_async:
516
517             def f(**kwargs):
518                 return self._call_vpp_async(i, msg, **kwargs)
519
520         else:
521
522             def f(**kwargs):
523                 return self._call_vpp(i, msg, multipart, **kwargs)
524
525         f.__name__ = str(msg.name)
526         f.__doc__ = ", ".join(
527             ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
528         )
529         f.msg = msg
530
531         return f
532
533     def _register_functions(self, do_async=False):
534         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
535         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
536         self._api = VppApiDynamicMethodHolder()
537         for name, msg in self.messages.items():
538             n = name + "_" + msg.crc[2:]
539             i = self.transport.get_msg_index(n)
540             if i > 0:
541                 self.id_msgdef[i] = msg
542                 self.id_names[i] = name
543
544                 # Create function for client side messages.
545                 if name in self.services:
546                     f = self.make_function(msg, i, self.services[name], do_async)
547                     setattr(self._api, name, FuncWrapper(f))
548             else:
549                 self.logger.debug("No such message type or failed CRC checksum: %s", n)
550
551     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
552         pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
553
554         rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
555         if rv != 0:
556             raise VPPIOError(2, "Connect failed")
557         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
558         self._register_functions(do_async=do_async)
559
560         # Initialise control ping
561         crc = self.messages["control_ping"].crc
562         self.control_ping_index = self.transport.get_msg_index(
563             ("control_ping" + "_" + crc[2:])
564         )
565         self.control_ping_msgdef = self.messages["control_ping"]
566         if self.async_thread:
567             self.event_thread = threading.Thread(target=self.thread_msg_handler)
568             self.event_thread.daemon = True
569             self.event_thread.start()
570         else:
571             self.event_thread = None
572         return rv
573
574     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
575         """Attach to VPP.
576
577         name - the name of the client.
578         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
579         do_async - if true, messages are sent without waiting for a reply
580         rx_qlen - the length of the VPP message receive queue between
581         client and server.
582         """
583         msg_handler = self.transport.get_callback(do_async)
584         return self.connect_internal(
585             name, msg_handler, chroot_prefix, rx_qlen, do_async
586         )
587
588     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
589         """Attach to VPP in synchronous mode. Application must poll for events.
590
591         name - the name of the client.
592         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
593         rx_qlen - the length of the VPP message receive queue between
594         client and server.
595         """
596
597         return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
598
599     def disconnect(self):
600         """Detach from VPP."""
601         rv = self.transport.disconnect()
602         if self.event_thread is not None:
603             self.message_queue.put("terminate event thread")
604         return rv
605
606     def msg_handler_sync(self, msg):
607         """Process an incoming message from VPP in sync mode.
608
609         The message may be a reply or it may be an async notification.
610         """
611         r = self.decode_incoming_msg(msg)
612         if r is None:
613             return
614
615         # If we have a context, then use the context to find any
616         # request waiting for a reply
617         context = 0
618         if hasattr(r, "context") and r.context > 0:
619             context = r.context
620
621         if context == 0:
622             # No context -> async notification that we feed to the callback
623             self.message_queue.put_nowait(r)
624         else:
625             raise VPPIOError(2, "RPC reply message received in event handler")
626
627     def has_context(self, msg):
628         if len(msg) < 10:
629             return False
630
631         header = VPPType(
632             "header_with_context",
633             [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
634         )
635
636         (i, ci, context), size = header.unpack(msg, 0)
637         if self.id_names[i] == "rx_thread_exit":
638             return
639
640         #
641         # Decode message and returns a tuple.
642         #
643         msgobj = self.id_msgdef[i]
644         if "context" in msgobj.field_by_name and context >= 0:
645             return True
646         return False
647
648     def decode_incoming_msg(self, msg, no_type_conversion=False):
649         if not msg:
650             logger.warning("vpp_api.read failed")
651             return
652
653         (i, ci), size = self.header.unpack(msg, 0)
654         if self.id_names[i] == "rx_thread_exit":
655             return
656
657         #
658         # Decode message and returns a tuple.
659         #
660         msgobj = self.id_msgdef[i]
661         if not msgobj:
662             raise VPPIOError(2, "Reply message undefined")
663
664         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
665         return r
666
667     def msg_handler_async(self, msg):
668         """Process a message from VPP in async mode.
669
670         In async mode, all messages are returned to the callback.
671         """
672         r = self.decode_incoming_msg(msg)
673         if r is None:
674             return
675
676         msgname = type(r).__name__
677
678         if self.event_callback:
679             self.event_callback(msgname, r)
680
681     def _control_ping(self, context):
682         """Send a ping command."""
683         self._call_vpp_async(
684             self.control_ping_index, self.control_ping_msgdef, context=context
685         )
686
687     def validate_args(self, msg, kwargs):
688         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
689         if d:
690             raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
691
692     def _add_stat(self, name, ms):
693         if not name in self.stats:
694             self.stats[name] = {"max": ms, "count": 1, "avg": ms}
695         else:
696             if ms > self.stats[name]["max"]:
697                 self.stats[name]["max"] = ms
698             self.stats[name]["count"] += 1
699             n = self.stats[name]["count"]
700             self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
701
702     def get_stats(self):
703         s = "\n=== API PAPI STATISTICS ===\n"
704         s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
705         for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
706             s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
707                 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
708             )
709         return s
710
711     def get_field_options(self, msg, fld_name):
712         # when there is an option, the msgdef has 3 elements.
713         # ['u32', 'ring_size', {'default': 1024}]
714         for _def in self.messages[msg].msgdef:
715             if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
716                 return _def[2]
717
718     def _call_vpp(self, i, msgdef, service, **kwargs):
719         """Given a message, send the message and await a reply.
720
721         msgdef - the message packing definition
722         i - the message type index
723         multipart - True if the message returns multiple
724         messages in return.
725         context - context number - chosen at random if not
726         supplied.
727         The remainder of the kwargs are the arguments to the API call.
728
729         The return value is the message or message array containing
730         the response.  It will raise an IOError exception if there was
731         no response within the timeout window.
732         """
733         ts = time.time()
734         if "context" not in kwargs:
735             context = self.get_context()
736             kwargs["context"] = context
737         else:
738             context = kwargs["context"]
739         kwargs["_vl_msg_id"] = i
740
741         no_type_conversion = kwargs.pop("_no_type_conversion", False)
742         timeout = kwargs.pop("_timeout", None)
743
744         try:
745             if self.transport.socket_index:
746                 kwargs["client_index"] = self.transport.socket_index
747         except AttributeError:
748             pass
749         self.validate_args(msgdef, kwargs)
750
751         s = "Calling {}({})".format(
752             msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
753         )
754         self.logger.debug(s)
755
756         b = msgdef.pack(kwargs)
757         self.transport.suspend()
758
759         self.transport.write(b)
760
761         msgreply = service["reply"]
762         stream = True if "stream" in service else False
763         if stream:
764             if "stream_msg" in service:
765                 # New service['reply'] = _reply and service['stream_message'] = _details
766                 stream_message = service["stream_msg"]
767                 modern = True
768             else:
769                 # Old  service['reply'] = _details
770                 stream_message = msgreply
771                 msgreply = "control_ping_reply"
772                 modern = False
773                 # Send a ping after the request - we use its response
774                 # to detect that we have seen all results.
775                 self._control_ping(context)
776
777         # Block until we get a reply.
778         rl = []
779         while True:
780             r = self.read_blocking(no_type_conversion, timeout)
781             if r is None:
782                 raise VPPIOError(2, "VPP API client: read failed")
783             msgname = type(r).__name__
784             if context not in r or r.context == 0 or context != r.context:
785                 # Message being queued
786                 self.message_queue.put_nowait(r)
787                 continue
788             if msgname != msgreply and (stream and (msgname != stream_message)):
789                 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
790             if not stream:
791                 rl = r
792                 break
793             if msgname == msgreply:
794                 if modern:  # Return both reply and list
795                     rl = r, rl
796                 break
797
798             rl.append(r)
799
800         self.transport.resume()
801
802         s = "Return value: {!r}".format(r)
803         if len(s) > 80:
804             s = s[:80] + "..."
805         self.logger.debug(s)
806         te = time.time()
807         self._add_stat(msgdef.name, (te - ts) * 1000)
808         return rl
809
810     def _call_vpp_async(self, i, msg, **kwargs):
811         """Given a message, send the message and return the context.
812
813         msgdef - the message packing definition
814         i - the message type index
815         context - context number - chosen at random if not
816         supplied.
817         The remainder of the kwargs are the arguments to the API call.
818
819         The reply message(s) will be delivered later to the registered callback.
820         The returned context will help with assigning which call
821         the reply belongs to.
822         """
823         if "context" not in kwargs:
824             context = self.get_context()
825             kwargs["context"] = context
826         else:
827             context = kwargs["context"]
828         try:
829             if self.transport.socket_index:
830                 kwargs["client_index"] = self.transport.socket_index
831         except AttributeError:
832             kwargs["client_index"] = 0
833         kwargs["_vl_msg_id"] = i
834         b = msg.pack(kwargs)
835
836         self.transport.write(b)
837         return context
838
839     def read_blocking(self, no_type_conversion=False, timeout=None):
840         """Get next received message from transport within timeout, decoded.
841
842         Note that notifications have context zero
843         and are not put into receive queue (at least for socket transport),
844         use async_thread with registered callback for processing them.
845
846         If no message appears in the queue within timeout, return None.
847
848         Optionally, type conversion can be skipped,
849         as some of conversions are into less precise types.
850
851         When r is the return value of this, the caller can get message name as:
852             msgname = type(r).__name__
853         and context number (type long) as:
854             context = r.context
855
856         :param no_type_conversion: If false, type conversions are applied.
857         :type no_type_conversion: bool
858         :returns: Decoded message, or None if no message (within timeout).
859         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
860         :raises VppTransportShmemIOError if timed out.
861         """
862         msg = self.transport.read(timeout=timeout)
863         if not msg:
864             return None
865         return self.decode_incoming_msg(msg, no_type_conversion)
866
867     def register_event_callback(self, callback):
868         """Register a callback for async messages.
869
870         This will be called for async notifications in sync mode,
871         and all messages in async mode.  In sync mode, replies to
872         requests will not come here.
873
874         callback is a fn(msg_type_name, msg_type) that will be
875         called when a message comes in.  While this function is
876         executing, note that (a) you are in a background thread and
877         may wish to use threading.Lock to protect your datastructures,
878         and (b) message processing from VPP will stop (so if you take
879         a long while about it you may provoke reply timeouts or cause
880         VPP to fill the RX buffer).  Passing None will disable the
881         callback.
882         """
883         self.event_callback = callback
884
885     def thread_msg_handler(self):
886         """Python thread calling the user registered message handler.
887
888         This is to emulate the old style event callback scheme. Modern
889         clients should provide their own thread to poll the event
890         queue.
891         """
892         while True:
893             r = self.message_queue.get()
894             if r == "terminate event thread":
895                 break
896             msgname = type(r).__name__
897             if self.event_callback:
898                 self.event_callback(msgname, r)
899
900     def validate_message_table(self, namecrctable):
901         """Take a dictionary of name_crc message names
902         and returns an array of missing messages"""
903
904         missing_table = []
905         for name_crc in namecrctable:
906             i = self.transport.get_msg_index(name_crc)
907             if i <= 0:
908                 missing_table.append(name_crc)
909         return missing_table
910
911     def dump_message_table(self):
912         """Return VPPs API message table as name_crc dictionary"""
913         return self.transport.message_table
914
915     def dump_message_table_filtered(self, msglist):
916         """Return VPPs API message table as name_crc dictionary,
917         filtered by message name list."""
918
919         replies = [self.services[n]["reply"] for n in msglist]
920         message_table_filtered = {}
921         for name in msglist + replies:
922             for k, v in self.transport.message_table.items():
923                 if k.startswith(name):
924                     message_table_filtered[k] = v
925                     break
926         return message_table_filtered
927
928     def __repr__(self):
929         return (
930             "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
931             "logger=%s, read_timeout=%s, "
932             "server_address='%s'>"
933             % (
934                 self._apifiles,
935                 self.testmode,
936                 self.async_thread,
937                 self.logger,
938                 self.read_timeout,
939                 self.server_address,
940             )
941         )
942
943     def details_iter(self, f, **kwargs):
944         cursor = 0
945         while True:
946             kwargs["cursor"] = cursor
947             rv, details = f(**kwargs)
948             for d in details:
949                 yield d
950             if rv.retval == 0 or rv.retval != -165:
951                 break
952             cursor = rv.cursor