vcl: allow more rx events on peek
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import multiprocessing as mp
22 import os
23 import queue
24 import logging
25 import functools
26 import json
27 import threading
28 import fnmatch
29 import weakref
30 import atexit
31 import time
32 import pkg_resources
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40
41     class V:
42         """placeholder for VppTransport as the implementation is dependent on
43         VPPAPIClient's initialization values
44         """
45
46     VppTransport = V
47
48 from .vpp_transport_socket import VppTransport
49
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
52
53 __all__ = (
54     "FuncWrapper",
55     "VppApiDynamicMethodHolder",
56     "VppEnum",
57     "VppEnumType",
58     "VppEnumFlag",
59     "VPPIOError",
60     "VPPRuntimeError",
61     "VPPValueError",
62     "VPPApiClient",
63 )
64
65
66 def metaclass(metaclass):
67     @functools.wraps(metaclass)
68     def wrapper(cls):
69         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
70
71     return wrapper
72
73
74 class VppEnumType(type):
75     def __getattr__(cls, name):
76         t = vpp_get_type(name)
77         return t.enum
78
79
80 @metaclass(VppEnumType)
81 class VppEnum:
82     pass
83
84
85 @metaclass(VppEnumType)
86 class VppEnumFlag:
87     pass
88
89
90 def vpp_atexit(vpp_weakref):
91     """Clean up VPP connection on shutdown."""
92     vpp_instance = vpp_weakref()
93     if vpp_instance and vpp_instance.transport.connected:
94         logger.debug("Cleaning up VPP on exit")
95         vpp_instance.disconnect()
96
97
98 def add_convenience_methods():
99     # provide convenience methods to IP[46]Address.vapi_af
100     def _vapi_af(self):
101         if 6 == self._version:
102             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103         if 4 == self._version:
104             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105         raise ValueError("Invalid _version.")
106
107     def _vapi_af_name(self):
108         if 6 == self._version:
109             return "ip6"
110         if 4 == self._version:
111             return "ip4"
112         raise ValueError("Invalid _version.")
113
114     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
116
117
118 class VppApiDynamicMethodHolder:
119     pass
120
121
122 class FuncWrapper:
123     def __init__(self, func):
124         self._func = func
125         self.__name__ = func.__name__
126         self.__doc__ = func.__doc__
127
128     def __call__(self, **kwargs):
129         return self._func(**kwargs)
130
131     def __repr__(self):
132         return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
133
134
135 class VPPApiError(Exception):
136     pass
137
138
139 class VPPNotImplementedError(NotImplementedError):
140     pass
141
142
143 class VPPIOError(IOError):
144     pass
145
146
147 class VPPRuntimeError(RuntimeError):
148     pass
149
150
151 class VPPValueError(ValueError):
152     pass
153
154
155 class VPPApiJSONFiles:
156     @classmethod
157     def find_api_dir(cls, dirs=[]):
158         """Attempt to find the best directory in which API definition
159         files may reside. If the value VPP_API_DIR exists in the environment
160         then it is first on the search list. If we're inside a recognized
161         location in a VPP source tree (src/scripts and src/vpp-api/python)
162         then entries from there to the likely locations in build-root are
163         added. Finally the location used by system packages is added.
164
165         :returns: A single directory name, or None if no such directory
166             could be found.
167         """
168
169         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170         # in which case, plot a course to likely places in the src tree
171         import __main__ as main
172
173         if os.getenv("VPP_API_DIR"):
174             dirs.append(os.getenv("VPP_API_DIR"))
175
176         if hasattr(main, "__file__"):
177             # get the path of the calling script
178             localdir = os.path.dirname(os.path.realpath(main.__file__))
179         else:
180             # use cwd if there is no calling script
181             localdir = os.getcwd()
182         localdir_s = localdir.split(os.path.sep)
183
184         def dmatch(dir):
185             """Match dir against right-hand components of the script dir"""
186             d = dir.split("/")  # param 'dir' assumes a / separator
187             length = len(d)
188             return len(localdir_s) > length and localdir_s[-length:] == d
189
190         def sdir(srcdir, variant):
191             """Build a path from srcdir to the staged API files of
192             'variant'  (typically '' or '_debug')"""
193             # Since 'core' and 'plugin' files are staged
194             # in separate directories, we target the parent dir.
195             return os.path.sep.join(
196                 (
197                     srcdir,
198                     "build-root",
199                     "install-vpp%s-native" % variant,
200                     "vpp",
201                     "share",
202                     "vpp",
203                     "api",
204                 )
205             )
206
207         srcdir = None
208         if dmatch("src/scripts"):
209             srcdir = os.path.sep.join(localdir_s[:-2])
210         elif dmatch("src/vpp-api/python"):
211             srcdir = os.path.sep.join(localdir_s[:-3])
212         elif dmatch("test"):
213             # we're apparently running tests
214             srcdir = os.path.sep.join(localdir_s[:-1])
215
216         if srcdir:
217             # we're in the source tree, try both the debug and release
218             # variants.
219             dirs.append(sdir(srcdir, "_debug"))
220             dirs.append(sdir(srcdir, ""))
221
222         # Test for staged copies of the scripts
223         # For these, since we explicitly know if we're running a debug versus
224         # release variant, target only the relevant directory
225         if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
226             srcdir = os.path.sep.join(localdir_s[:-4])
227             dirs.append(sdir(srcdir, "_debug"))
228         if dmatch("build-root/install-vpp-native/vpp/bin"):
229             srcdir = os.path.sep.join(localdir_s[:-4])
230             dirs.append(sdir(srcdir, ""))
231
232         # finally, try the location system packages typically install into
233         dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
234
235         # check the directories for existence; first one wins
236         for dir in dirs:
237             if os.path.isdir(dir):
238                 return dir
239
240         return None
241
242     @classmethod
243     def find_api_files(cls, api_dir=None, patterns="*"):  # -> list
244         """Find API definition files from the given directory tree with the
245         given pattern. If no directory is given then find_api_dir() is used
246         to locate one. If no pattern is given then all definition files found
247         in the directory tree are used.
248
249         :param api_dir: A directory tree in which to locate API definition
250             files; subdirectories are descended into.
251             If this is None then find_api_dir() is called to discover it.
252         :param patterns: A list of patterns to use in each visited directory
253             when looking for files.
254             This can be a list/tuple object or a comma-separated string of
255             patterns. Each value in the list will have leading/trialing
256             whitespace stripped.
257             The pattern specifies the first part of the filename, '.api.json'
258             is appended.
259             The results are de-duplicated, thus overlapping patterns are fine.
260             If this is None it defaults to '*' meaning "all API files".
261         :returns: A list of file paths for the API files found.
262         """
263         if api_dir is None:
264             api_dir = cls.find_api_dir([])
265             if api_dir is None:
266                 raise VPPApiError("api_dir cannot be located")
267
268         if isinstance(patterns, list) or isinstance(patterns, tuple):
269             patterns = [p.strip() + ".api.json" for p in patterns]
270         else:
271             patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
272
273         api_files = []
274         for root, dirnames, files in os.walk(api_dir):
275             # iterate all given patterns and de-dup the result
276             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
277             for filename in files:
278                 api_files.append(os.path.join(root, filename))
279
280         return api_files
281
282     @classmethod
283     def process_json_file(self, apidef_file):
284         api = json.load(apidef_file)
285         return self._process_json(api)
286
287     @classmethod
288     def process_json_str(self, json_str):
289         api = json.loads(json_str)
290         return self._process_json(api)
291
292     @classmethod
293     def process_json_array_str(self, json_str):
294         services = {}
295         messages = {}
296
297         apis = json.loads(json_str)
298         for a in apis:
299             m, s = self._process_json(a)
300             messages.update(m)
301             services.update(s)
302         return messages, services
303
304     @staticmethod
305     def _process_json(api):  # -> Tuple[Dict, Dict]
306         types = {}
307         services = {}
308         messages = {}
309         try:
310             for t in api["enums"]:
311                 t[0] = "vl_api_" + t[0] + "_t"
312                 types[t[0]] = {"type": "enum", "data": t}
313         except KeyError:
314             pass
315         try:
316             for t in api["enumflags"]:
317                 t[0] = "vl_api_" + t[0] + "_t"
318                 types[t[0]] = {"type": "enum", "data": t}
319         except KeyError:
320             pass
321         try:
322             for t in api["unions"]:
323                 t[0] = "vl_api_" + t[0] + "_t"
324                 types[t[0]] = {"type": "union", "data": t}
325         except KeyError:
326             pass
327
328         try:
329             for t in api["types"]:
330                 t[0] = "vl_api_" + t[0] + "_t"
331                 types[t[0]] = {"type": "type", "data": t}
332         except KeyError:
333             pass
334
335         try:
336             for t, v in api["aliases"].items():
337                 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
338         except KeyError:
339             pass
340
341         try:
342             services.update(api["services"])
343         except KeyError:
344             pass
345
346         i = 0
347         while True:
348             unresolved = {}
349             for k, v in types.items():
350                 t = v["data"]
351                 if not vpp_get_type(k):
352                     if v["type"] == "enum":
353                         try:
354                             VPPEnumType(t[0], t[1:])
355                         except ValueError:
356                             unresolved[k] = v
357                 if not vpp_get_type(k):
358                     if v["type"] == "enumflag":
359                         try:
360                             VPPEnumFlagType(t[0], t[1:])
361                         except ValueError:
362                             unresolved[k] = v
363                     elif v["type"] == "union":
364                         try:
365                             VPPUnionType(t[0], t[1:])
366                         except ValueError:
367                             unresolved[k] = v
368                     elif v["type"] == "type":
369                         try:
370                             VPPType(t[0], t[1:])
371                         except ValueError:
372                             unresolved[k] = v
373                     elif v["type"] == "alias":
374                         try:
375                             VPPTypeAlias(k, t)
376                         except ValueError:
377                             unresolved[k] = v
378             if len(unresolved) == 0:
379                 break
380             if i > 3:
381                 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
382             types = unresolved
383             i += 1
384         try:
385             for m in api["messages"]:
386                 try:
387                     messages[m[0]] = VPPMessage(m[0], m[1:])
388                 except VPPNotImplementedError:
389                     logger.error("Not implemented error for {}".format(m[0]))
390         except KeyError:
391             pass
392         return messages, services
393
394     @staticmethod
395     def load_api(apifiles=None, apidir=None):
396         messages = {}
397         services = {}
398         if not apifiles:
399             # Pick up API definitions from default directory
400             try:
401                 if isinstance(apidir, list):
402                     apifiles = []
403                     for d in apidir:
404                         apifiles += VPPApiJSONFiles.find_api_files(d)
405                 else:
406                     apifiles = VPPApiJSONFiles.find_api_files(apidir)
407             except (RuntimeError, VPPApiError):
408                 raise VPPRuntimeError
409
410         for file in apifiles:
411             with open(file) as apidef_file:
412                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
413                 messages.update(m)
414                 services.update(s)
415
416         return apifiles, messages, services
417
418
419 class VPPApiClient:
420     """VPP interface.
421
422     This class provides the APIs to VPP.  The APIs are loaded
423     from provided .api.json files and makes functions accordingly.
424     These functions are documented in the VPP .api files, as they
425     are dynamically created.
426
427     Additionally, VPP can send callback messages; this class
428     provides a means to register a callback function to receive
429     these messages in a background thread.
430     """
431
432     VPPApiError = VPPApiError
433     VPPRuntimeError = VPPRuntimeError
434     VPPValueError = VPPValueError
435     VPPNotImplementedError = VPPNotImplementedError
436     VPPIOError = VPPIOError
437
438     def __init__(
439         self,
440         *,
441         apifiles=None,
442         apidir=None,
443         testmode=False,
444         async_thread=True,
445         logger=None,
446         loglevel=None,
447         read_timeout=5,
448         use_socket=True,
449         server_address="/run/vpp/api.sock",
450         bootstrapapi=False,
451     ):
452         """Create a VPP API object.
453
454         apifiles is a list of files containing API
455         descriptions that will be loaded - methods will be
456         dynamically created reflecting these APIs.  If not
457         provided this will load the API files from VPP's
458         default install location.
459
460         logger, if supplied, is the logging logger object to log to.
461         loglevel, if supplied, is the log level this logger is set
462         to report at (from the loglevels in the logging module).
463         """
464         if logger is None:
465             logger = logging.getLogger(
466                 "{}.{}".format(__name__, self.__class__.__name__)
467             )
468             if loglevel is not None:
469                 logger.setLevel(loglevel)
470         self.logger = logger
471
472         self.messages = {}
473         self.services = {}
474         self.id_names = []
475         self.id_msgdef = []
476         self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
477         self.apifiles = []
478         self.apidir = apidir
479         self.event_callback = None
480         self.message_queue = queue.Queue()
481         self.read_timeout = read_timeout
482         self.async_thread = async_thread
483         self.event_thread = None
484         self.testmode = testmode
485         self.server_address = server_address
486         self._apifiles = apifiles
487         self.stats = {}
488         self.bootstrapapi = bootstrapapi
489
490         if not bootstrapapi:
491             if self.apidir is None and hasattr(self.__class__, "apidir"):
492                 # Keep supporting the old style of providing apidir.
493                 self.apidir = self.__class__.apidir
494             try:
495                 self.apifiles, self.messages, self.services = VPPApiJSONFiles.load_api(
496                     apifiles, self.apidir
497                 )
498             except VPPRuntimeError as e:
499                 if testmode:
500                     self.apifiles = []
501                 else:
502                     raise e
503         else:
504             # Bootstrap the API (memclnt.api bundled with VPP PAPI)
505             resource_path = "/".join(("data", "memclnt.api.json"))
506             file_content = pkg_resources.resource_string(__name__, resource_path)
507             self.messages, self.services = VPPApiJSONFiles.process_json_str(
508                 file_content
509             )
510
511         # Basic sanity check
512         if len(self.messages) == 0 and not testmode:
513             raise VPPValueError(1, "Missing JSON message definitions")
514         if not bootstrapapi:
515             if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
516                 raise VPPRuntimeError(
517                     "Invalid address family hints. " "Cannot continue."
518                 )
519
520         self.transport = VppTransport(
521             self, read_timeout=read_timeout, server_address=server_address
522         )
523         # Make sure we allow VPP to clean up the message rings.
524         atexit.register(vpp_atexit, weakref.ref(self))
525
526         add_convenience_methods()
527
528     def get_function(self, name):
529         return getattr(self._api, name)
530
531     class ContextId:
532         """Multiprocessing-safe provider of unique context IDs."""
533
534         def __init__(self):
535             self.context = mp.Value(ctypes.c_uint, 0)
536             self.lock = mp.Lock()
537
538         def __call__(self):
539             """Get a new unique (or, at least, not recently used) context."""
540             with self.lock:
541                 self.context.value += 1
542                 return self.context.value
543
544     get_context = ContextId()
545
546     def get_type(self, name):
547         return vpp_get_type(name)
548
549     @property
550     def api(self):
551         if not hasattr(self, "_api"):
552             raise VPPApiError("Not connected, api definitions not available")
553         return self._api
554
555     def make_function(self, msg, i, multipart, do_async):
556         if do_async:
557
558             def f(**kwargs):
559                 return self._call_vpp_async(i, msg, **kwargs)
560
561         else:
562
563             def f(**kwargs):
564                 return self._call_vpp(i, msg, multipart, **kwargs)
565
566         f.__name__ = str(msg.name)
567         f.__doc__ = ", ".join(
568             ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
569         )
570         f.msg = msg
571
572         return f
573
574     def make_pack_function(self, msg, i, multipart):
575         def f(**kwargs):
576             return self._call_vpp_pack(i, msg, **kwargs)
577
578         f.msg = msg
579         return f
580
581     def _register_functions(self, do_async=False):
582         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
583         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
584         self._api = VppApiDynamicMethodHolder()
585         for name, msg in self.messages.items():
586             n = name + "_" + msg.crc[2:]
587             i = self.transport.get_msg_index(n)
588             if i > 0:
589                 self.id_msgdef[i] = msg
590                 self.id_names[i] = name
591
592                 # Create function for client side messages.
593                 if name in self.services:
594                     f = self.make_function(msg, i, self.services[name], do_async)
595                     f_pack = self.make_pack_function(msg, i, self.services[name])
596                     setattr(self._api, name, FuncWrapper(f))
597                     setattr(self._api, name + "_pack", FuncWrapper(f_pack))
598             else:
599                 self.logger.debug("No such message type or failed CRC checksum: %s", n)
600
601     def get_api_definitions(self):
602         """get_api_definition. Bootstrap from the embedded memclnt.api.json file."""
603
604         # Bootstrap so we can call the get_api_json function
605         self._register_functions(do_async=False)
606
607         r = self.api.get_api_json()
608         if r.retval != 0:
609             raise VPPApiError("Failed to load API definitions from VPP")
610
611         # Process JSON
612         m, s = VPPApiJSONFiles.process_json_array_str(r.json)
613         self.messages.update(m)
614         self.services.update(s)
615
616     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
617         pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
618
619         rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
620         if rv != 0:
621             raise VPPIOError(2, "Connect failed")
622         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
623
624         # Register functions
625         if self.bootstrapapi:
626             self.get_api_definitions()
627         self._register_functions(do_async=do_async)
628
629         # Initialise control ping
630         crc = self.messages["control_ping"].crc
631         self.control_ping_index = self.transport.get_msg_index(
632             ("control_ping" + "_" + crc[2:])
633         )
634         self.control_ping_msgdef = self.messages["control_ping"]
635
636         if self.async_thread:
637             self.event_thread = threading.Thread(target=self.thread_msg_handler)
638             self.event_thread.daemon = True
639             self.event_thread.start()
640         else:
641             self.event_thread = None
642         return rv
643
644     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
645         """Attach to VPP.
646
647         name - the name of the client.
648         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
649         do_async - if true, messages are sent without waiting for a reply
650         rx_qlen - the length of the VPP message receive queue between
651         client and server.
652         """
653         msg_handler = self.transport.get_callback(do_async)
654         return self.connect_internal(
655             name, msg_handler, chroot_prefix, rx_qlen, do_async
656         )
657
658     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
659         """Attach to VPP in synchronous mode. Application must poll for events.
660
661         name - the name of the client.
662         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
663         rx_qlen - the length of the VPP message receive queue between
664         client and server.
665         """
666
667         return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
668
669     def disconnect(self):
670         """Detach from VPP."""
671         rv = self.transport.disconnect()
672         if self.event_thread is not None:
673             self.message_queue.put("terminate event thread")
674         return rv
675
676     def msg_handler_sync(self, msg):
677         """Process an incoming message from VPP in sync mode.
678
679         The message may be a reply or it may be an async notification.
680         """
681         r = self.decode_incoming_msg(msg)
682         if r is None:
683             return
684
685         # If we have a context, then use the context to find any
686         # request waiting for a reply
687         context = 0
688         if hasattr(r, "context") and r.context > 0:
689             context = r.context
690
691         if context == 0:
692             # No context -> async notification that we feed to the callback
693             self.message_queue.put_nowait(r)
694         else:
695             raise VPPIOError(2, "RPC reply message received in event handler")
696
697     def has_context(self, msg):
698         if len(msg) < 10:
699             return False
700
701         header = VPPType(
702             "header_with_context",
703             [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
704         )
705
706         (i, ci, context), size = header.unpack(msg, 0)
707
708         if self.id_names[i] == "rx_thread_exit":
709             return
710
711         #
712         # Decode message and returns a tuple.
713         #
714         msgobj = self.id_msgdef[i]
715         if "context" in msgobj.field_by_name and context >= 0:
716             return True
717         return False
718
719     def decode_incoming_msg(self, msg, no_type_conversion=False):
720         if not msg:
721             logger.warning("vpp_api.read failed")
722             return
723
724         (i, ci), size = self.header.unpack(msg, 0)
725         if self.id_names[i] == "rx_thread_exit":
726             return
727
728         #
729         # Decode message and returns a tuple.
730         #
731         msgobj = self.id_msgdef[i]
732         if not msgobj:
733             raise VPPIOError(2, "Reply message undefined")
734
735         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
736         return r
737
738     def msg_handler_async(self, msg):
739         """Process a message from VPP in async mode.
740
741         In async mode, all messages are returned to the callback.
742         """
743         r = self.decode_incoming_msg(msg)
744         if r is None:
745             return
746
747         msgname = type(r).__name__
748
749         if self.event_callback:
750             self.event_callback(msgname, r)
751
752     def _control_ping(self, context):
753         """Send a ping command."""
754         self._call_vpp_async(
755             self.control_ping_index, self.control_ping_msgdef, context=context
756         )
757
758     def validate_args(self, msg, kwargs):
759         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
760         if d:
761             raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
762
763     def _add_stat(self, name, ms):
764         if not name in self.stats:
765             self.stats[name] = {"max": ms, "count": 1, "avg": ms}
766         else:
767             if ms > self.stats[name]["max"]:
768                 self.stats[name]["max"] = ms
769             self.stats[name]["count"] += 1
770             n = self.stats[name]["count"]
771             self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
772
773     def get_stats(self):
774         s = "\n=== API PAPI STATISTICS ===\n"
775         s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
776         for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
777             s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
778                 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
779             )
780         return s
781
782     def get_field_options(self, msg, fld_name):
783         # when there is an option, the msgdef has 3 elements.
784         # ['u32', 'ring_size', {'default': 1024}]
785         for _def in self.messages[msg].msgdef:
786             if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
787                 return _def[2]
788
789     def _call_vpp(self, i, msgdef, service, **kwargs):
790         """Given a message, send the message and await a reply.
791
792         msgdef - the message packing definition
793         i - the message type index
794         multipart - True if the message returns multiple
795         messages in return.
796         context - context number - chosen at random if not
797         supplied.
798         The remainder of the kwargs are the arguments to the API call.
799
800         The return value is the message or message array containing
801         the response.  It will raise an IOError exception if there was
802         no response within the timeout window.
803         """
804         ts = time.time()
805         if "context" not in kwargs:
806             context = self.get_context()
807             kwargs["context"] = context
808         else:
809             context = kwargs["context"]
810         kwargs["_vl_msg_id"] = i
811
812         no_type_conversion = kwargs.pop("_no_type_conversion", False)
813         timeout = kwargs.pop("_timeout", None)
814
815         try:
816             if self.transport.socket_index:
817                 kwargs["client_index"] = self.transport.socket_index
818         except AttributeError:
819             pass
820         self.validate_args(msgdef, kwargs)
821
822         s = "Calling {}({})".format(
823             msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
824         )
825         self.logger.debug(s)
826
827         b = msgdef.pack(kwargs)
828         self.transport.suspend()
829
830         self.transport.write(b)
831
832         msgreply = service["reply"]
833         stream = True if "stream" in service else False
834         if stream:
835             if "stream_msg" in service:
836                 # New service['reply'] = _reply and service['stream_message'] = _details
837                 stream_message = service["stream_msg"]
838                 modern = True
839             else:
840                 # Old  service['reply'] = _details
841                 stream_message = msgreply
842                 msgreply = "control_ping_reply"
843                 modern = False
844                 # Send a ping after the request - we use its response
845                 # to detect that we have seen all results.
846                 self._control_ping(context)
847
848         # Block until we get a reply.
849         rl = []
850         while True:
851             r = self.read_blocking(no_type_conversion, timeout)
852             if r is None:
853                 raise VPPIOError(2, "VPP API client: read failed")
854             msgname = type(r).__name__
855             if context not in r or r.context == 0 or context != r.context:
856                 # Message being queued
857                 self.message_queue.put_nowait(r)
858                 continue
859             if msgname != msgreply and (stream and (msgname != stream_message)):
860                 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
861             if not stream:
862                 rl = r
863                 break
864             if msgname == msgreply:
865                 if modern:  # Return both reply and list
866                     rl = r, rl
867                 break
868
869             rl.append(r)
870
871         self.transport.resume()
872
873         s = "Return value: {!r}".format(r)
874         if len(s) > 80:
875             s = s[:80] + "..."
876         self.logger.debug(s)
877         te = time.time()
878         self._add_stat(msgdef.name, (te - ts) * 1000)
879         return rl
880
881     def _call_vpp_async(self, i, msg, **kwargs):
882         """Given a message, send the message and return the context.
883
884         msgdef - the message packing definition
885         i - the message type index
886         context - context number - chosen at random if not
887         supplied.
888         The remainder of the kwargs are the arguments to the API call.
889
890         The reply message(s) will be delivered later to the registered callback.
891         The returned context will help with assigning which call
892         the reply belongs to.
893         """
894         if "context" not in kwargs:
895             context = self.get_context()
896             kwargs["context"] = context
897         else:
898             context = kwargs["context"]
899         try:
900             if self.transport.socket_index:
901                 kwargs["client_index"] = self.transport.socket_index
902         except AttributeError:
903             kwargs["client_index"] = 0
904         kwargs["_vl_msg_id"] = i
905         b = msg.pack(kwargs)
906
907         self.transport.write(b)
908         return context
909
910     def _call_vpp_pack(self, i, msg, **kwargs):
911         """Given a message, return the binary representation."""
912         kwargs["_vl_msg_id"] = i
913         kwargs["client_index"] = 0
914         kwargs["context"] = 0
915         return msg.pack(kwargs)
916
917     def read_blocking(self, no_type_conversion=False, timeout=None):
918         """Get next received message from transport within timeout, decoded.
919
920         Note that notifications have context zero
921         and are not put into receive queue (at least for socket transport),
922         use async_thread with registered callback for processing them.
923
924         If no message appears in the queue within timeout, return None.
925
926         Optionally, type conversion can be skipped,
927         as some of conversions are into less precise types.
928
929         When r is the return value of this, the caller can get message name as:
930             msgname = type(r).__name__
931         and context number (type long) as:
932             context = r.context
933
934         :param no_type_conversion: If false, type conversions are applied.
935         :type no_type_conversion: bool
936         :returns: Decoded message, or None if no message (within timeout).
937         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
938         :raises VppTransportShmemIOError if timed out.
939         """
940         msg = self.transport.read(timeout=timeout)
941         if not msg:
942             return None
943         return self.decode_incoming_msg(msg, no_type_conversion)
944
945     def register_event_callback(self, callback):
946         """Register a callback for async messages.
947
948         This will be called for async notifications in sync mode,
949         and all messages in async mode.  In sync mode, replies to
950         requests will not come here.
951
952         callback is a fn(msg_type_name, msg_type) that will be
953         called when a message comes in.  While this function is
954         executing, note that (a) you are in a background thread and
955         may wish to use threading.Lock to protect your datastructures,
956         and (b) message processing from VPP will stop (so if you take
957         a long while about it you may provoke reply timeouts or cause
958         VPP to fill the RX buffer).  Passing None will disable the
959         callback.
960         """
961         self.event_callback = callback
962
963     def thread_msg_handler(self):
964         """Python thread calling the user registered message handler.
965
966         This is to emulate the old style event callback scheme. Modern
967         clients should provide their own thread to poll the event
968         queue.
969         """
970         while True:
971             r = self.message_queue.get()
972             if r == "terminate event thread":
973                 break
974             msgname = type(r).__name__
975             if self.event_callback:
976                 self.event_callback(msgname, r)
977
978     def validate_message_table(self, namecrctable):
979         """Take a dictionary of name_crc message names
980         and returns an array of missing messages"""
981
982         missing_table = []
983         for name_crc in namecrctable:
984             i = self.transport.get_msg_index(name_crc)
985             if i <= 0:
986                 missing_table.append(name_crc)
987         return missing_table
988
989     def dump_message_table(self):
990         """Return VPPs API message table as name_crc dictionary"""
991         return self.transport.message_table
992
993     def dump_message_table_filtered(self, msglist):
994         """Return VPPs API message table as name_crc dictionary,
995         filtered by message name list."""
996
997         replies = [self.services[n]["reply"] for n in msglist]
998         message_table_filtered = {}
999         for name in msglist + replies:
1000             for k, v in self.transport.message_table.items():
1001                 if k.startswith(name):
1002                     message_table_filtered[k] = v
1003                     break
1004         return message_table_filtered
1005
1006     def __repr__(self):
1007         return (
1008             "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
1009             "logger=%s, read_timeout=%s, "
1010             "server_address='%s'>"
1011             % (
1012                 self._apifiles,
1013                 self.testmode,
1014                 self.async_thread,
1015                 self.logger,
1016                 self.read_timeout,
1017                 self.server_address,
1018             )
1019         )
1020
1021     def details_iter(self, f, **kwargs):
1022         cursor = 0
1023         while True:
1024             kwargs["cursor"] = cursor
1025             rv, details = f(**kwargs)
1026             for d in details:
1027                 yield d
1028             if rv.retval == 0 or rv.retval != -165:
1029                 break
1030             cursor = rv.cursor