papi: fix VPP_API_DIR
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import queue
25 import logging
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 import time
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40
41     class V:
42         """placeholder for VppTransport as the implementation is dependent on
43         VPPAPIClient's initialization values
44         """
45
46     VppTransport = V
47
48 from .vpp_transport_socket import VppTransport
49
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
52
53 __all__ = (
54     "FuncWrapper",
55     "VppApiDynamicMethodHolder",
56     "VppEnum",
57     "VppEnumType",
58     "VppEnumFlag",
59     "VPPIOError",
60     "VPPRuntimeError",
61     "VPPValueError",
62     "VPPApiClient",
63 )
64
65
66 def metaclass(metaclass):
67     @functools.wraps(metaclass)
68     def wrapper(cls):
69         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
70
71     return wrapper
72
73
74 class VppEnumType(type):
75     def __getattr__(cls, name):
76         t = vpp_get_type(name)
77         return t.enum
78
79
80 @metaclass(VppEnumType)
81 class VppEnum:
82     pass
83
84
85 @metaclass(VppEnumType)
86 class VppEnumFlag:
87     pass
88
89
90 def vpp_atexit(vpp_weakref):
91     """Clean up VPP connection on shutdown."""
92     vpp_instance = vpp_weakref()
93     if vpp_instance and vpp_instance.transport.connected:
94         logger.debug("Cleaning up VPP on exit")
95         vpp_instance.disconnect()
96
97
98 def add_convenience_methods():
99     # provide convenience methods to IP[46]Address.vapi_af
100     def _vapi_af(self):
101         if 6 == self._version:
102             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103         if 4 == self._version:
104             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105         raise ValueError("Invalid _version.")
106
107     def _vapi_af_name(self):
108         if 6 == self._version:
109             return "ip6"
110         if 4 == self._version:
111             return "ip4"
112         raise ValueError("Invalid _version.")
113
114     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
116
117
118 class VppApiDynamicMethodHolder:
119     pass
120
121
122 class FuncWrapper:
123     def __init__(self, func):
124         self._func = func
125         self.__name__ = func.__name__
126         self.__doc__ = func.__doc__
127
128     def __call__(self, **kwargs):
129         return self._func(**kwargs)
130
131     def __repr__(self):
132         return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
133
134
135 class VPPApiError(Exception):
136     pass
137
138
139 class VPPNotImplementedError(NotImplementedError):
140     pass
141
142
143 class VPPIOError(IOError):
144     pass
145
146
147 class VPPRuntimeError(RuntimeError):
148     pass
149
150
151 class VPPValueError(ValueError):
152     pass
153
154
155 class VPPApiJSONFiles:
156     @classmethod
157     def find_api_dir(cls, dirs=[]):
158         """Attempt to find the best directory in which API definition
159         files may reside. If the value VPP_API_DIR exists in the environment
160         then it is first on the search list. If we're inside a recognized
161         location in a VPP source tree (src/scripts and src/vpp-api/python)
162         then entries from there to the likely locations in build-root are
163         added. Finally the location used by system packages is added.
164
165         :returns: A single directory name, or None if no such directory
166             could be found.
167         """
168
169         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170         # in which case, plot a course to likely places in the src tree
171         import __main__ as main
172
173         if os.getenv("VPP_API_DIR"):
174             dirs.append(os.getenv("VPP_API_DIR"))
175
176         if hasattr(main, "__file__"):
177             # get the path of the calling script
178             localdir = os.path.dirname(os.path.realpath(main.__file__))
179         else:
180             # use cwd if there is no calling script
181             localdir = os.getcwd()
182         localdir_s = localdir.split(os.path.sep)
183
184         def dmatch(dir):
185             """Match dir against right-hand components of the script dir"""
186             d = dir.split("/")  # param 'dir' assumes a / separator
187             length = len(d)
188             return len(localdir_s) > length and localdir_s[-length:] == d
189
190         def sdir(srcdir, variant):
191             """Build a path from srcdir to the staged API files of
192             'variant'  (typically '' or '_debug')"""
193             # Since 'core' and 'plugin' files are staged
194             # in separate directories, we target the parent dir.
195             return os.path.sep.join(
196                 (
197                     srcdir,
198                     "build-root",
199                     "install-vpp%s-native" % variant,
200                     "vpp",
201                     "share",
202                     "vpp",
203                     "api",
204                 )
205             )
206
207         srcdir = None
208         if dmatch("src/scripts"):
209             srcdir = os.path.sep.join(localdir_s[:-2])
210         elif dmatch("src/vpp-api/python"):
211             srcdir = os.path.sep.join(localdir_s[:-3])
212         elif dmatch("test"):
213             # we're apparently running tests
214             srcdir = os.path.sep.join(localdir_s[:-1])
215
216         if srcdir:
217             # we're in the source tree, try both the debug and release
218             # variants.
219             dirs.append(sdir(srcdir, "_debug"))
220             dirs.append(sdir(srcdir, ""))
221
222         # Test for staged copies of the scripts
223         # For these, since we explicitly know if we're running a debug versus
224         # release variant, target only the relevant directory
225         if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
226             srcdir = os.path.sep.join(localdir_s[:-4])
227             dirs.append(sdir(srcdir, "_debug"))
228         if dmatch("build-root/install-vpp-native/vpp/bin"):
229             srcdir = os.path.sep.join(localdir_s[:-4])
230             dirs.append(sdir(srcdir, ""))
231
232         # finally, try the location system packages typically install into
233         dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
234
235         # check the directories for existence; first one wins
236         for dir in dirs:
237             if os.path.isdir(dir):
238                 return dir
239
240         return None
241
242     @classmethod
243     def find_api_files(cls, api_dir=None, patterns="*"):  # -> list
244         """Find API definition files from the given directory tree with the
245         given pattern. If no directory is given then find_api_dir() is used
246         to locate one. If no pattern is given then all definition files found
247         in the directory tree are used.
248
249         :param api_dir: A directory tree in which to locate API definition
250             files; subdirectories are descended into.
251             If this is None then find_api_dir() is called to discover it.
252         :param patterns: A list of patterns to use in each visited directory
253             when looking for files.
254             This can be a list/tuple object or a comma-separated string of
255             patterns. Each value in the list will have leading/trialing
256             whitespace stripped.
257             The pattern specifies the first part of the filename, '.api.json'
258             is appended.
259             The results are de-duplicated, thus overlapping patterns are fine.
260             If this is None it defaults to '*' meaning "all API files".
261         :returns: A list of file paths for the API files found.
262         """
263         if api_dir is None:
264             api_dir = cls.find_api_dir([])
265             if api_dir is None:
266                 raise VPPApiError("api_dir cannot be located")
267
268         if isinstance(patterns, list) or isinstance(patterns, tuple):
269             patterns = [p.strip() + ".api.json" for p in patterns]
270         else:
271             patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
272
273         api_files = []
274         for root, dirnames, files in os.walk(api_dir):
275             # iterate all given patterns and de-dup the result
276             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
277             for filename in files:
278                 api_files.append(os.path.join(root, filename))
279
280         return api_files
281
282     @classmethod
283     def process_json_file(self, apidef_file):
284         api = json.load(apidef_file)
285         return self._process_json(api)
286
287     @classmethod
288     def process_json_str(self, json_str):
289         api = json.loads(json_str)
290         return self._process_json(api)
291
292     @staticmethod
293     def _process_json(api):  # -> Tuple[Dict, Dict]
294         types = {}
295         services = {}
296         messages = {}
297         try:
298             for t in api["enums"]:
299                 t[0] = "vl_api_" + t[0] + "_t"
300                 types[t[0]] = {"type": "enum", "data": t}
301         except KeyError:
302             pass
303         try:
304             for t in api["enumflags"]:
305                 t[0] = "vl_api_" + t[0] + "_t"
306                 types[t[0]] = {"type": "enum", "data": t}
307         except KeyError:
308             pass
309         try:
310             for t in api["unions"]:
311                 t[0] = "vl_api_" + t[0] + "_t"
312                 types[t[0]] = {"type": "union", "data": t}
313         except KeyError:
314             pass
315
316         try:
317             for t in api["types"]:
318                 t[0] = "vl_api_" + t[0] + "_t"
319                 types[t[0]] = {"type": "type", "data": t}
320         except KeyError:
321             pass
322
323         try:
324             for t, v in api["aliases"].items():
325                 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
326         except KeyError:
327             pass
328
329         try:
330             services.update(api["services"])
331         except KeyError:
332             pass
333
334         i = 0
335         while True:
336             unresolved = {}
337             for k, v in types.items():
338                 t = v["data"]
339                 if not vpp_get_type(k):
340                     if v["type"] == "enum":
341                         try:
342                             VPPEnumType(t[0], t[1:])
343                         except ValueError:
344                             unresolved[k] = v
345                 if not vpp_get_type(k):
346                     if v["type"] == "enumflag":
347                         try:
348                             VPPEnumFlagType(t[0], t[1:])
349                         except ValueError:
350                             unresolved[k] = v
351                     elif v["type"] == "union":
352                         try:
353                             VPPUnionType(t[0], t[1:])
354                         except ValueError:
355                             unresolved[k] = v
356                     elif v["type"] == "type":
357                         try:
358                             VPPType(t[0], t[1:])
359                         except ValueError:
360                             unresolved[k] = v
361                     elif v["type"] == "alias":
362                         try:
363                             VPPTypeAlias(k, t)
364                         except ValueError:
365                             unresolved[k] = v
366             if len(unresolved) == 0:
367                 break
368             if i > 3:
369                 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
370             types = unresolved
371             i += 1
372         try:
373             for m in api["messages"]:
374                 try:
375                     messages[m[0]] = VPPMessage(m[0], m[1:])
376                 except VPPNotImplementedError:
377                     ### OLE FIXME
378                     logger.error("Not implemented error for {}".format(m[0]))
379         except KeyError:
380             pass
381         return messages, services
382
383
384 class VPPApiClient:
385     """VPP interface.
386
387     This class provides the APIs to VPP.  The APIs are loaded
388     from provided .api.json files and makes functions accordingly.
389     These functions are documented in the VPP .api files, as they
390     are dynamically created.
391
392     Additionally, VPP can send callback messages; this class
393     provides a means to register a callback function to receive
394     these messages in a background thread.
395     """
396
397     apidir = None
398     VPPApiError = VPPApiError
399     VPPRuntimeError = VPPRuntimeError
400     VPPValueError = VPPValueError
401     VPPNotImplementedError = VPPNotImplementedError
402     VPPIOError = VPPIOError
403
404     def __init__(
405         self,
406         *,
407         apifiles=None,
408         testmode=False,
409         async_thread=True,
410         logger=None,
411         loglevel=None,
412         read_timeout=5,
413         use_socket=True,
414         server_address="/run/vpp/api.sock",
415     ):
416         """Create a VPP API object.
417
418         apifiles is a list of files containing API
419         descriptions that will be loaded - methods will be
420         dynamically created reflecting these APIs.  If not
421         provided this will load the API files from VPP's
422         default install location.
423
424         logger, if supplied, is the logging logger object to log to.
425         loglevel, if supplied, is the log level this logger is set
426         to report at (from the loglevels in the logging module).
427         """
428         if logger is None:
429             logger = logging.getLogger(
430                 "{}.{}".format(__name__, self.__class__.__name__)
431             )
432             if loglevel is not None:
433                 logger.setLevel(loglevel)
434         self.logger = logger
435
436         self.messages = {}
437         self.services = {}
438         self.id_names = []
439         self.id_msgdef = []
440         self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
441         self.apifiles = []
442         self.event_callback = None
443         self.message_queue = queue.Queue()
444         self.read_timeout = read_timeout
445         self.async_thread = async_thread
446         self.event_thread = None
447         self.testmode = testmode
448         self.server_address = server_address
449         self._apifiles = apifiles
450         self.stats = {}
451
452         if not apifiles:
453             # Pick up API definitions from default directory
454             try:
455                 if isinstance(self.apidir, list):
456                     apifiles = []
457                     for d in self.apidir:
458                         apifiles += VPPApiJSONFiles.find_api_files(d)
459                 else:
460                     apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
461             except (RuntimeError, VPPApiError):
462                 # In test mode we don't care that we can't find the API files
463                 if testmode:
464                     apifiles = []
465                 else:
466                     raise VPPRuntimeError
467
468         for file in apifiles:
469             with open(file) as apidef_file:
470                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
471                 self.messages.update(m)
472                 self.services.update(s)
473
474         self.apifiles = apifiles
475
476         # Basic sanity check
477         if len(self.messages) == 0 and not testmode:
478             raise VPPValueError(1, "Missing JSON message definitions")
479         if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
480             raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
481
482         self.transport = VppTransport(
483             self, read_timeout=read_timeout, server_address=server_address
484         )
485         # Make sure we allow VPP to clean up the message rings.
486         atexit.register(vpp_atexit, weakref.ref(self))
487
488         add_convenience_methods()
489
490     def get_function(self, name):
491         return getattr(self._api, name)
492
493     class ContextId:
494         """Multiprocessing-safe provider of unique context IDs."""
495
496         def __init__(self):
497             self.context = mp.Value(ctypes.c_uint, 0)
498             self.lock = mp.Lock()
499
500         def __call__(self):
501             """Get a new unique (or, at least, not recently used) context."""
502             with self.lock:
503                 self.context.value += 1
504                 return self.context.value
505
506     get_context = ContextId()
507
508     def get_type(self, name):
509         return vpp_get_type(name)
510
511     @property
512     def api(self):
513         if not hasattr(self, "_api"):
514             raise VPPApiError("Not connected, api definitions not available")
515         return self._api
516
517     def make_function(self, msg, i, multipart, do_async):
518         if do_async:
519
520             def f(**kwargs):
521                 return self._call_vpp_async(i, msg, **kwargs)
522
523         else:
524
525             def f(**kwargs):
526                 return self._call_vpp(i, msg, multipart, **kwargs)
527
528         f.__name__ = str(msg.name)
529         f.__doc__ = ", ".join(
530             ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
531         )
532         f.msg = msg
533
534         return f
535
536     def make_pack_function(self, msg, i, multipart):
537         def f(**kwargs):
538             return self._call_vpp_pack(i, msg, **kwargs)
539
540         f.msg = msg
541         return f
542
543     def _register_functions(self, do_async=False):
544         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
545         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
546         self._api = VppApiDynamicMethodHolder()
547         for name, msg in self.messages.items():
548             n = name + "_" + msg.crc[2:]
549             i = self.transport.get_msg_index(n)
550             if i > 0:
551                 self.id_msgdef[i] = msg
552                 self.id_names[i] = name
553
554                 # Create function for client side messages.
555                 if name in self.services:
556                     f = self.make_function(msg, i, self.services[name], do_async)
557                     f_pack = self.make_pack_function(msg, i, self.services[name])
558                     setattr(self._api, name, FuncWrapper(f))
559                     setattr(self._api, name + "_pack", FuncWrapper(f_pack))
560             else:
561                 self.logger.debug("No such message type or failed CRC checksum: %s", n)
562
563     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
564         pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
565
566         rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
567         if rv != 0:
568             raise VPPIOError(2, "Connect failed")
569         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
570         self._register_functions(do_async=do_async)
571
572         # Initialise control ping
573         crc = self.messages["control_ping"].crc
574         self.control_ping_index = self.transport.get_msg_index(
575             ("control_ping" + "_" + crc[2:])
576         )
577         self.control_ping_msgdef = self.messages["control_ping"]
578         if self.async_thread:
579             self.event_thread = threading.Thread(target=self.thread_msg_handler)
580             self.event_thread.daemon = True
581             self.event_thread.start()
582         else:
583             self.event_thread = None
584         return rv
585
586     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
587         """Attach to VPP.
588
589         name - the name of the client.
590         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
591         do_async - if true, messages are sent without waiting for a reply
592         rx_qlen - the length of the VPP message receive queue between
593         client and server.
594         """
595         msg_handler = self.transport.get_callback(do_async)
596         return self.connect_internal(
597             name, msg_handler, chroot_prefix, rx_qlen, do_async
598         )
599
600     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
601         """Attach to VPP in synchronous mode. Application must poll for events.
602
603         name - the name of the client.
604         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
605         rx_qlen - the length of the VPP message receive queue between
606         client and server.
607         """
608
609         return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
610
611     def disconnect(self):
612         """Detach from VPP."""
613         rv = self.transport.disconnect()
614         if self.event_thread is not None:
615             self.message_queue.put("terminate event thread")
616         return rv
617
618     def msg_handler_sync(self, msg):
619         """Process an incoming message from VPP in sync mode.
620
621         The message may be a reply or it may be an async notification.
622         """
623         r = self.decode_incoming_msg(msg)
624         if r is None:
625             return
626
627         # If we have a context, then use the context to find any
628         # request waiting for a reply
629         context = 0
630         if hasattr(r, "context") and r.context > 0:
631             context = r.context
632
633         if context == 0:
634             # No context -> async notification that we feed to the callback
635             self.message_queue.put_nowait(r)
636         else:
637             raise VPPIOError(2, "RPC reply message received in event handler")
638
639     def has_context(self, msg):
640         if len(msg) < 10:
641             return False
642
643         header = VPPType(
644             "header_with_context",
645             [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
646         )
647
648         (i, ci, context), size = header.unpack(msg, 0)
649         if self.id_names[i] == "rx_thread_exit":
650             return
651
652         #
653         # Decode message and returns a tuple.
654         #
655         msgobj = self.id_msgdef[i]
656         if "context" in msgobj.field_by_name and context >= 0:
657             return True
658         return False
659
660     def decode_incoming_msg(self, msg, no_type_conversion=False):
661         if not msg:
662             logger.warning("vpp_api.read failed")
663             return
664
665         (i, ci), size = self.header.unpack(msg, 0)
666         if self.id_names[i] == "rx_thread_exit":
667             return
668
669         #
670         # Decode message and returns a tuple.
671         #
672         msgobj = self.id_msgdef[i]
673         if not msgobj:
674             raise VPPIOError(2, "Reply message undefined")
675
676         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
677         return r
678
679     def msg_handler_async(self, msg):
680         """Process a message from VPP in async mode.
681
682         In async mode, all messages are returned to the callback.
683         """
684         r = self.decode_incoming_msg(msg)
685         if r is None:
686             return
687
688         msgname = type(r).__name__
689
690         if self.event_callback:
691             self.event_callback(msgname, r)
692
693     def _control_ping(self, context):
694         """Send a ping command."""
695         self._call_vpp_async(
696             self.control_ping_index, self.control_ping_msgdef, context=context
697         )
698
699     def validate_args(self, msg, kwargs):
700         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
701         if d:
702             raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
703
704     def _add_stat(self, name, ms):
705         if not name in self.stats:
706             self.stats[name] = {"max": ms, "count": 1, "avg": ms}
707         else:
708             if ms > self.stats[name]["max"]:
709                 self.stats[name]["max"] = ms
710             self.stats[name]["count"] += 1
711             n = self.stats[name]["count"]
712             self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
713
714     def get_stats(self):
715         s = "\n=== API PAPI STATISTICS ===\n"
716         s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
717         for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
718             s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
719                 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
720             )
721         return s
722
723     def get_field_options(self, msg, fld_name):
724         # when there is an option, the msgdef has 3 elements.
725         # ['u32', 'ring_size', {'default': 1024}]
726         for _def in self.messages[msg].msgdef:
727             if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
728                 return _def[2]
729
730     def _call_vpp(self, i, msgdef, service, **kwargs):
731         """Given a message, send the message and await a reply.
732
733         msgdef - the message packing definition
734         i - the message type index
735         multipart - True if the message returns multiple
736         messages in return.
737         context - context number - chosen at random if not
738         supplied.
739         The remainder of the kwargs are the arguments to the API call.
740
741         The return value is the message or message array containing
742         the response.  It will raise an IOError exception if there was
743         no response within the timeout window.
744         """
745         ts = time.time()
746         if "context" not in kwargs:
747             context = self.get_context()
748             kwargs["context"] = context
749         else:
750             context = kwargs["context"]
751         kwargs["_vl_msg_id"] = i
752
753         no_type_conversion = kwargs.pop("_no_type_conversion", False)
754         timeout = kwargs.pop("_timeout", None)
755
756         try:
757             if self.transport.socket_index:
758                 kwargs["client_index"] = self.transport.socket_index
759         except AttributeError:
760             pass
761         self.validate_args(msgdef, kwargs)
762
763         s = "Calling {}({})".format(
764             msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
765         )
766         self.logger.debug(s)
767
768         b = msgdef.pack(kwargs)
769         self.transport.suspend()
770
771         self.transport.write(b)
772
773         msgreply = service["reply"]
774         stream = True if "stream" in service else False
775         if stream:
776             if "stream_msg" in service:
777                 # New service['reply'] = _reply and service['stream_message'] = _details
778                 stream_message = service["stream_msg"]
779                 modern = True
780             else:
781                 # Old  service['reply'] = _details
782                 stream_message = msgreply
783                 msgreply = "control_ping_reply"
784                 modern = False
785                 # Send a ping after the request - we use its response
786                 # to detect that we have seen all results.
787                 self._control_ping(context)
788
789         # Block until we get a reply.
790         rl = []
791         while True:
792             r = self.read_blocking(no_type_conversion, timeout)
793             if r is None:
794                 raise VPPIOError(2, "VPP API client: read failed")
795             msgname = type(r).__name__
796             if context not in r or r.context == 0 or context != r.context:
797                 # Message being queued
798                 self.message_queue.put_nowait(r)
799                 continue
800             if msgname != msgreply and (stream and (msgname != stream_message)):
801                 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
802             if not stream:
803                 rl = r
804                 break
805             if msgname == msgreply:
806                 if modern:  # Return both reply and list
807                     rl = r, rl
808                 break
809
810             rl.append(r)
811
812         self.transport.resume()
813
814         s = "Return value: {!r}".format(r)
815         if len(s) > 80:
816             s = s[:80] + "..."
817         self.logger.debug(s)
818         te = time.time()
819         self._add_stat(msgdef.name, (te - ts) * 1000)
820         return rl
821
822     def _call_vpp_async(self, i, msg, **kwargs):
823         """Given a message, send the message and return the context.
824
825         msgdef - the message packing definition
826         i - the message type index
827         context - context number - chosen at random if not
828         supplied.
829         The remainder of the kwargs are the arguments to the API call.
830
831         The reply message(s) will be delivered later to the registered callback.
832         The returned context will help with assigning which call
833         the reply belongs to.
834         """
835         if "context" not in kwargs:
836             context = self.get_context()
837             kwargs["context"] = context
838         else:
839             context = kwargs["context"]
840         try:
841             if self.transport.socket_index:
842                 kwargs["client_index"] = self.transport.socket_index
843         except AttributeError:
844             kwargs["client_index"] = 0
845         kwargs["_vl_msg_id"] = i
846         b = msg.pack(kwargs)
847
848         self.transport.write(b)
849         return context
850
851     def _call_vpp_pack(self, i, msg, **kwargs):
852         """Given a message, return the binary representation."""
853         kwargs["_vl_msg_id"] = i
854         kwargs["client_index"] = 0
855         kwargs["context"] = 0
856         return msg.pack(kwargs)
857
858     def read_blocking(self, no_type_conversion=False, timeout=None):
859         """Get next received message from transport within timeout, decoded.
860
861         Note that notifications have context zero
862         and are not put into receive queue (at least for socket transport),
863         use async_thread with registered callback for processing them.
864
865         If no message appears in the queue within timeout, return None.
866
867         Optionally, type conversion can be skipped,
868         as some of conversions are into less precise types.
869
870         When r is the return value of this, the caller can get message name as:
871             msgname = type(r).__name__
872         and context number (type long) as:
873             context = r.context
874
875         :param no_type_conversion: If false, type conversions are applied.
876         :type no_type_conversion: bool
877         :returns: Decoded message, or None if no message (within timeout).
878         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
879         :raises VppTransportShmemIOError if timed out.
880         """
881         msg = self.transport.read(timeout=timeout)
882         if not msg:
883             return None
884         return self.decode_incoming_msg(msg, no_type_conversion)
885
886     def register_event_callback(self, callback):
887         """Register a callback for async messages.
888
889         This will be called for async notifications in sync mode,
890         and all messages in async mode.  In sync mode, replies to
891         requests will not come here.
892
893         callback is a fn(msg_type_name, msg_type) that will be
894         called when a message comes in.  While this function is
895         executing, note that (a) you are in a background thread and
896         may wish to use threading.Lock to protect your datastructures,
897         and (b) message processing from VPP will stop (so if you take
898         a long while about it you may provoke reply timeouts or cause
899         VPP to fill the RX buffer).  Passing None will disable the
900         callback.
901         """
902         self.event_callback = callback
903
904     def thread_msg_handler(self):
905         """Python thread calling the user registered message handler.
906
907         This is to emulate the old style event callback scheme. Modern
908         clients should provide their own thread to poll the event
909         queue.
910         """
911         while True:
912             r = self.message_queue.get()
913             if r == "terminate event thread":
914                 break
915             msgname = type(r).__name__
916             if self.event_callback:
917                 self.event_callback(msgname, r)
918
919     def validate_message_table(self, namecrctable):
920         """Take a dictionary of name_crc message names
921         and returns an array of missing messages"""
922
923         missing_table = []
924         for name_crc in namecrctable:
925             i = self.transport.get_msg_index(name_crc)
926             if i <= 0:
927                 missing_table.append(name_crc)
928         return missing_table
929
930     def dump_message_table(self):
931         """Return VPPs API message table as name_crc dictionary"""
932         return self.transport.message_table
933
934     def dump_message_table_filtered(self, msglist):
935         """Return VPPs API message table as name_crc dictionary,
936         filtered by message name list."""
937
938         replies = [self.services[n]["reply"] for n in msglist]
939         message_table_filtered = {}
940         for name in msglist + replies:
941             for k, v in self.transport.message_table.items():
942                 if k.startswith(name):
943                     message_table_filtered[k] = v
944                     break
945         return message_table_filtered
946
947     def __repr__(self):
948         return (
949             "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
950             "logger=%s, read_timeout=%s, "
951             "server_address='%s'>"
952             % (
953                 self._apifiles,
954                 self.testmode,
955                 self.async_thread,
956                 self.logger,
957                 self.read_timeout,
958                 self.server_address,
959             )
960         )
961
962     def details_iter(self, f, **kwargs):
963         cursor = 0
964         while True:
965             kwargs["cursor"] = cursor
966             rv, details = f(**kwargs)
967             for d in details:
968                 yield d
969             if rv.retval == 0 or rv.retval != -165:
970                 break
971             cursor = rv.cursor