5c089647e597a9f8bd90e923bfef6ac9026ce5ec
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import queue
25 import logging
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 import time
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40
41     class V:
42         """placeholder for VppTransport as the implementation is dependent on
43         VPPAPIClient's initialization values
44         """
45
46     VppTransport = V
47
48 from .vpp_transport_socket import VppTransport
49
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
52
53 __all__ = (
54     "FuncWrapper",
55     "VppApiDynamicMethodHolder",
56     "VppEnum",
57     "VppEnumType",
58     "VppEnumFlag",
59     "VPPIOError",
60     "VPPRuntimeError",
61     "VPPValueError",
62     "VPPApiClient",
63 )
64
65
66 def metaclass(metaclass):
67     @functools.wraps(metaclass)
68     def wrapper(cls):
69         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
70
71     return wrapper
72
73
74 class VppEnumType(type):
75     def __getattr__(cls, name):
76         t = vpp_get_type(name)
77         return t.enum
78
79
80 @metaclass(VppEnumType)
81 class VppEnum:
82     pass
83
84
85 @metaclass(VppEnumType)
86 class VppEnumFlag:
87     pass
88
89
90 def vpp_atexit(vpp_weakref):
91     """Clean up VPP connection on shutdown."""
92     vpp_instance = vpp_weakref()
93     if vpp_instance and vpp_instance.transport.connected:
94         logger.debug("Cleaning up VPP on exit")
95         vpp_instance.disconnect()
96
97
98 def add_convenience_methods():
99     # provide convenience methods to IP[46]Address.vapi_af
100     def _vapi_af(self):
101         if 6 == self._version:
102             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103         if 4 == self._version:
104             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105         raise ValueError("Invalid _version.")
106
107     def _vapi_af_name(self):
108         if 6 == self._version:
109             return "ip6"
110         if 4 == self._version:
111             return "ip4"
112         raise ValueError("Invalid _version.")
113
114     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
116
117
118 class VppApiDynamicMethodHolder:
119     pass
120
121
122 class FuncWrapper:
123     def __init__(self, func):
124         self._func = func
125         self.__name__ = func.__name__
126         self.__doc__ = func.__doc__
127
128     def __call__(self, **kwargs):
129         return self._func(**kwargs)
130
131     def __repr__(self):
132         return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
133
134
135 class VPPApiError(Exception):
136     pass
137
138
139 class VPPNotImplementedError(NotImplementedError):
140     pass
141
142
143 class VPPIOError(IOError):
144     pass
145
146
147 class VPPRuntimeError(RuntimeError):
148     pass
149
150
151 class VPPValueError(ValueError):
152     pass
153
154
155 class VPPApiJSONFiles:
156     @classmethod
157     def find_api_dir(cls, dirs=[]):
158         """Attempt to find the best directory in which API definition
159         files may reside. If the value VPP_API_DIR exists in the environment
160         then it is first on the search list. If we're inside a recognized
161         location in a VPP source tree (src/scripts and src/vpp-api/python)
162         then entries from there to the likely locations in build-root are
163         added. Finally the location used by system packages is added.
164
165         :returns: A single directory name, or None if no such directory
166             could be found.
167         """
168
169         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170         # in which case, plot a course to likely places in the src tree
171         import __main__ as main
172
173         if os.getenv("VPP_API_DIR"):
174             dirs.append(os.getenv("VPP_API_DIR"))
175
176         if hasattr(main, "__file__"):
177             # get the path of the calling script
178             localdir = os.path.dirname(os.path.realpath(main.__file__))
179         else:
180             # use cwd if there is no calling script
181             localdir = os.getcwd()
182         localdir_s = localdir.split(os.path.sep)
183
184         def dmatch(dir):
185             """Match dir against right-hand components of the script dir"""
186             d = dir.split("/")  # param 'dir' assumes a / separator
187             length = len(d)
188             return len(localdir_s) > length and localdir_s[-length:] == d
189
190         def sdir(srcdir, variant):
191             """Build a path from srcdir to the staged API files of
192             'variant'  (typically '' or '_debug')"""
193             # Since 'core' and 'plugin' files are staged
194             # in separate directories, we target the parent dir.
195             return os.path.sep.join(
196                 (
197                     srcdir,
198                     "build-root",
199                     "install-vpp%s-native" % variant,
200                     "vpp",
201                     "share",
202                     "vpp",
203                     "api",
204                 )
205             )
206
207         srcdir = None
208         if dmatch("src/scripts"):
209             srcdir = os.path.sep.join(localdir_s[:-2])
210         elif dmatch("src/vpp-api/python"):
211             srcdir = os.path.sep.join(localdir_s[:-3])
212         elif dmatch("test"):
213             # we're apparently running tests
214             srcdir = os.path.sep.join(localdir_s[:-1])
215
216         if srcdir:
217             # we're in the source tree, try both the debug and release
218             # variants.
219             dirs.append(sdir(srcdir, "_debug"))
220             dirs.append(sdir(srcdir, ""))
221
222         # Test for staged copies of the scripts
223         # For these, since we explicitly know if we're running a debug versus
224         # release variant, target only the relevant directory
225         if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
226             srcdir = os.path.sep.join(localdir_s[:-4])
227             dirs.append(sdir(srcdir, "_debug"))
228         if dmatch("build-root/install-vpp-native/vpp/bin"):
229             srcdir = os.path.sep.join(localdir_s[:-4])
230             dirs.append(sdir(srcdir, ""))
231
232         # finally, try the location system packages typically install into
233         dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
234
235         # check the directories for existence; first one wins
236         for dir in dirs:
237             if os.path.isdir(dir):
238                 return dir
239
240         return None
241
242     @classmethod
243     def find_api_files(cls, api_dir=None, patterns="*"):  # -> list
244         """Find API definition files from the given directory tree with the
245         given pattern. If no directory is given then find_api_dir() is used
246         to locate one. If no pattern is given then all definition files found
247         in the directory tree are used.
248
249         :param api_dir: A directory tree in which to locate API definition
250             files; subdirectories are descended into.
251             If this is None then find_api_dir() is called to discover it.
252         :param patterns: A list of patterns to use in each visited directory
253             when looking for files.
254             This can be a list/tuple object or a comma-separated string of
255             patterns. Each value in the list will have leading/trialing
256             whitespace stripped.
257             The pattern specifies the first part of the filename, '.api.json'
258             is appended.
259             The results are de-duplicated, thus overlapping patterns are fine.
260             If this is None it defaults to '*' meaning "all API files".
261         :returns: A list of file paths for the API files found.
262         """
263         if api_dir is None:
264             api_dir = cls.find_api_dir([])
265             if api_dir is None:
266                 raise VPPApiError("api_dir cannot be located")
267
268         if isinstance(patterns, list) or isinstance(patterns, tuple):
269             patterns = [p.strip() + ".api.json" for p in patterns]
270         else:
271             patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
272
273         api_files = []
274         for root, dirnames, files in os.walk(api_dir):
275             # iterate all given patterns and de-dup the result
276             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
277             for filename in files:
278                 api_files.append(os.path.join(root, filename))
279
280         return api_files
281
282     @classmethod
283     def process_json_file(self, apidef_file):
284         return self._process_json(apidef_file.read())
285
286     @classmethod
287     def process_json_str(self, json_str):
288         return self._process_json(json_str)
289
290     @staticmethod
291     def _process_json(json_str):  # -> Tuple[Dict, Dict]
292         api = json.loads(json_str)
293         types = {}
294         services = {}
295         messages = {}
296         try:
297             for t in api["enums"]:
298                 t[0] = "vl_api_" + t[0] + "_t"
299                 types[t[0]] = {"type": "enum", "data": t}
300         except KeyError:
301             pass
302         try:
303             for t in api["enumflags"]:
304                 t[0] = "vl_api_" + t[0] + "_t"
305                 types[t[0]] = {"type": "enum", "data": t}
306         except KeyError:
307             pass
308         try:
309             for t in api["unions"]:
310                 t[0] = "vl_api_" + t[0] + "_t"
311                 types[t[0]] = {"type": "union", "data": t}
312         except KeyError:
313             pass
314
315         try:
316             for t in api["types"]:
317                 t[0] = "vl_api_" + t[0] + "_t"
318                 types[t[0]] = {"type": "type", "data": t}
319         except KeyError:
320             pass
321
322         try:
323             for t, v in api["aliases"].items():
324                 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
325         except KeyError:
326             pass
327
328         try:
329             services.update(api["services"])
330         except KeyError:
331             pass
332
333         i = 0
334         while True:
335             unresolved = {}
336             for k, v in types.items():
337                 t = v["data"]
338                 if not vpp_get_type(k):
339                     if v["type"] == "enum":
340                         try:
341                             VPPEnumType(t[0], t[1:])
342                         except ValueError:
343                             unresolved[k] = v
344                 if not vpp_get_type(k):
345                     if v["type"] == "enumflag":
346                         try:
347                             VPPEnumFlagType(t[0], t[1:])
348                         except ValueError:
349                             unresolved[k] = v
350                     elif v["type"] == "union":
351                         try:
352                             VPPUnionType(t[0], t[1:])
353                         except ValueError:
354                             unresolved[k] = v
355                     elif v["type"] == "type":
356                         try:
357                             VPPType(t[0], t[1:])
358                         except ValueError:
359                             unresolved[k] = v
360                     elif v["type"] == "alias":
361                         try:
362                             VPPTypeAlias(k, t)
363                         except ValueError:
364                             unresolved[k] = v
365             if len(unresolved) == 0:
366                 break
367             if i > 3:
368                 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
369             types = unresolved
370             i += 1
371         try:
372             for m in api["messages"]:
373                 try:
374                     messages[m[0]] = VPPMessage(m[0], m[1:])
375                 except VPPNotImplementedError:
376                     ### OLE FIXME
377                     logger.error("Not implemented error for {}".format(m[0]))
378         except KeyError:
379             pass
380         return messages, services
381
382     @staticmethod
383     def load_api(apifiles=None, apidir=None):
384         messages = {}
385         services = {}
386         if not apifiles:
387             # Pick up API definitions from default directory
388             try:
389                 if isinstance(apidir, list):
390                     apifiles = []
391                     for d in apidir:
392                         apifiles += VPPApiJSONFiles.find_api_files(d)
393                 else:
394                     apifiles = VPPApiJSONFiles.find_api_files(apidir)
395             except (RuntimeError, VPPApiError):
396                 raise VPPRuntimeError
397
398         for file in apifiles:
399             with open(file) as apidef_file:
400                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
401                 messages.update(m)
402                 services.update(s)
403
404         return apifiles, messages, services
405
406
407 class VPPApiClient:
408     """VPP interface.
409
410     This class provides the APIs to VPP.  The APIs are loaded
411     from provided .api.json files and makes functions accordingly.
412     These functions are documented in the VPP .api files, as they
413     are dynamically created.
414
415     Additionally, VPP can send callback messages; this class
416     provides a means to register a callback function to receive
417     these messages in a background thread.
418     """
419
420     VPPApiError = VPPApiError
421     VPPRuntimeError = VPPRuntimeError
422     VPPValueError = VPPValueError
423     VPPNotImplementedError = VPPNotImplementedError
424     VPPIOError = VPPIOError
425
426     def __init__(
427         self,
428         *,
429         apifiles=None,
430         apidir=None,
431         testmode=False,
432         async_thread=True,
433         logger=None,
434         loglevel=None,
435         read_timeout=5,
436         use_socket=True,
437         server_address="/run/vpp/api.sock",
438     ):
439         """Create a VPP API object.
440
441         apifiles is a list of files containing API
442         descriptions that will be loaded - methods will be
443         dynamically created reflecting these APIs.  If not
444         provided this will load the API files from VPP's
445         default install location.
446
447         logger, if supplied, is the logging logger object to log to.
448         loglevel, if supplied, is the log level this logger is set
449         to report at (from the loglevels in the logging module).
450         """
451         if logger is None:
452             logger = logging.getLogger(
453                 "{}.{}".format(__name__, self.__class__.__name__)
454             )
455             if loglevel is not None:
456                 logger.setLevel(loglevel)
457         self.logger = logger
458
459         self.messages = {}
460         self.services = {}
461         self.id_names = []
462         self.id_msgdef = []
463         self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
464         self.apifiles = []
465         self.apidir = apidir
466         self.event_callback = None
467         self.message_queue = queue.Queue()
468         self.read_timeout = read_timeout
469         self.async_thread = async_thread
470         self.event_thread = None
471         self.testmode = testmode
472         self.server_address = server_address
473         self._apifiles = apifiles
474         self.stats = {}
475
476         try:
477             self.apifiles, self.messages, self.services = VPPApiJSONFiles.load_api(
478                 apifiles, apidir
479             )
480         except VPPRuntimeError as e:
481             if testmode:
482                 self.apifiles = []
483             else:
484                 raise e
485
486         # Basic sanity check
487         if len(self.messages) == 0 and not testmode:
488             raise VPPValueError(1, "Missing JSON message definitions")
489         if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
490             raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
491
492         self.transport = VppTransport(
493             self, read_timeout=read_timeout, server_address=server_address
494         )
495         # Make sure we allow VPP to clean up the message rings.
496         atexit.register(vpp_atexit, weakref.ref(self))
497
498         add_convenience_methods()
499
500     def get_function(self, name):
501         return getattr(self._api, name)
502
503     class ContextId:
504         """Multiprocessing-safe provider of unique context IDs."""
505
506         def __init__(self):
507             self.context = mp.Value(ctypes.c_uint, 0)
508             self.lock = mp.Lock()
509
510         def __call__(self):
511             """Get a new unique (or, at least, not recently used) context."""
512             with self.lock:
513                 self.context.value += 1
514                 return self.context.value
515
516     get_context = ContextId()
517
518     def get_type(self, name):
519         return vpp_get_type(name)
520
521     @property
522     def api(self):
523         if not hasattr(self, "_api"):
524             raise VPPApiError("Not connected, api definitions not available")
525         return self._api
526
527     def make_function(self, msg, i, multipart, do_async):
528         if do_async:
529
530             def f(**kwargs):
531                 return self._call_vpp_async(i, msg, **kwargs)
532
533         else:
534
535             def f(**kwargs):
536                 return self._call_vpp(i, msg, multipart, **kwargs)
537
538         f.__name__ = str(msg.name)
539         f.__doc__ = ", ".join(
540             ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
541         )
542         f.msg = msg
543
544         return f
545
546     def make_pack_function(self, msg, i, multipart):
547         def f(**kwargs):
548             return self._call_vpp_pack(i, msg, **kwargs)
549
550         f.msg = msg
551         return f
552
553     def _register_functions(self, do_async=False):
554         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
555         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
556         self._api = VppApiDynamicMethodHolder()
557         for name, msg in self.messages.items():
558             n = name + "_" + msg.crc[2:]
559             i = self.transport.get_msg_index(n)
560             if i > 0:
561                 self.id_msgdef[i] = msg
562                 self.id_names[i] = name
563
564                 # Create function for client side messages.
565                 if name in self.services:
566                     f = self.make_function(msg, i, self.services[name], do_async)
567                     f_pack = self.make_pack_function(msg, i, self.services[name])
568                     setattr(self._api, name, FuncWrapper(f))
569                     setattr(self._api, name + "_pack", FuncWrapper(f_pack))
570             else:
571                 self.logger.debug("No such message type or failed CRC checksum: %s", n)
572
573     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
574         pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
575
576         rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
577         if rv != 0:
578             raise VPPIOError(2, "Connect failed")
579         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
580         self._register_functions(do_async=do_async)
581
582         # Initialise control ping
583         crc = self.messages["control_ping"].crc
584         self.control_ping_index = self.transport.get_msg_index(
585             ("control_ping" + "_" + crc[2:])
586         )
587         self.control_ping_msgdef = self.messages["control_ping"]
588         if self.async_thread:
589             self.event_thread = threading.Thread(target=self.thread_msg_handler)
590             self.event_thread.daemon = True
591             self.event_thread.start()
592         else:
593             self.event_thread = None
594         return rv
595
596     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
597         """Attach to VPP.
598
599         name - the name of the client.
600         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
601         do_async - if true, messages are sent without waiting for a reply
602         rx_qlen - the length of the VPP message receive queue between
603         client and server.
604         """
605         msg_handler = self.transport.get_callback(do_async)
606         return self.connect_internal(
607             name, msg_handler, chroot_prefix, rx_qlen, do_async
608         )
609
610     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
611         """Attach to VPP in synchronous mode. Application must poll for events.
612
613         name - the name of the client.
614         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
615         rx_qlen - the length of the VPP message receive queue between
616         client and server.
617         """
618
619         return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
620
621     def disconnect(self):
622         """Detach from VPP."""
623         rv = self.transport.disconnect()
624         if self.event_thread is not None:
625             self.message_queue.put("terminate event thread")
626         return rv
627
628     def msg_handler_sync(self, msg):
629         """Process an incoming message from VPP in sync mode.
630
631         The message may be a reply or it may be an async notification.
632         """
633         r = self.decode_incoming_msg(msg)
634         if r is None:
635             return
636
637         # If we have a context, then use the context to find any
638         # request waiting for a reply
639         context = 0
640         if hasattr(r, "context") and r.context > 0:
641             context = r.context
642
643         if context == 0:
644             # No context -> async notification that we feed to the callback
645             self.message_queue.put_nowait(r)
646         else:
647             raise VPPIOError(2, "RPC reply message received in event handler")
648
649     def has_context(self, msg):
650         if len(msg) < 10:
651             return False
652
653         header = VPPType(
654             "header_with_context",
655             [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
656         )
657
658         (i, ci, context), size = header.unpack(msg, 0)
659         if self.id_names[i] == "rx_thread_exit":
660             return
661
662         #
663         # Decode message and returns a tuple.
664         #
665         msgobj = self.id_msgdef[i]
666         if "context" in msgobj.field_by_name and context >= 0:
667             return True
668         return False
669
670     def decode_incoming_msg(self, msg, no_type_conversion=False):
671         if not msg:
672             logger.warning("vpp_api.read failed")
673             return
674
675         (i, ci), size = self.header.unpack(msg, 0)
676         if self.id_names[i] == "rx_thread_exit":
677             return
678
679         #
680         # Decode message and returns a tuple.
681         #
682         msgobj = self.id_msgdef[i]
683         if not msgobj:
684             raise VPPIOError(2, "Reply message undefined")
685
686         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
687         return r
688
689     def msg_handler_async(self, msg):
690         """Process a message from VPP in async mode.
691
692         In async mode, all messages are returned to the callback.
693         """
694         r = self.decode_incoming_msg(msg)
695         if r is None:
696             return
697
698         msgname = type(r).__name__
699
700         if self.event_callback:
701             self.event_callback(msgname, r)
702
703     def _control_ping(self, context):
704         """Send a ping command."""
705         self._call_vpp_async(
706             self.control_ping_index, self.control_ping_msgdef, context=context
707         )
708
709     def validate_args(self, msg, kwargs):
710         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
711         if d:
712             raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
713
714     def _add_stat(self, name, ms):
715         if not name in self.stats:
716             self.stats[name] = {"max": ms, "count": 1, "avg": ms}
717         else:
718             if ms > self.stats[name]["max"]:
719                 self.stats[name]["max"] = ms
720             self.stats[name]["count"] += 1
721             n = self.stats[name]["count"]
722             self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
723
724     def get_stats(self):
725         s = "\n=== API PAPI STATISTICS ===\n"
726         s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
727         for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
728             s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
729                 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
730             )
731         return s
732
733     def get_field_options(self, msg, fld_name):
734         # when there is an option, the msgdef has 3 elements.
735         # ['u32', 'ring_size', {'default': 1024}]
736         for _def in self.messages[msg].msgdef:
737             if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
738                 return _def[2]
739
740     def _call_vpp(self, i, msgdef, service, **kwargs):
741         """Given a message, send the message and await a reply.
742
743         msgdef - the message packing definition
744         i - the message type index
745         multipart - True if the message returns multiple
746         messages in return.
747         context - context number - chosen at random if not
748         supplied.
749         The remainder of the kwargs are the arguments to the API call.
750
751         The return value is the message or message array containing
752         the response.  It will raise an IOError exception if there was
753         no response within the timeout window.
754         """
755         ts = time.time()
756         if "context" not in kwargs:
757             context = self.get_context()
758             kwargs["context"] = context
759         else:
760             context = kwargs["context"]
761         kwargs["_vl_msg_id"] = i
762
763         no_type_conversion = kwargs.pop("_no_type_conversion", False)
764         timeout = kwargs.pop("_timeout", None)
765
766         try:
767             if self.transport.socket_index:
768                 kwargs["client_index"] = self.transport.socket_index
769         except AttributeError:
770             pass
771         self.validate_args(msgdef, kwargs)
772
773         s = "Calling {}({})".format(
774             msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
775         )
776         self.logger.debug(s)
777
778         b = msgdef.pack(kwargs)
779         self.transport.suspend()
780
781         self.transport.write(b)
782
783         msgreply = service["reply"]
784         stream = True if "stream" in service else False
785         if stream:
786             if "stream_msg" in service:
787                 # New service['reply'] = _reply and service['stream_message'] = _details
788                 stream_message = service["stream_msg"]
789                 modern = True
790             else:
791                 # Old  service['reply'] = _details
792                 stream_message = msgreply
793                 msgreply = "control_ping_reply"
794                 modern = False
795                 # Send a ping after the request - we use its response
796                 # to detect that we have seen all results.
797                 self._control_ping(context)
798
799         # Block until we get a reply.
800         rl = []
801         while True:
802             r = self.read_blocking(no_type_conversion, timeout)
803             if r is None:
804                 raise VPPIOError(2, "VPP API client: read failed")
805             msgname = type(r).__name__
806             if context not in r or r.context == 0 or context != r.context:
807                 # Message being queued
808                 self.message_queue.put_nowait(r)
809                 continue
810             if msgname != msgreply and (stream and (msgname != stream_message)):
811                 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
812             if not stream:
813                 rl = r
814                 break
815             if msgname == msgreply:
816                 if modern:  # Return both reply and list
817                     rl = r, rl
818                 break
819
820             rl.append(r)
821
822         self.transport.resume()
823
824         s = "Return value: {!r}".format(r)
825         if len(s) > 80:
826             s = s[:80] + "..."
827         self.logger.debug(s)
828         te = time.time()
829         self._add_stat(msgdef.name, (te - ts) * 1000)
830         return rl
831
832     def _call_vpp_async(self, i, msg, **kwargs):
833         """Given a message, send the message and return the context.
834
835         msgdef - the message packing definition
836         i - the message type index
837         context - context number - chosen at random if not
838         supplied.
839         The remainder of the kwargs are the arguments to the API call.
840
841         The reply message(s) will be delivered later to the registered callback.
842         The returned context will help with assigning which call
843         the reply belongs to.
844         """
845         if "context" not in kwargs:
846             context = self.get_context()
847             kwargs["context"] = context
848         else:
849             context = kwargs["context"]
850         try:
851             if self.transport.socket_index:
852                 kwargs["client_index"] = self.transport.socket_index
853         except AttributeError:
854             kwargs["client_index"] = 0
855         kwargs["_vl_msg_id"] = i
856         b = msg.pack(kwargs)
857
858         self.transport.write(b)
859         return context
860
861     def _call_vpp_pack(self, i, msg, **kwargs):
862         """Given a message, return the binary representation."""
863         kwargs["_vl_msg_id"] = i
864         kwargs["client_index"] = 0
865         kwargs["context"] = 0
866         return msg.pack(kwargs)
867
868     def read_blocking(self, no_type_conversion=False, timeout=None):
869         """Get next received message from transport within timeout, decoded.
870
871         Note that notifications have context zero
872         and are not put into receive queue (at least for socket transport),
873         use async_thread with registered callback for processing them.
874
875         If no message appears in the queue within timeout, return None.
876
877         Optionally, type conversion can be skipped,
878         as some of conversions are into less precise types.
879
880         When r is the return value of this, the caller can get message name as:
881             msgname = type(r).__name__
882         and context number (type long) as:
883             context = r.context
884
885         :param no_type_conversion: If false, type conversions are applied.
886         :type no_type_conversion: bool
887         :returns: Decoded message, or None if no message (within timeout).
888         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
889         :raises VppTransportShmemIOError if timed out.
890         """
891         msg = self.transport.read(timeout=timeout)
892         if not msg:
893             return None
894         return self.decode_incoming_msg(msg, no_type_conversion)
895
896     def register_event_callback(self, callback):
897         """Register a callback for async messages.
898
899         This will be called for async notifications in sync mode,
900         and all messages in async mode.  In sync mode, replies to
901         requests will not come here.
902
903         callback is a fn(msg_type_name, msg_type) that will be
904         called when a message comes in.  While this function is
905         executing, note that (a) you are in a background thread and
906         may wish to use threading.Lock to protect your datastructures,
907         and (b) message processing from VPP will stop (so if you take
908         a long while about it you may provoke reply timeouts or cause
909         VPP to fill the RX buffer).  Passing None will disable the
910         callback.
911         """
912         self.event_callback = callback
913
914     def thread_msg_handler(self):
915         """Python thread calling the user registered message handler.
916
917         This is to emulate the old style event callback scheme. Modern
918         clients should provide their own thread to poll the event
919         queue.
920         """
921         while True:
922             r = self.message_queue.get()
923             if r == "terminate event thread":
924                 break
925             msgname = type(r).__name__
926             if self.event_callback:
927                 self.event_callback(msgname, r)
928
929     def validate_message_table(self, namecrctable):
930         """Take a dictionary of name_crc message names
931         and returns an array of missing messages"""
932
933         missing_table = []
934         for name_crc in namecrctable:
935             i = self.transport.get_msg_index(name_crc)
936             if i <= 0:
937                 missing_table.append(name_crc)
938         return missing_table
939
940     def dump_message_table(self):
941         """Return VPPs API message table as name_crc dictionary"""
942         return self.transport.message_table
943
944     def dump_message_table_filtered(self, msglist):
945         """Return VPPs API message table as name_crc dictionary,
946         filtered by message name list."""
947
948         replies = [self.services[n]["reply"] for n in msglist]
949         message_table_filtered = {}
950         for name in msglist + replies:
951             for k, v in self.transport.message_table.items():
952                 if k.startswith(name):
953                     message_table_filtered[k] = v
954                     break
955         return message_table_filtered
956
957     def __repr__(self):
958         return (
959             "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
960             "logger=%s, read_timeout=%s, "
961             "server_address='%s'>"
962             % (
963                 self._apifiles,
964                 self.testmode,
965                 self.async_thread,
966                 self.logger,
967                 self.read_timeout,
968                 self.server_address,
969             )
970         )
971
972     def details_iter(self, f, **kwargs):
973         cursor = 0
974         while True:
975             kwargs["cursor"] = cursor
976             rv, details = f(**kwargs)
977             for d in details:
978                 yield d
979             if rv.retval == 0 or rv.retval != -165:
980                 break
981             cursor = rv.cursor