papi: support old style of providing apidir
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import queue
25 import logging
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 import time
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40
41     class V:
42         """placeholder for VppTransport as the implementation is dependent on
43         VPPAPIClient's initialization values
44         """
45
46     VppTransport = V
47
48 from .vpp_transport_socket import VppTransport
49
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
52
53 __all__ = (
54     "FuncWrapper",
55     "VppApiDynamicMethodHolder",
56     "VppEnum",
57     "VppEnumType",
58     "VppEnumFlag",
59     "VPPIOError",
60     "VPPRuntimeError",
61     "VPPValueError",
62     "VPPApiClient",
63 )
64
65
66 def metaclass(metaclass):
67     @functools.wraps(metaclass)
68     def wrapper(cls):
69         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
70
71     return wrapper
72
73
74 class VppEnumType(type):
75     def __getattr__(cls, name):
76         t = vpp_get_type(name)
77         return t.enum
78
79
80 @metaclass(VppEnumType)
81 class VppEnum:
82     pass
83
84
85 @metaclass(VppEnumType)
86 class VppEnumFlag:
87     pass
88
89
90 def vpp_atexit(vpp_weakref):
91     """Clean up VPP connection on shutdown."""
92     vpp_instance = vpp_weakref()
93     if vpp_instance and vpp_instance.transport.connected:
94         logger.debug("Cleaning up VPP on exit")
95         vpp_instance.disconnect()
96
97
98 def add_convenience_methods():
99     # provide convenience methods to IP[46]Address.vapi_af
100     def _vapi_af(self):
101         if 6 == self._version:
102             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103         if 4 == self._version:
104             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105         raise ValueError("Invalid _version.")
106
107     def _vapi_af_name(self):
108         if 6 == self._version:
109             return "ip6"
110         if 4 == self._version:
111             return "ip4"
112         raise ValueError("Invalid _version.")
113
114     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
116
117
118 class VppApiDynamicMethodHolder:
119     pass
120
121
122 class FuncWrapper:
123     def __init__(self, func):
124         self._func = func
125         self.__name__ = func.__name__
126         self.__doc__ = func.__doc__
127
128     def __call__(self, **kwargs):
129         return self._func(**kwargs)
130
131     def __repr__(self):
132         return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
133
134
135 class VPPApiError(Exception):
136     pass
137
138
139 class VPPNotImplementedError(NotImplementedError):
140     pass
141
142
143 class VPPIOError(IOError):
144     pass
145
146
147 class VPPRuntimeError(RuntimeError):
148     pass
149
150
151 class VPPValueError(ValueError):
152     pass
153
154
155 class VPPApiJSONFiles:
156     @classmethod
157     def find_api_dir(cls, dirs=[]):
158         """Attempt to find the best directory in which API definition
159         files may reside. If the value VPP_API_DIR exists in the environment
160         then it is first on the search list. If we're inside a recognized
161         location in a VPP source tree (src/scripts and src/vpp-api/python)
162         then entries from there to the likely locations in build-root are
163         added. Finally the location used by system packages is added.
164
165         :returns: A single directory name, or None if no such directory
166             could be found.
167         """
168
169         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170         # in which case, plot a course to likely places in the src tree
171         import __main__ as main
172
173         if os.getenv("VPP_API_DIR"):
174             dirs.append(os.getenv("VPP_API_DIR"))
175
176         if hasattr(main, "__file__"):
177             # get the path of the calling script
178             localdir = os.path.dirname(os.path.realpath(main.__file__))
179         else:
180             # use cwd if there is no calling script
181             localdir = os.getcwd()
182         localdir_s = localdir.split(os.path.sep)
183
184         def dmatch(dir):
185             """Match dir against right-hand components of the script dir"""
186             d = dir.split("/")  # param 'dir' assumes a / separator
187             length = len(d)
188             return len(localdir_s) > length and localdir_s[-length:] == d
189
190         def sdir(srcdir, variant):
191             """Build a path from srcdir to the staged API files of
192             'variant'  (typically '' or '_debug')"""
193             # Since 'core' and 'plugin' files are staged
194             # in separate directories, we target the parent dir.
195             return os.path.sep.join(
196                 (
197                     srcdir,
198                     "build-root",
199                     "install-vpp%s-native" % variant,
200                     "vpp",
201                     "share",
202                     "vpp",
203                     "api",
204                 )
205             )
206
207         srcdir = None
208         if dmatch("src/scripts"):
209             srcdir = os.path.sep.join(localdir_s[:-2])
210         elif dmatch("src/vpp-api/python"):
211             srcdir = os.path.sep.join(localdir_s[:-3])
212         elif dmatch("test"):
213             # we're apparently running tests
214             srcdir = os.path.sep.join(localdir_s[:-1])
215
216         if srcdir:
217             # we're in the source tree, try both the debug and release
218             # variants.
219             dirs.append(sdir(srcdir, "_debug"))
220             dirs.append(sdir(srcdir, ""))
221
222         # Test for staged copies of the scripts
223         # For these, since we explicitly know if we're running a debug versus
224         # release variant, target only the relevant directory
225         if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
226             srcdir = os.path.sep.join(localdir_s[:-4])
227             dirs.append(sdir(srcdir, "_debug"))
228         if dmatch("build-root/install-vpp-native/vpp/bin"):
229             srcdir = os.path.sep.join(localdir_s[:-4])
230             dirs.append(sdir(srcdir, ""))
231
232         # finally, try the location system packages typically install into
233         dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
234
235         # check the directories for existence; first one wins
236         for dir in dirs:
237             if os.path.isdir(dir):
238                 return dir
239
240         return None
241
242     @classmethod
243     def find_api_files(cls, api_dir=None, patterns="*"):  # -> list
244         """Find API definition files from the given directory tree with the
245         given pattern. If no directory is given then find_api_dir() is used
246         to locate one. If no pattern is given then all definition files found
247         in the directory tree are used.
248
249         :param api_dir: A directory tree in which to locate API definition
250             files; subdirectories are descended into.
251             If this is None then find_api_dir() is called to discover it.
252         :param patterns: A list of patterns to use in each visited directory
253             when looking for files.
254             This can be a list/tuple object or a comma-separated string of
255             patterns. Each value in the list will have leading/trialing
256             whitespace stripped.
257             The pattern specifies the first part of the filename, '.api.json'
258             is appended.
259             The results are de-duplicated, thus overlapping patterns are fine.
260             If this is None it defaults to '*' meaning "all API files".
261         :returns: A list of file paths for the API files found.
262         """
263         if api_dir is None:
264             api_dir = cls.find_api_dir([])
265             if api_dir is None:
266                 raise VPPApiError("api_dir cannot be located")
267
268         if isinstance(patterns, list) or isinstance(patterns, tuple):
269             patterns = [p.strip() + ".api.json" for p in patterns]
270         else:
271             patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
272
273         api_files = []
274         for root, dirnames, files in os.walk(api_dir):
275             # iterate all given patterns and de-dup the result
276             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
277             for filename in files:
278                 api_files.append(os.path.join(root, filename))
279
280         return api_files
281
282     @classmethod
283     def process_json_file(self, apidef_file):
284         return self._process_json(apidef_file.read())
285
286     @classmethod
287     def process_json_str(self, json_str):
288         return self._process_json(json_str)
289
290     @staticmethod
291     def _process_json(json_str):  # -> Tuple[Dict, Dict]
292         api = json.loads(json_str)
293         types = {}
294         services = {}
295         messages = {}
296         try:
297             for t in api["enums"]:
298                 t[0] = "vl_api_" + t[0] + "_t"
299                 types[t[0]] = {"type": "enum", "data": t}
300         except KeyError:
301             pass
302         try:
303             for t in api["enumflags"]:
304                 t[0] = "vl_api_" + t[0] + "_t"
305                 types[t[0]] = {"type": "enum", "data": t}
306         except KeyError:
307             pass
308         try:
309             for t in api["unions"]:
310                 t[0] = "vl_api_" + t[0] + "_t"
311                 types[t[0]] = {"type": "union", "data": t}
312         except KeyError:
313             pass
314
315         try:
316             for t in api["types"]:
317                 t[0] = "vl_api_" + t[0] + "_t"
318                 types[t[0]] = {"type": "type", "data": t}
319         except KeyError:
320             pass
321
322         try:
323             for t, v in api["aliases"].items():
324                 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
325         except KeyError:
326             pass
327
328         try:
329             services.update(api["services"])
330         except KeyError:
331             pass
332
333         i = 0
334         while True:
335             unresolved = {}
336             for k, v in types.items():
337                 t = v["data"]
338                 if not vpp_get_type(k):
339                     if v["type"] == "enum":
340                         try:
341                             VPPEnumType(t[0], t[1:])
342                         except ValueError:
343                             unresolved[k] = v
344                 if not vpp_get_type(k):
345                     if v["type"] == "enumflag":
346                         try:
347                             VPPEnumFlagType(t[0], t[1:])
348                         except ValueError:
349                             unresolved[k] = v
350                     elif v["type"] == "union":
351                         try:
352                             VPPUnionType(t[0], t[1:])
353                         except ValueError:
354                             unresolved[k] = v
355                     elif v["type"] == "type":
356                         try:
357                             VPPType(t[0], t[1:])
358                         except ValueError:
359                             unresolved[k] = v
360                     elif v["type"] == "alias":
361                         try:
362                             VPPTypeAlias(k, t)
363                         except ValueError:
364                             unresolved[k] = v
365             if len(unresolved) == 0:
366                 break
367             if i > 3:
368                 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
369             types = unresolved
370             i += 1
371         try:
372             for m in api["messages"]:
373                 try:
374                     messages[m[0]] = VPPMessage(m[0], m[1:])
375                 except VPPNotImplementedError:
376                     ### OLE FIXME
377                     logger.error("Not implemented error for {}".format(m[0]))
378         except KeyError:
379             pass
380         return messages, services
381
382     @staticmethod
383     def load_api(apifiles=None, apidir=None):
384         messages = {}
385         services = {}
386         if not apifiles:
387             # Pick up API definitions from default directory
388             try:
389                 if isinstance(apidir, list):
390                     apifiles = []
391                     for d in apidir:
392                         apifiles += VPPApiJSONFiles.find_api_files(d)
393                 else:
394                     apifiles = VPPApiJSONFiles.find_api_files(apidir)
395             except (RuntimeError, VPPApiError):
396                 raise VPPRuntimeError
397
398         for file in apifiles:
399             with open(file) as apidef_file:
400                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
401                 messages.update(m)
402                 services.update(s)
403
404         return apifiles, messages, services
405
406
407 class VPPApiClient:
408     """VPP interface.
409
410     This class provides the APIs to VPP.  The APIs are loaded
411     from provided .api.json files and makes functions accordingly.
412     These functions are documented in the VPP .api files, as they
413     are dynamically created.
414
415     Additionally, VPP can send callback messages; this class
416     provides a means to register a callback function to receive
417     these messages in a background thread.
418     """
419
420     VPPApiError = VPPApiError
421     VPPRuntimeError = VPPRuntimeError
422     VPPValueError = VPPValueError
423     VPPNotImplementedError = VPPNotImplementedError
424     VPPIOError = VPPIOError
425
426     def __init__(
427         self,
428         *,
429         apifiles=None,
430         apidir=None,
431         testmode=False,
432         async_thread=True,
433         logger=None,
434         loglevel=None,
435         read_timeout=5,
436         use_socket=True,
437         server_address="/run/vpp/api.sock",
438     ):
439         """Create a VPP API object.
440
441         apifiles is a list of files containing API
442         descriptions that will be loaded - methods will be
443         dynamically created reflecting these APIs.  If not
444         provided this will load the API files from VPP's
445         default install location.
446
447         logger, if supplied, is the logging logger object to log to.
448         loglevel, if supplied, is the log level this logger is set
449         to report at (from the loglevels in the logging module).
450         """
451         if logger is None:
452             logger = logging.getLogger(
453                 "{}.{}".format(__name__, self.__class__.__name__)
454             )
455             if loglevel is not None:
456                 logger.setLevel(loglevel)
457         self.logger = logger
458
459         self.messages = {}
460         self.services = {}
461         self.id_names = []
462         self.id_msgdef = []
463         self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
464         self.apifiles = []
465         self.apidir = apidir
466         self.event_callback = None
467         self.message_queue = queue.Queue()
468         self.read_timeout = read_timeout
469         self.async_thread = async_thread
470         self.event_thread = None
471         self.testmode = testmode
472         self.server_address = server_address
473         self._apifiles = apifiles
474         self.stats = {}
475
476         if self.apidir is None and hasattr(self.__class__, "apidir"):
477             # Keep supporting the old style of providing apidir.
478             self.apidir = self.__class__.apidir
479         try:
480             self.apifiles, self.messages, self.services = VPPApiJSONFiles.load_api(
481                 apifiles, self.apidir
482             )
483         except VPPRuntimeError as e:
484             if testmode:
485                 self.apifiles = []
486             else:
487                 raise e
488
489         # Basic sanity check
490         if len(self.messages) == 0 and not testmode:
491             raise VPPValueError(1, "Missing JSON message definitions")
492         if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
493             raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
494
495         self.transport = VppTransport(
496             self, read_timeout=read_timeout, server_address=server_address
497         )
498         # Make sure we allow VPP to clean up the message rings.
499         atexit.register(vpp_atexit, weakref.ref(self))
500
501         add_convenience_methods()
502
503     def get_function(self, name):
504         return getattr(self._api, name)
505
506     class ContextId:
507         """Multiprocessing-safe provider of unique context IDs."""
508
509         def __init__(self):
510             self.context = mp.Value(ctypes.c_uint, 0)
511             self.lock = mp.Lock()
512
513         def __call__(self):
514             """Get a new unique (or, at least, not recently used) context."""
515             with self.lock:
516                 self.context.value += 1
517                 return self.context.value
518
519     get_context = ContextId()
520
521     def get_type(self, name):
522         return vpp_get_type(name)
523
524     @property
525     def api(self):
526         if not hasattr(self, "_api"):
527             raise VPPApiError("Not connected, api definitions not available")
528         return self._api
529
530     def make_function(self, msg, i, multipart, do_async):
531         if do_async:
532
533             def f(**kwargs):
534                 return self._call_vpp_async(i, msg, **kwargs)
535
536         else:
537
538             def f(**kwargs):
539                 return self._call_vpp(i, msg, multipart, **kwargs)
540
541         f.__name__ = str(msg.name)
542         f.__doc__ = ", ".join(
543             ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
544         )
545         f.msg = msg
546
547         return f
548
549     def make_pack_function(self, msg, i, multipart):
550         def f(**kwargs):
551             return self._call_vpp_pack(i, msg, **kwargs)
552
553         f.msg = msg
554         return f
555
556     def _register_functions(self, do_async=False):
557         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
558         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
559         self._api = VppApiDynamicMethodHolder()
560         for name, msg in self.messages.items():
561             n = name + "_" + msg.crc[2:]
562             i = self.transport.get_msg_index(n)
563             if i > 0:
564                 self.id_msgdef[i] = msg
565                 self.id_names[i] = name
566
567                 # Create function for client side messages.
568                 if name in self.services:
569                     f = self.make_function(msg, i, self.services[name], do_async)
570                     f_pack = self.make_pack_function(msg, i, self.services[name])
571                     setattr(self._api, name, FuncWrapper(f))
572                     setattr(self._api, name + "_pack", FuncWrapper(f_pack))
573             else:
574                 self.logger.debug("No such message type or failed CRC checksum: %s", n)
575
576     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
577         pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
578
579         rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
580         if rv != 0:
581             raise VPPIOError(2, "Connect failed")
582         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
583         self._register_functions(do_async=do_async)
584
585         # Initialise control ping
586         crc = self.messages["control_ping"].crc
587         self.control_ping_index = self.transport.get_msg_index(
588             ("control_ping" + "_" + crc[2:])
589         )
590         self.control_ping_msgdef = self.messages["control_ping"]
591         if self.async_thread:
592             self.event_thread = threading.Thread(target=self.thread_msg_handler)
593             self.event_thread.daemon = True
594             self.event_thread.start()
595         else:
596             self.event_thread = None
597         return rv
598
599     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
600         """Attach to VPP.
601
602         name - the name of the client.
603         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
604         do_async - if true, messages are sent without waiting for a reply
605         rx_qlen - the length of the VPP message receive queue between
606         client and server.
607         """
608         msg_handler = self.transport.get_callback(do_async)
609         return self.connect_internal(
610             name, msg_handler, chroot_prefix, rx_qlen, do_async
611         )
612
613     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
614         """Attach to VPP in synchronous mode. Application must poll for events.
615
616         name - the name of the client.
617         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
618         rx_qlen - the length of the VPP message receive queue between
619         client and server.
620         """
621
622         return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
623
624     def disconnect(self):
625         """Detach from VPP."""
626         rv = self.transport.disconnect()
627         if self.event_thread is not None:
628             self.message_queue.put("terminate event thread")
629         return rv
630
631     def msg_handler_sync(self, msg):
632         """Process an incoming message from VPP in sync mode.
633
634         The message may be a reply or it may be an async notification.
635         """
636         r = self.decode_incoming_msg(msg)
637         if r is None:
638             return
639
640         # If we have a context, then use the context to find any
641         # request waiting for a reply
642         context = 0
643         if hasattr(r, "context") and r.context > 0:
644             context = r.context
645
646         if context == 0:
647             # No context -> async notification that we feed to the callback
648             self.message_queue.put_nowait(r)
649         else:
650             raise VPPIOError(2, "RPC reply message received in event handler")
651
652     def has_context(self, msg):
653         if len(msg) < 10:
654             return False
655
656         header = VPPType(
657             "header_with_context",
658             [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
659         )
660
661         (i, ci, context), size = header.unpack(msg, 0)
662         if self.id_names[i] == "rx_thread_exit":
663             return
664
665         #
666         # Decode message and returns a tuple.
667         #
668         msgobj = self.id_msgdef[i]
669         if "context" in msgobj.field_by_name and context >= 0:
670             return True
671         return False
672
673     def decode_incoming_msg(self, msg, no_type_conversion=False):
674         if not msg:
675             logger.warning("vpp_api.read failed")
676             return
677
678         (i, ci), size = self.header.unpack(msg, 0)
679         if self.id_names[i] == "rx_thread_exit":
680             return
681
682         #
683         # Decode message and returns a tuple.
684         #
685         msgobj = self.id_msgdef[i]
686         if not msgobj:
687             raise VPPIOError(2, "Reply message undefined")
688
689         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
690         return r
691
692     def msg_handler_async(self, msg):
693         """Process a message from VPP in async mode.
694
695         In async mode, all messages are returned to the callback.
696         """
697         r = self.decode_incoming_msg(msg)
698         if r is None:
699             return
700
701         msgname = type(r).__name__
702
703         if self.event_callback:
704             self.event_callback(msgname, r)
705
706     def _control_ping(self, context):
707         """Send a ping command."""
708         self._call_vpp_async(
709             self.control_ping_index, self.control_ping_msgdef, context=context
710         )
711
712     def validate_args(self, msg, kwargs):
713         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
714         if d:
715             raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
716
717     def _add_stat(self, name, ms):
718         if not name in self.stats:
719             self.stats[name] = {"max": ms, "count": 1, "avg": ms}
720         else:
721             if ms > self.stats[name]["max"]:
722                 self.stats[name]["max"] = ms
723             self.stats[name]["count"] += 1
724             n = self.stats[name]["count"]
725             self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
726
727     def get_stats(self):
728         s = "\n=== API PAPI STATISTICS ===\n"
729         s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
730         for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
731             s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
732                 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
733             )
734         return s
735
736     def get_field_options(self, msg, fld_name):
737         # when there is an option, the msgdef has 3 elements.
738         # ['u32', 'ring_size', {'default': 1024}]
739         for _def in self.messages[msg].msgdef:
740             if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
741                 return _def[2]
742
743     def _call_vpp(self, i, msgdef, service, **kwargs):
744         """Given a message, send the message and await a reply.
745
746         msgdef - the message packing definition
747         i - the message type index
748         multipart - True if the message returns multiple
749         messages in return.
750         context - context number - chosen at random if not
751         supplied.
752         The remainder of the kwargs are the arguments to the API call.
753
754         The return value is the message or message array containing
755         the response.  It will raise an IOError exception if there was
756         no response within the timeout window.
757         """
758         ts = time.time()
759         if "context" not in kwargs:
760             context = self.get_context()
761             kwargs["context"] = context
762         else:
763             context = kwargs["context"]
764         kwargs["_vl_msg_id"] = i
765
766         no_type_conversion = kwargs.pop("_no_type_conversion", False)
767         timeout = kwargs.pop("_timeout", None)
768
769         try:
770             if self.transport.socket_index:
771                 kwargs["client_index"] = self.transport.socket_index
772         except AttributeError:
773             pass
774         self.validate_args(msgdef, kwargs)
775
776         s = "Calling {}({})".format(
777             msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
778         )
779         self.logger.debug(s)
780
781         b = msgdef.pack(kwargs)
782         self.transport.suspend()
783
784         self.transport.write(b)
785
786         msgreply = service["reply"]
787         stream = True if "stream" in service else False
788         if stream:
789             if "stream_msg" in service:
790                 # New service['reply'] = _reply and service['stream_message'] = _details
791                 stream_message = service["stream_msg"]
792                 modern = True
793             else:
794                 # Old  service['reply'] = _details
795                 stream_message = msgreply
796                 msgreply = "control_ping_reply"
797                 modern = False
798                 # Send a ping after the request - we use its response
799                 # to detect that we have seen all results.
800                 self._control_ping(context)
801
802         # Block until we get a reply.
803         rl = []
804         while True:
805             r = self.read_blocking(no_type_conversion, timeout)
806             if r is None:
807                 raise VPPIOError(2, "VPP API client: read failed")
808             msgname = type(r).__name__
809             if context not in r or r.context == 0 or context != r.context:
810                 # Message being queued
811                 self.message_queue.put_nowait(r)
812                 continue
813             if msgname != msgreply and (stream and (msgname != stream_message)):
814                 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
815             if not stream:
816                 rl = r
817                 break
818             if msgname == msgreply:
819                 if modern:  # Return both reply and list
820                     rl = r, rl
821                 break
822
823             rl.append(r)
824
825         self.transport.resume()
826
827         s = "Return value: {!r}".format(r)
828         if len(s) > 80:
829             s = s[:80] + "..."
830         self.logger.debug(s)
831         te = time.time()
832         self._add_stat(msgdef.name, (te - ts) * 1000)
833         return rl
834
835     def _call_vpp_async(self, i, msg, **kwargs):
836         """Given a message, send the message and return the context.
837
838         msgdef - the message packing definition
839         i - the message type index
840         context - context number - chosen at random if not
841         supplied.
842         The remainder of the kwargs are the arguments to the API call.
843
844         The reply message(s) will be delivered later to the registered callback.
845         The returned context will help with assigning which call
846         the reply belongs to.
847         """
848         if "context" not in kwargs:
849             context = self.get_context()
850             kwargs["context"] = context
851         else:
852             context = kwargs["context"]
853         try:
854             if self.transport.socket_index:
855                 kwargs["client_index"] = self.transport.socket_index
856         except AttributeError:
857             kwargs["client_index"] = 0
858         kwargs["_vl_msg_id"] = i
859         b = msg.pack(kwargs)
860
861         self.transport.write(b)
862         return context
863
864     def _call_vpp_pack(self, i, msg, **kwargs):
865         """Given a message, return the binary representation."""
866         kwargs["_vl_msg_id"] = i
867         kwargs["client_index"] = 0
868         kwargs["context"] = 0
869         return msg.pack(kwargs)
870
871     def read_blocking(self, no_type_conversion=False, timeout=None):
872         """Get next received message from transport within timeout, decoded.
873
874         Note that notifications have context zero
875         and are not put into receive queue (at least for socket transport),
876         use async_thread with registered callback for processing them.
877
878         If no message appears in the queue within timeout, return None.
879
880         Optionally, type conversion can be skipped,
881         as some of conversions are into less precise types.
882
883         When r is the return value of this, the caller can get message name as:
884             msgname = type(r).__name__
885         and context number (type long) as:
886             context = r.context
887
888         :param no_type_conversion: If false, type conversions are applied.
889         :type no_type_conversion: bool
890         :returns: Decoded message, or None if no message (within timeout).
891         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
892         :raises VppTransportShmemIOError if timed out.
893         """
894         msg = self.transport.read(timeout=timeout)
895         if not msg:
896             return None
897         return self.decode_incoming_msg(msg, no_type_conversion)
898
899     def register_event_callback(self, callback):
900         """Register a callback for async messages.
901
902         This will be called for async notifications in sync mode,
903         and all messages in async mode.  In sync mode, replies to
904         requests will not come here.
905
906         callback is a fn(msg_type_name, msg_type) that will be
907         called when a message comes in.  While this function is
908         executing, note that (a) you are in a background thread and
909         may wish to use threading.Lock to protect your datastructures,
910         and (b) message processing from VPP will stop (so if you take
911         a long while about it you may provoke reply timeouts or cause
912         VPP to fill the RX buffer).  Passing None will disable the
913         callback.
914         """
915         self.event_callback = callback
916
917     def thread_msg_handler(self):
918         """Python thread calling the user registered message handler.
919
920         This is to emulate the old style event callback scheme. Modern
921         clients should provide their own thread to poll the event
922         queue.
923         """
924         while True:
925             r = self.message_queue.get()
926             if r == "terminate event thread":
927                 break
928             msgname = type(r).__name__
929             if self.event_callback:
930                 self.event_callback(msgname, r)
931
932     def validate_message_table(self, namecrctable):
933         """Take a dictionary of name_crc message names
934         and returns an array of missing messages"""
935
936         missing_table = []
937         for name_crc in namecrctable:
938             i = self.transport.get_msg_index(name_crc)
939             if i <= 0:
940                 missing_table.append(name_crc)
941         return missing_table
942
943     def dump_message_table(self):
944         """Return VPPs API message table as name_crc dictionary"""
945         return self.transport.message_table
946
947     def dump_message_table_filtered(self, msglist):
948         """Return VPPs API message table as name_crc dictionary,
949         filtered by message name list."""
950
951         replies = [self.services[n]["reply"] for n in msglist]
952         message_table_filtered = {}
953         for name in msglist + replies:
954             for k, v in self.transport.message_table.items():
955                 if k.startswith(name):
956                     message_table_filtered[k] = v
957                     break
958         return message_table_filtered
959
960     def __repr__(self):
961         return (
962             "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
963             "logger=%s, read_timeout=%s, "
964             "server_address='%s'>"
965             % (
966                 self._apifiles,
967                 self.testmode,
968                 self.async_thread,
969                 self.logger,
970                 self.read_timeout,
971                 self.server_address,
972             )
973         )
974
975     def details_iter(self, f, **kwargs):
976         cursor = 0
977         while True:
978             kwargs["cursor"] = cursor
979             rv, details = f(**kwargs)
980             for d in details:
981                 yield d
982             if rv.retval == 0 or rv.retval != -165:
983                 break
984             cursor = rv.cursor