e67ee192f998e77949299d99cda8dca9584dc4ef
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import queue
25 import logging
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 import time
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40
41     class V:
42         """placeholder for VppTransport as the implementation is dependent on
43         VPPAPIClient's initialization values
44         """
45
46     VppTransport = V
47
48 from .vpp_transport_socket import VppTransport
49
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
52
53 __all__ = (
54     "FuncWrapper",
55     "VppApiDynamicMethodHolder",
56     "VppEnum",
57     "VppEnumType",
58     "VppEnumFlag",
59     "VPPIOError",
60     "VPPRuntimeError",
61     "VPPValueError",
62     "VPPApiClient",
63 )
64
65
66 def metaclass(metaclass):
67     @functools.wraps(metaclass)
68     def wrapper(cls):
69         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
70
71     return wrapper
72
73
74 class VppEnumType(type):
75     def __getattr__(cls, name):
76         t = vpp_get_type(name)
77         return t.enum
78
79
80 @metaclass(VppEnumType)
81 class VppEnum:
82     pass
83
84
85 @metaclass(VppEnumType)
86 class VppEnumFlag:
87     pass
88
89
90 def vpp_atexit(vpp_weakref):
91     """Clean up VPP connection on shutdown."""
92     vpp_instance = vpp_weakref()
93     if vpp_instance and vpp_instance.transport.connected:
94         logger.debug("Cleaning up VPP on exit")
95         vpp_instance.disconnect()
96
97
98 def add_convenience_methods():
99     # provide convenience methods to IP[46]Address.vapi_af
100     def _vapi_af(self):
101         if 6 == self._version:
102             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103         if 4 == self._version:
104             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105         raise ValueError("Invalid _version.")
106
107     def _vapi_af_name(self):
108         if 6 == self._version:
109             return "ip6"
110         if 4 == self._version:
111             return "ip4"
112         raise ValueError("Invalid _version.")
113
114     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
116
117
118 class VppApiDynamicMethodHolder:
119     pass
120
121
122 class FuncWrapper:
123     def __init__(self, func):
124         self._func = func
125         self.__name__ = func.__name__
126         self.__doc__ = func.__doc__
127
128     def __call__(self, **kwargs):
129         return self._func(**kwargs)
130
131     def __repr__(self):
132         return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
133
134
135 class VPPApiError(Exception):
136     pass
137
138
139 class VPPNotImplementedError(NotImplementedError):
140     pass
141
142
143 class VPPIOError(IOError):
144     pass
145
146
147 class VPPRuntimeError(RuntimeError):
148     pass
149
150
151 class VPPValueError(ValueError):
152     pass
153
154
155 class VPPApiJSONFiles:
156     @classmethod
157     def find_api_dir(cls, dirs):
158         """Attempt to find the best directory in which API definition
159         files may reside. If the value VPP_API_DIR exists in the environment
160         then it is first on the search list. If we're inside a recognized
161         location in a VPP source tree (src/scripts and src/vpp-api/python)
162         then entries from there to the likely locations in build-root are
163         added. Finally the location used by system packages is added.
164
165         :returns: A single directory name, or None if no such directory
166             could be found.
167         """
168
169         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170         # in which case, plot a course to likely places in the src tree
171         import __main__ as main
172
173         if hasattr(main, "__file__"):
174             # get the path of the calling script
175             localdir = os.path.dirname(os.path.realpath(main.__file__))
176         else:
177             # use cwd if there is no calling script
178             localdir = os.getcwd()
179         localdir_s = localdir.split(os.path.sep)
180
181         def dmatch(dir):
182             """Match dir against right-hand components of the script dir"""
183             d = dir.split("/")  # param 'dir' assumes a / separator
184             length = len(d)
185             return len(localdir_s) > length and localdir_s[-length:] == d
186
187         def sdir(srcdir, variant):
188             """Build a path from srcdir to the staged API files of
189             'variant'  (typically '' or '_debug')"""
190             # Since 'core' and 'plugin' files are staged
191             # in separate directories, we target the parent dir.
192             return os.path.sep.join(
193                 (
194                     srcdir,
195                     "build-root",
196                     "install-vpp%s-native" % variant,
197                     "vpp",
198                     "share",
199                     "vpp",
200                     "api",
201                 )
202             )
203
204         srcdir = None
205         if dmatch("src/scripts"):
206             srcdir = os.path.sep.join(localdir_s[:-2])
207         elif dmatch("src/vpp-api/python"):
208             srcdir = os.path.sep.join(localdir_s[:-3])
209         elif dmatch("test"):
210             # we're apparently running tests
211             srcdir = os.path.sep.join(localdir_s[:-1])
212
213         if srcdir:
214             # we're in the source tree, try both the debug and release
215             # variants.
216             dirs.append(sdir(srcdir, "_debug"))
217             dirs.append(sdir(srcdir, ""))
218
219         # Test for staged copies of the scripts
220         # For these, since we explicitly know if we're running a debug versus
221         # release variant, target only the relevant directory
222         if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
223             srcdir = os.path.sep.join(localdir_s[:-4])
224             dirs.append(sdir(srcdir, "_debug"))
225         if dmatch("build-root/install-vpp-native/vpp/bin"):
226             srcdir = os.path.sep.join(localdir_s[:-4])
227             dirs.append(sdir(srcdir, ""))
228
229         # finally, try the location system packages typically install into
230         dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
231
232         # check the directories for existence; first one wins
233         for dir in dirs:
234             if os.path.isdir(dir):
235                 return dir
236
237         return None
238
239     @classmethod
240     def find_api_files(cls, api_dir=None, patterns="*"):  # -> list
241         """Find API definition files from the given directory tree with the
242         given pattern. If no directory is given then find_api_dir() is used
243         to locate one. If no pattern is given then all definition files found
244         in the directory tree are used.
245
246         :param api_dir: A directory tree in which to locate API definition
247             files; subdirectories are descended into.
248             If this is None then find_api_dir() is called to discover it.
249         :param patterns: A list of patterns to use in each visited directory
250             when looking for files.
251             This can be a list/tuple object or a comma-separated string of
252             patterns. Each value in the list will have leading/trialing
253             whitespace stripped.
254             The pattern specifies the first part of the filename, '.api.json'
255             is appended.
256             The results are de-duplicated, thus overlapping patterns are fine.
257             If this is None it defaults to '*' meaning "all API files".
258         :returns: A list of file paths for the API files found.
259         """
260         if api_dir is None:
261             api_dir = cls.find_api_dir([])
262             if api_dir is None:
263                 raise VPPApiError("api_dir cannot be located")
264
265         if isinstance(patterns, list) or isinstance(patterns, tuple):
266             patterns = [p.strip() + ".api.json" for p in patterns]
267         else:
268             patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
269
270         api_files = []
271         for root, dirnames, files in os.walk(api_dir):
272             # iterate all given patterns and de-dup the result
273             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
274             for filename in files:
275                 api_files.append(os.path.join(root, filename))
276
277         return api_files
278
279     @classmethod
280     def process_json_file(self, apidef_file):
281         api = json.load(apidef_file)
282         return self._process_json(api)
283
284     @classmethod
285     def process_json_str(self, json_str):
286         api = json.loads(json_str)
287         return self._process_json(api)
288
289     @staticmethod
290     def _process_json(api):  # -> Tuple[Dict, Dict]
291         types = {}
292         services = {}
293         messages = {}
294         try:
295             for t in api["enums"]:
296                 t[0] = "vl_api_" + t[0] + "_t"
297                 types[t[0]] = {"type": "enum", "data": t}
298         except KeyError:
299             pass
300         try:
301             for t in api["enumflags"]:
302                 t[0] = "vl_api_" + t[0] + "_t"
303                 types[t[0]] = {"type": "enum", "data": t}
304         except KeyError:
305             pass
306         try:
307             for t in api["unions"]:
308                 t[0] = "vl_api_" + t[0] + "_t"
309                 types[t[0]] = {"type": "union", "data": t}
310         except KeyError:
311             pass
312
313         try:
314             for t in api["types"]:
315                 t[0] = "vl_api_" + t[0] + "_t"
316                 types[t[0]] = {"type": "type", "data": t}
317         except KeyError:
318             pass
319
320         try:
321             for t, v in api["aliases"].items():
322                 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
323         except KeyError:
324             pass
325
326         try:
327             services.update(api["services"])
328         except KeyError:
329             pass
330
331         i = 0
332         while True:
333             unresolved = {}
334             for k, v in types.items():
335                 t = v["data"]
336                 if not vpp_get_type(k):
337                     if v["type"] == "enum":
338                         try:
339                             VPPEnumType(t[0], t[1:])
340                         except ValueError:
341                             unresolved[k] = v
342                 if not vpp_get_type(k):
343                     if v["type"] == "enumflag":
344                         try:
345                             VPPEnumFlagType(t[0], t[1:])
346                         except ValueError:
347                             unresolved[k] = v
348                     elif v["type"] == "union":
349                         try:
350                             VPPUnionType(t[0], t[1:])
351                         except ValueError:
352                             unresolved[k] = v
353                     elif v["type"] == "type":
354                         try:
355                             VPPType(t[0], t[1:])
356                         except ValueError:
357                             unresolved[k] = v
358                     elif v["type"] == "alias":
359                         try:
360                             VPPTypeAlias(k, t)
361                         except ValueError:
362                             unresolved[k] = v
363             if len(unresolved) == 0:
364                 break
365             if i > 3:
366                 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
367             types = unresolved
368             i += 1
369         try:
370             for m in api["messages"]:
371                 try:
372                     messages[m[0]] = VPPMessage(m[0], m[1:])
373                 except VPPNotImplementedError:
374                     ### OLE FIXME
375                     logger.error("Not implemented error for {}".format(m[0]))
376         except KeyError:
377             pass
378         return messages, services
379
380
381 class VPPApiClient:
382     """VPP interface.
383
384     This class provides the APIs to VPP.  The APIs are loaded
385     from provided .api.json files and makes functions accordingly.
386     These functions are documented in the VPP .api files, as they
387     are dynamically created.
388
389     Additionally, VPP can send callback messages; this class
390     provides a means to register a callback function to receive
391     these messages in a background thread.
392     """
393
394     apidir = None
395     VPPApiError = VPPApiError
396     VPPRuntimeError = VPPRuntimeError
397     VPPValueError = VPPValueError
398     VPPNotImplementedError = VPPNotImplementedError
399     VPPIOError = VPPIOError
400
401     def __init__(
402         self,
403         *,
404         apifiles=None,
405         testmode=False,
406         async_thread=True,
407         logger=None,
408         loglevel=None,
409         read_timeout=5,
410         use_socket=True,
411         server_address="/run/vpp/api.sock",
412     ):
413         """Create a VPP API object.
414
415         apifiles is a list of files containing API
416         descriptions that will be loaded - methods will be
417         dynamically created reflecting these APIs.  If not
418         provided this will load the API files from VPP's
419         default install location.
420
421         logger, if supplied, is the logging logger object to log to.
422         loglevel, if supplied, is the log level this logger is set
423         to report at (from the loglevels in the logging module).
424         """
425         if logger is None:
426             logger = logging.getLogger(
427                 "{}.{}".format(__name__, self.__class__.__name__)
428             )
429             if loglevel is not None:
430                 logger.setLevel(loglevel)
431         self.logger = logger
432
433         self.messages = {}
434         self.services = {}
435         self.id_names = []
436         self.id_msgdef = []
437         self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
438         self.apifiles = []
439         self.event_callback = None
440         self.message_queue = queue.Queue()
441         self.read_timeout = read_timeout
442         self.async_thread = async_thread
443         self.event_thread = None
444         self.testmode = testmode
445         self.server_address = server_address
446         self._apifiles = apifiles
447         self.stats = {}
448
449         if not apifiles:
450             # Pick up API definitions from default directory
451             try:
452                 if isinstance(self.apidir, list):
453                     apifiles = []
454                     for d in self.apidir:
455                         apifiles += VPPApiJSONFiles.find_api_files(d)
456                 else:
457                     apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
458             except (RuntimeError, VPPApiError):
459                 # In test mode we don't care that we can't find the API files
460                 if testmode:
461                     apifiles = []
462                 else:
463                     raise VPPRuntimeError
464
465         for file in apifiles:
466             with open(file) as apidef_file:
467                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
468                 self.messages.update(m)
469                 self.services.update(s)
470
471         self.apifiles = apifiles
472
473         # Basic sanity check
474         if len(self.messages) == 0 and not testmode:
475             raise VPPValueError(1, "Missing JSON message definitions")
476         if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
477             raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
478
479         self.transport = VppTransport(
480             self, read_timeout=read_timeout, server_address=server_address
481         )
482         # Make sure we allow VPP to clean up the message rings.
483         atexit.register(vpp_atexit, weakref.ref(self))
484
485         add_convenience_methods()
486
487     def get_function(self, name):
488         return getattr(self._api, name)
489
490     class ContextId:
491         """Multiprocessing-safe provider of unique context IDs."""
492
493         def __init__(self):
494             self.context = mp.Value(ctypes.c_uint, 0)
495             self.lock = mp.Lock()
496
497         def __call__(self):
498             """Get a new unique (or, at least, not recently used) context."""
499             with self.lock:
500                 self.context.value += 1
501                 return self.context.value
502
503     get_context = ContextId()
504
505     def get_type(self, name):
506         return vpp_get_type(name)
507
508     @property
509     def api(self):
510         if not hasattr(self, "_api"):
511             raise VPPApiError("Not connected, api definitions not available")
512         return self._api
513
514     def make_function(self, msg, i, multipart, do_async):
515         if do_async:
516
517             def f(**kwargs):
518                 return self._call_vpp_async(i, msg, **kwargs)
519
520         else:
521
522             def f(**kwargs):
523                 return self._call_vpp(i, msg, multipart, **kwargs)
524
525         f.__name__ = str(msg.name)
526         f.__doc__ = ", ".join(
527             ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
528         )
529         f.msg = msg
530
531         return f
532
533     def make_pack_function(self, msg, i, multipart):
534         def f(**kwargs):
535             return self._call_vpp_pack(i, msg, **kwargs)
536
537         f.msg = msg
538         return f
539
540     def _register_functions(self, do_async=False):
541         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
542         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
543         self._api = VppApiDynamicMethodHolder()
544         for name, msg in self.messages.items():
545             n = name + "_" + msg.crc[2:]
546             i = self.transport.get_msg_index(n)
547             if i > 0:
548                 self.id_msgdef[i] = msg
549                 self.id_names[i] = name
550
551                 # Create function for client side messages.
552                 if name in self.services:
553                     f = self.make_function(msg, i, self.services[name], do_async)
554                     f_pack = self.make_pack_function(msg, i, self.services[name])
555                     setattr(self._api, name, FuncWrapper(f))
556                     setattr(self._api, name + "_pack", FuncWrapper(f_pack))
557             else:
558                 self.logger.debug("No such message type or failed CRC checksum: %s", n)
559
560     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
561         pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
562
563         rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
564         if rv != 0:
565             raise VPPIOError(2, "Connect failed")
566         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
567         self._register_functions(do_async=do_async)
568
569         # Initialise control ping
570         crc = self.messages["control_ping"].crc
571         self.control_ping_index = self.transport.get_msg_index(
572             ("control_ping" + "_" + crc[2:])
573         )
574         self.control_ping_msgdef = self.messages["control_ping"]
575         if self.async_thread:
576             self.event_thread = threading.Thread(target=self.thread_msg_handler)
577             self.event_thread.daemon = True
578             self.event_thread.start()
579         else:
580             self.event_thread = None
581         return rv
582
583     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
584         """Attach to VPP.
585
586         name - the name of the client.
587         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
588         do_async - if true, messages are sent without waiting for a reply
589         rx_qlen - the length of the VPP message receive queue between
590         client and server.
591         """
592         msg_handler = self.transport.get_callback(do_async)
593         return self.connect_internal(
594             name, msg_handler, chroot_prefix, rx_qlen, do_async
595         )
596
597     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
598         """Attach to VPP in synchronous mode. Application must poll for events.
599
600         name - the name of the client.
601         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
602         rx_qlen - the length of the VPP message receive queue between
603         client and server.
604         """
605
606         return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
607
608     def disconnect(self):
609         """Detach from VPP."""
610         rv = self.transport.disconnect()
611         if self.event_thread is not None:
612             self.message_queue.put("terminate event thread")
613         return rv
614
615     def msg_handler_sync(self, msg):
616         """Process an incoming message from VPP in sync mode.
617
618         The message may be a reply or it may be an async notification.
619         """
620         r = self.decode_incoming_msg(msg)
621         if r is None:
622             return
623
624         # If we have a context, then use the context to find any
625         # request waiting for a reply
626         context = 0
627         if hasattr(r, "context") and r.context > 0:
628             context = r.context
629
630         if context == 0:
631             # No context -> async notification that we feed to the callback
632             self.message_queue.put_nowait(r)
633         else:
634             raise VPPIOError(2, "RPC reply message received in event handler")
635
636     def has_context(self, msg):
637         if len(msg) < 10:
638             return False
639
640         header = VPPType(
641             "header_with_context",
642             [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
643         )
644
645         (i, ci, context), size = header.unpack(msg, 0)
646         if self.id_names[i] == "rx_thread_exit":
647             return
648
649         #
650         # Decode message and returns a tuple.
651         #
652         msgobj = self.id_msgdef[i]
653         if "context" in msgobj.field_by_name and context >= 0:
654             return True
655         return False
656
657     def decode_incoming_msg(self, msg, no_type_conversion=False):
658         if not msg:
659             logger.warning("vpp_api.read failed")
660             return
661
662         (i, ci), size = self.header.unpack(msg, 0)
663         if self.id_names[i] == "rx_thread_exit":
664             return
665
666         #
667         # Decode message and returns a tuple.
668         #
669         msgobj = self.id_msgdef[i]
670         if not msgobj:
671             raise VPPIOError(2, "Reply message undefined")
672
673         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
674         return r
675
676     def msg_handler_async(self, msg):
677         """Process a message from VPP in async mode.
678
679         In async mode, all messages are returned to the callback.
680         """
681         r = self.decode_incoming_msg(msg)
682         if r is None:
683             return
684
685         msgname = type(r).__name__
686
687         if self.event_callback:
688             self.event_callback(msgname, r)
689
690     def _control_ping(self, context):
691         """Send a ping command."""
692         self._call_vpp_async(
693             self.control_ping_index, self.control_ping_msgdef, context=context
694         )
695
696     def validate_args(self, msg, kwargs):
697         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
698         if d:
699             raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
700
701     def _add_stat(self, name, ms):
702         if not name in self.stats:
703             self.stats[name] = {"max": ms, "count": 1, "avg": ms}
704         else:
705             if ms > self.stats[name]["max"]:
706                 self.stats[name]["max"] = ms
707             self.stats[name]["count"] += 1
708             n = self.stats[name]["count"]
709             self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
710
711     def get_stats(self):
712         s = "\n=== API PAPI STATISTICS ===\n"
713         s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
714         for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
715             s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
716                 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
717             )
718         return s
719
720     def get_field_options(self, msg, fld_name):
721         # when there is an option, the msgdef has 3 elements.
722         # ['u32', 'ring_size', {'default': 1024}]
723         for _def in self.messages[msg].msgdef:
724             if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
725                 return _def[2]
726
727     def _call_vpp(self, i, msgdef, service, **kwargs):
728         """Given a message, send the message and await a reply.
729
730         msgdef - the message packing definition
731         i - the message type index
732         multipart - True if the message returns multiple
733         messages in return.
734         context - context number - chosen at random if not
735         supplied.
736         The remainder of the kwargs are the arguments to the API call.
737
738         The return value is the message or message array containing
739         the response.  It will raise an IOError exception if there was
740         no response within the timeout window.
741         """
742         ts = time.time()
743         if "context" not in kwargs:
744             context = self.get_context()
745             kwargs["context"] = context
746         else:
747             context = kwargs["context"]
748         kwargs["_vl_msg_id"] = i
749
750         no_type_conversion = kwargs.pop("_no_type_conversion", False)
751         timeout = kwargs.pop("_timeout", None)
752
753         try:
754             if self.transport.socket_index:
755                 kwargs["client_index"] = self.transport.socket_index
756         except AttributeError:
757             pass
758         self.validate_args(msgdef, kwargs)
759
760         s = "Calling {}({})".format(
761             msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
762         )
763         self.logger.debug(s)
764
765         b = msgdef.pack(kwargs)
766         self.transport.suspend()
767
768         self.transport.write(b)
769
770         msgreply = service["reply"]
771         stream = True if "stream" in service else False
772         if stream:
773             if "stream_msg" in service:
774                 # New service['reply'] = _reply and service['stream_message'] = _details
775                 stream_message = service["stream_msg"]
776                 modern = True
777             else:
778                 # Old  service['reply'] = _details
779                 stream_message = msgreply
780                 msgreply = "control_ping_reply"
781                 modern = False
782                 # Send a ping after the request - we use its response
783                 # to detect that we have seen all results.
784                 self._control_ping(context)
785
786         # Block until we get a reply.
787         rl = []
788         while True:
789             r = self.read_blocking(no_type_conversion, timeout)
790             if r is None:
791                 raise VPPIOError(2, "VPP API client: read failed")
792             msgname = type(r).__name__
793             if context not in r or r.context == 0 or context != r.context:
794                 # Message being queued
795                 self.message_queue.put_nowait(r)
796                 continue
797             if msgname != msgreply and (stream and (msgname != stream_message)):
798                 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
799             if not stream:
800                 rl = r
801                 break
802             if msgname == msgreply:
803                 if modern:  # Return both reply and list
804                     rl = r, rl
805                 break
806
807             rl.append(r)
808
809         self.transport.resume()
810
811         s = "Return value: {!r}".format(r)
812         if len(s) > 80:
813             s = s[:80] + "..."
814         self.logger.debug(s)
815         te = time.time()
816         self._add_stat(msgdef.name, (te - ts) * 1000)
817         return rl
818
819     def _call_vpp_async(self, i, msg, **kwargs):
820         """Given a message, send the message and return the context.
821
822         msgdef - the message packing definition
823         i - the message type index
824         context - context number - chosen at random if not
825         supplied.
826         The remainder of the kwargs are the arguments to the API call.
827
828         The reply message(s) will be delivered later to the registered callback.
829         The returned context will help with assigning which call
830         the reply belongs to.
831         """
832         if "context" not in kwargs:
833             context = self.get_context()
834             kwargs["context"] = context
835         else:
836             context = kwargs["context"]
837         try:
838             if self.transport.socket_index:
839                 kwargs["client_index"] = self.transport.socket_index
840         except AttributeError:
841             kwargs["client_index"] = 0
842         kwargs["_vl_msg_id"] = i
843         b = msg.pack(kwargs)
844
845         self.transport.write(b)
846         return context
847
848     def _call_vpp_pack(self, i, msg, **kwargs):
849         """Given a message, return the binary representation."""
850         kwargs["_vl_msg_id"] = i
851         kwargs["client_index"] = 0
852         kwargs["context"] = 0
853         return msg.pack(kwargs)
854
855     def read_blocking(self, no_type_conversion=False, timeout=None):
856         """Get next received message from transport within timeout, decoded.
857
858         Note that notifications have context zero
859         and are not put into receive queue (at least for socket transport),
860         use async_thread with registered callback for processing them.
861
862         If no message appears in the queue within timeout, return None.
863
864         Optionally, type conversion can be skipped,
865         as some of conversions are into less precise types.
866
867         When r is the return value of this, the caller can get message name as:
868             msgname = type(r).__name__
869         and context number (type long) as:
870             context = r.context
871
872         :param no_type_conversion: If false, type conversions are applied.
873         :type no_type_conversion: bool
874         :returns: Decoded message, or None if no message (within timeout).
875         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
876         :raises VppTransportShmemIOError if timed out.
877         """
878         msg = self.transport.read(timeout=timeout)
879         if not msg:
880             return None
881         return self.decode_incoming_msg(msg, no_type_conversion)
882
883     def register_event_callback(self, callback):
884         """Register a callback for async messages.
885
886         This will be called for async notifications in sync mode,
887         and all messages in async mode.  In sync mode, replies to
888         requests will not come here.
889
890         callback is a fn(msg_type_name, msg_type) that will be
891         called when a message comes in.  While this function is
892         executing, note that (a) you are in a background thread and
893         may wish to use threading.Lock to protect your datastructures,
894         and (b) message processing from VPP will stop (so if you take
895         a long while about it you may provoke reply timeouts or cause
896         VPP to fill the RX buffer).  Passing None will disable the
897         callback.
898         """
899         self.event_callback = callback
900
901     def thread_msg_handler(self):
902         """Python thread calling the user registered message handler.
903
904         This is to emulate the old style event callback scheme. Modern
905         clients should provide their own thread to poll the event
906         queue.
907         """
908         while True:
909             r = self.message_queue.get()
910             if r == "terminate event thread":
911                 break
912             msgname = type(r).__name__
913             if self.event_callback:
914                 self.event_callback(msgname, r)
915
916     def validate_message_table(self, namecrctable):
917         """Take a dictionary of name_crc message names
918         and returns an array of missing messages"""
919
920         missing_table = []
921         for name_crc in namecrctable:
922             i = self.transport.get_msg_index(name_crc)
923             if i <= 0:
924                 missing_table.append(name_crc)
925         return missing_table
926
927     def dump_message_table(self):
928         """Return VPPs API message table as name_crc dictionary"""
929         return self.transport.message_table
930
931     def dump_message_table_filtered(self, msglist):
932         """Return VPPs API message table as name_crc dictionary,
933         filtered by message name list."""
934
935         replies = [self.services[n]["reply"] for n in msglist]
936         message_table_filtered = {}
937         for name in msglist + replies:
938             for k, v in self.transport.message_table.items():
939                 if k.startswith(name):
940                     message_table_filtered[k] = v
941                     break
942         return message_table_filtered
943
944     def __repr__(self):
945         return (
946             "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
947             "logger=%s, read_timeout=%s, "
948             "server_address='%s'>"
949             % (
950                 self._apifiles,
951                 self.testmode,
952                 self.async_thread,
953                 self.logger,
954                 self.read_timeout,
955                 self.server_address,
956             )
957         )
958
959     def details_iter(self, f, **kwargs):
960         cursor = 0
961         while True:
962             kwargs["cursor"] = cursor
963             rv, details = f(**kwargs)
964             for d in details:
965                 yield d
966             if rv.retval == 0 or rv.retval != -165:
967                 break
968             cursor = rv.cursor