papi: remove shared memory transport
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import queue
25 import logging
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 import time
33 from . vpp_format import verify_enum_hint
34 from . vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40     class V:
41         """placeholder for VppTransport as the implementation is dependent on
42         VPPAPIClient's initialization values
43         """
44
45     VppTransport = V
46
47 from . vpp_transport_socket import VppTransport
48
49 logger = logging.getLogger('vpp_papi')
50 logger.addHandler(logging.NullHandler())
51
52 __all__ = ('FuncWrapper', 'VppApiDynamicMethodHolder',
53            'VppEnum', 'VppEnumType', 'VppEnumFlag',
54            'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
55            'VPPApiClient', )
56
57
58 def metaclass(metaclass):
59     @functools.wraps(metaclass)
60     def wrapper(cls):
61         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
62
63     return wrapper
64
65
66 class VppEnumType(type):
67     def __getattr__(cls, name):
68         t = vpp_get_type(name)
69         return t.enum
70
71
72 @metaclass(VppEnumType)
73 class VppEnum:
74     pass
75
76
77 @metaclass(VppEnumType)
78 class VppEnumFlag:
79     pass
80
81
82 def vpp_atexit(vpp_weakref):
83     """Clean up VPP connection on shutdown."""
84     vpp_instance = vpp_weakref()
85     if vpp_instance and vpp_instance.transport.connected:
86         logger.debug('Cleaning up VPP on exit')
87         vpp_instance.disconnect()
88
89
90 def add_convenience_methods():
91     # provide convenience methods to IP[46]Address.vapi_af
92     def _vapi_af(self):
93         if 6 == self._version:
94             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
95         if 4 == self._version:
96             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
97         raise ValueError("Invalid _version.")
98
99     def _vapi_af_name(self):
100         if 6 == self._version:
101             return 'ip6'
102         if 4 == self._version:
103             return 'ip4'
104         raise ValueError("Invalid _version.")
105
106     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
107     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
108
109
110 class VppApiDynamicMethodHolder:
111     pass
112
113
114 class FuncWrapper:
115     def __init__(self, func):
116         self._func = func
117         self.__name__ = func.__name__
118         self.__doc__ = func.__doc__
119
120     def __call__(self, **kwargs):
121         return self._func(**kwargs)
122
123     def __repr__(self):
124         return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
125
126
127 class VPPApiError(Exception):
128     pass
129
130
131 class VPPNotImplementedError(NotImplementedError):
132     pass
133
134
135 class VPPIOError(IOError):
136     pass
137
138
139 class VPPRuntimeError(RuntimeError):
140     pass
141
142
143 class VPPValueError(ValueError):
144     pass
145
146
147 class VPPApiJSONFiles:
148     @classmethod
149     def find_api_dir(cls, dirs):
150         """Attempt to find the best directory in which API definition
151         files may reside. If the value VPP_API_DIR exists in the environment
152         then it is first on the search list. If we're inside a recognized
153         location in a VPP source tree (src/scripts and src/vpp-api/python)
154         then entries from there to the likely locations in build-root are
155         added. Finally the location used by system packages is added.
156
157         :returns: A single directory name, or None if no such directory
158             could be found.
159         """
160
161         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
162         # in which case, plot a course to likely places in the src tree
163         import __main__ as main
164         if hasattr(main, '__file__'):
165             # get the path of the calling script
166             localdir = os.path.dirname(os.path.realpath(main.__file__))
167         else:
168             # use cwd if there is no calling script
169             localdir = os.getcwd()
170         localdir_s = localdir.split(os.path.sep)
171
172         def dmatch(dir):
173             """Match dir against right-hand components of the script dir"""
174             d = dir.split('/')  # param 'dir' assumes a / separator
175             length = len(d)
176             return len(localdir_s) > length and localdir_s[-length:] == d
177
178         def sdir(srcdir, variant):
179             """Build a path from srcdir to the staged API files of
180             'variant'  (typically '' or '_debug')"""
181             # Since 'core' and 'plugin' files are staged
182             # in separate directories, we target the parent dir.
183             return os.path.sep.join((
184                 srcdir,
185                 'build-root',
186                 'install-vpp%s-native' % variant,
187                 'vpp',
188                 'share',
189                 'vpp',
190                 'api',
191             ))
192
193         srcdir = None
194         if dmatch('src/scripts'):
195             srcdir = os.path.sep.join(localdir_s[:-2])
196         elif dmatch('src/vpp-api/python'):
197             srcdir = os.path.sep.join(localdir_s[:-3])
198         elif dmatch('test'):
199             # we're apparently running tests
200             srcdir = os.path.sep.join(localdir_s[:-1])
201
202         if srcdir:
203             # we're in the source tree, try both the debug and release
204             # variants.
205             dirs.append(sdir(srcdir, '_debug'))
206             dirs.append(sdir(srcdir, ''))
207
208         # Test for staged copies of the scripts
209         # For these, since we explicitly know if we're running a debug versus
210         # release variant, target only the relevant directory
211         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
212             srcdir = os.path.sep.join(localdir_s[:-4])
213             dirs.append(sdir(srcdir, '_debug'))
214         if dmatch('build-root/install-vpp-native/vpp/bin'):
215             srcdir = os.path.sep.join(localdir_s[:-4])
216             dirs.append(sdir(srcdir, ''))
217
218         # finally, try the location system packages typically install into
219         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
220
221         # check the directories for existence; first one wins
222         for dir in dirs:
223             if os.path.isdir(dir):
224                 return dir
225
226         return None
227
228     @classmethod
229     def find_api_files(cls, api_dir=None, patterns='*'):  # -> list
230         """Find API definition files from the given directory tree with the
231         given pattern. If no directory is given then find_api_dir() is used
232         to locate one. If no pattern is given then all definition files found
233         in the directory tree are used.
234
235         :param api_dir: A directory tree in which to locate API definition
236             files; subdirectories are descended into.
237             If this is None then find_api_dir() is called to discover it.
238         :param patterns: A list of patterns to use in each visited directory
239             when looking for files.
240             This can be a list/tuple object or a comma-separated string of
241             patterns. Each value in the list will have leading/trialing
242             whitespace stripped.
243             The pattern specifies the first part of the filename, '.api.json'
244             is appended.
245             The results are de-duplicated, thus overlapping patterns are fine.
246             If this is None it defaults to '*' meaning "all API files".
247         :returns: A list of file paths for the API files found.
248         """
249         if api_dir is None:
250             api_dir = cls.find_api_dir([])
251             if api_dir is None:
252                 raise VPPApiError("api_dir cannot be located")
253
254         if isinstance(patterns, list) or isinstance(patterns, tuple):
255             patterns = [p.strip() + '.api.json' for p in patterns]
256         else:
257             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
258
259         api_files = []
260         for root, dirnames, files in os.walk(api_dir):
261             # iterate all given patterns and de-dup the result
262             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
263             for filename in files:
264                 api_files.append(os.path.join(root, filename))
265
266         return api_files
267
268     @classmethod
269     def process_json_file(self, apidef_file):
270         api = json.load(apidef_file)
271         return self._process_json(api)
272
273     @classmethod
274     def process_json_str(self, json_str):
275         api = json.loads(json_str)
276         return self._process_json(api)
277
278     @staticmethod
279     def _process_json(api):  # -> Tuple[Dict, Dict]
280         types = {}
281         services = {}
282         messages = {}
283         try:
284             for t in api['enums']:
285                 t[0] = 'vl_api_' + t[0] + '_t'
286                 types[t[0]] = {'type': 'enum', 'data': t}
287         except KeyError:
288             pass
289         try:
290             for t in api['enumflags']:
291                 t[0] = 'vl_api_' + t[0] + '_t'
292                 types[t[0]] = {'type': 'enum', 'data': t}
293         except KeyError:
294             pass
295         try:
296             for t in api['unions']:
297                 t[0] = 'vl_api_' + t[0] + '_t'
298                 types[t[0]] = {'type': 'union', 'data': t}
299         except KeyError:
300             pass
301
302         try:
303             for t in api['types']:
304                 t[0] = 'vl_api_' + t[0] + '_t'
305                 types[t[0]] = {'type': 'type', 'data': t}
306         except KeyError:
307             pass
308
309         try:
310             for t, v in api['aliases'].items():
311                 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
312         except KeyError:
313             pass
314
315         try:
316             services.update(api['services'])
317         except KeyError:
318             pass
319
320         i = 0
321         while True:
322             unresolved = {}
323             for k, v in types.items():
324                 t = v['data']
325                 if not vpp_get_type(k):
326                     if v['type'] == 'enum':
327                         try:
328                             VPPEnumType(t[0], t[1:])
329                         except ValueError:
330                             unresolved[k] = v
331                 if not vpp_get_type(k):
332                     if v['type'] == 'enumflag':
333                         try:
334                             VPPEnumFlagType(t[0], t[1:])
335                         except ValueError:
336                             unresolved[k] = v
337                     elif v['type'] == 'union':
338                         try:
339                             VPPUnionType(t[0], t[1:])
340                         except ValueError:
341                             unresolved[k] = v
342                     elif v['type'] == 'type':
343                         try:
344                             VPPType(t[0], t[1:])
345                         except ValueError:
346                             unresolved[k] = v
347                     elif v['type'] == 'alias':
348                         try:
349                             VPPTypeAlias(k, t)
350                         except ValueError:
351                             unresolved[k] = v
352             if len(unresolved) == 0:
353                 break
354             if i > 3:
355                 raise VPPValueError('Unresolved type definitions {}'
356                                     .format(unresolved))
357             types = unresolved
358             i += 1
359         try:
360             for m in api['messages']:
361                 try:
362                     messages[m[0]] = VPPMessage(m[0], m[1:])
363                 except VPPNotImplementedError:
364                     ### OLE FIXME
365                     logger.error('Not implemented error for {}'.format(m[0]))
366         except KeyError:
367             pass
368         return messages, services
369
370
371 class VPPApiClient:
372     """VPP interface.
373
374     This class provides the APIs to VPP.  The APIs are loaded
375     from provided .api.json files and makes functions accordingly.
376     These functions are documented in the VPP .api files, as they
377     are dynamically created.
378
379     Additionally, VPP can send callback messages; this class
380     provides a means to register a callback function to receive
381     these messages in a background thread.
382     """
383     apidir = None
384     VPPApiError = VPPApiError
385     VPPRuntimeError = VPPRuntimeError
386     VPPValueError = VPPValueError
387     VPPNotImplementedError = VPPNotImplementedError
388     VPPIOError = VPPIOError
389
390
391     def __init__(self, *, apifiles=None, testmode=False, async_thread=True,
392                  logger=None, loglevel=None,
393                  read_timeout=5, use_socket=True,
394                  server_address='/run/vpp/api.sock'):
395         """Create a VPP API object.
396
397         apifiles is a list of files containing API
398         descriptions that will be loaded - methods will be
399         dynamically created reflecting these APIs.  If not
400         provided this will load the API files from VPP's
401         default install location.
402
403         logger, if supplied, is the logging logger object to log to.
404         loglevel, if supplied, is the log level this logger is set
405         to report at (from the loglevels in the logging module).
406         """
407         if logger is None:
408             logger = logging.getLogger(
409                 "{}.{}".format(__name__, self.__class__.__name__))
410             if loglevel is not None:
411                 logger.setLevel(loglevel)
412         self.logger = logger
413
414         self.messages = {}
415         self.services = {}
416         self.id_names = []
417         self.id_msgdef = []
418         self.header = VPPType('header', [['u16', 'msgid'],
419                                          ['u32', 'client_index']])
420         self.apifiles = []
421         self.event_callback = None
422         self.message_queue = queue.Queue()
423         self.read_timeout = read_timeout
424         self.async_thread = async_thread
425         self.event_thread = None
426         self.testmode = testmode
427         self.server_address = server_address
428         self._apifiles = apifiles
429         self.stats = {}
430
431         if not apifiles:
432             # Pick up API definitions from default directory
433             try:
434                 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
435             except (RuntimeError, VPPApiError):
436                 # In test mode we don't care that we can't find the API files
437                 if testmode:
438                     apifiles = []
439                 else:
440                     raise VPPRuntimeError
441
442         for file in apifiles:
443             with open(file) as apidef_file:
444                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
445                 self.messages.update(m)
446                 self.services.update(s)
447
448         self.apifiles = apifiles
449
450         # Basic sanity check
451         if len(self.messages) == 0 and not testmode:
452             raise VPPValueError(1, 'Missing JSON message definitions')
453         if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
454             raise VPPRuntimeError("Invalid address family hints. "
455                                   "Cannot continue.")
456
457         self.transport = VppTransport(self, read_timeout=read_timeout,
458                                       server_address=server_address)
459         # Make sure we allow VPP to clean up the message rings.
460         atexit.register(vpp_atexit, weakref.ref(self))
461
462         add_convenience_methods()
463
464     def get_function(self, name):
465         return getattr(self._api, name)
466
467     class ContextId:
468         """Multiprocessing-safe provider of unique context IDs."""
469         def __init__(self):
470             self.context = mp.Value(ctypes.c_uint, 0)
471             self.lock = mp.Lock()
472
473         def __call__(self):
474             """Get a new unique (or, at least, not recently used) context."""
475             with self.lock:
476                 self.context.value += 1
477                 return self.context.value
478     get_context = ContextId()
479
480     def get_type(self, name):
481         return vpp_get_type(name)
482
483     @property
484     def api(self):
485         if not hasattr(self, "_api"):
486             raise VPPApiError("Not connected, api definitions not available")
487         return self._api
488
489     def make_function(self, msg, i, multipart, do_async):
490         if (do_async):
491             def f(**kwargs):
492                 return self._call_vpp_async(i, msg, **kwargs)
493         else:
494             def f(**kwargs):
495                 return self._call_vpp(i, msg, multipart, **kwargs)
496
497         f.__name__ = str(msg.name)
498         f.__doc__ = ", ".join(["%s %s" %
499                                (msg.fieldtypes[j], k)
500                                for j, k in enumerate(msg.fields)])
501         f.msg = msg
502
503         return f
504
505     def _register_functions(self, do_async=False):
506         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
507         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
508         self._api = VppApiDynamicMethodHolder()
509         for name, msg in self.messages.items():
510             n = name + '_' + msg.crc[2:]
511             i = self.transport.get_msg_index(n)
512             if i > 0:
513                 self.id_msgdef[i] = msg
514                 self.id_names[i] = name
515
516                 # Create function for client side messages.
517                 if name in self.services:
518                     f = self.make_function(msg, i, self.services[name], do_async)
519                     setattr(self._api, name, FuncWrapper(f))
520             else:
521                 self.logger.debug(
522                     'No such message type or failed CRC checksum: %s', n)
523
524     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
525                          do_async):
526         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
527
528         rv = self.transport.connect(name, pfx,
529                                     msg_handler, rx_qlen)
530         if rv != 0:
531             raise VPPIOError(2, 'Connect failed')
532         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
533         self._register_functions(do_async=do_async)
534
535         # Initialise control ping
536         crc = self.messages['control_ping'].crc
537         self.control_ping_index = self.transport.get_msg_index(
538             ('control_ping' + '_' + crc[2:]))
539         self.control_ping_msgdef = self.messages['control_ping']
540         if self.async_thread:
541             self.event_thread = threading.Thread(
542                 target=self.thread_msg_handler)
543             self.event_thread.daemon = True
544             self.event_thread.start()
545         else:
546             self.event_thread = None
547         return rv
548
549     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
550         """Attach to VPP.
551
552         name - the name of the client.
553         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
554         do_async - if true, messages are sent without waiting for a reply
555         rx_qlen - the length of the VPP message receive queue between
556         client and server.
557         """
558         msg_handler = self.transport.get_callback(do_async)
559         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
560                                      do_async)
561
562     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
563         """Attach to VPP in synchronous mode. Application must poll for events.
564
565         name - the name of the client.
566         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
567         rx_qlen - the length of the VPP message receive queue between
568         client and server.
569         """
570
571         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
572                                      do_async=False)
573
574     def disconnect(self):
575         """Detach from VPP."""
576         rv = self.transport.disconnect()
577         if self.event_thread is not None:
578             self.message_queue.put("terminate event thread")
579         return rv
580
581     def msg_handler_sync(self, msg):
582         """Process an incoming message from VPP in sync mode.
583
584         The message may be a reply or it may be an async notification.
585         """
586         r = self.decode_incoming_msg(msg)
587         if r is None:
588             return
589
590         # If we have a context, then use the context to find any
591         # request waiting for a reply
592         context = 0
593         if hasattr(r, 'context') and r.context > 0:
594             context = r.context
595
596         if context == 0:
597             # No context -> async notification that we feed to the callback
598             self.message_queue.put_nowait(r)
599         else:
600             raise VPPIOError(2, 'RPC reply message received in event handler')
601
602     def has_context(self, msg):
603         if len(msg) < 10:
604             return False
605
606         header = VPPType('header_with_context', [['u16', 'msgid'],
607                                                  ['u32', 'client_index'],
608                                                  ['u32', 'context']])
609
610         (i, ci, context), size = header.unpack(msg, 0)
611         if self.id_names[i] == 'rx_thread_exit':
612             return
613
614         #
615         # Decode message and returns a tuple.
616         #
617         msgobj = self.id_msgdef[i]
618         if 'context' in msgobj.field_by_name and context >= 0:
619             return True
620         return False
621
622     def decode_incoming_msg(self, msg, no_type_conversion=False):
623         if not msg:
624             logger.warning('vpp_api.read failed')
625             return
626
627         (i, ci), size = self.header.unpack(msg, 0)
628         if self.id_names[i] == 'rx_thread_exit':
629             return
630
631         #
632         # Decode message and returns a tuple.
633         #
634         msgobj = self.id_msgdef[i]
635         if not msgobj:
636             raise VPPIOError(2, 'Reply message undefined')
637
638         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
639         return r
640
641     def msg_handler_async(self, msg):
642         """Process a message from VPP in async mode.
643
644         In async mode, all messages are returned to the callback.
645         """
646         r = self.decode_incoming_msg(msg)
647         if r is None:
648             return
649
650         msgname = type(r).__name__
651
652         if self.event_callback:
653             self.event_callback(msgname, r)
654
655     def _control_ping(self, context):
656         """Send a ping command."""
657         self._call_vpp_async(self.control_ping_index,
658                              self.control_ping_msgdef,
659                              context=context)
660
661     def validate_args(self, msg, kwargs):
662         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
663         if d:
664             raise VPPValueError('Invalid argument {} to {}'
665                                 .format(list(d), msg.name))
666
667     def _add_stat(self, name, ms):
668         if not name in self.stats:
669             self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
670         else:
671             if ms > self.stats[name]['max']:
672                 self.stats[name]['max'] = ms
673             self.stats[name]['count'] += 1
674             n = self.stats[name]['count']
675             self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
676
677     def get_stats(self):
678         s = '\n=== API PAPI STATISTICS ===\n'
679         s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
680         for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
681             s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
682                                                            n[1]['avg'], n[1]['max'])
683         return s
684
685     def get_field_options(self, msg, fld_name):
686         # when there is an option, the msgdef has 3 elements.
687         # ['u32', 'ring_size', {'default': 1024}]
688         for _def in self.messages[msg].msgdef:
689             if isinstance(_def, list) and \
690                     len(_def) == 3 and \
691                     _def[1] == fld_name:
692                 return _def[2]
693
694     def _call_vpp(self, i, msgdef, service, **kwargs):
695         """Given a message, send the message and await a reply.
696
697         msgdef - the message packing definition
698         i - the message type index
699         multipart - True if the message returns multiple
700         messages in return.
701         context - context number - chosen at random if not
702         supplied.
703         The remainder of the kwargs are the arguments to the API call.
704
705         The return value is the message or message array containing
706         the response.  It will raise an IOError exception if there was
707         no response within the timeout window.
708         """
709         ts = time.time()
710         if 'context' not in kwargs:
711             context = self.get_context()
712             kwargs['context'] = context
713         else:
714             context = kwargs['context']
715         kwargs['_vl_msg_id'] = i
716
717         no_type_conversion = kwargs.pop('_no_type_conversion', False)
718         timeout = kwargs.pop('_timeout', None)
719
720         try:
721             if self.transport.socket_index:
722                 kwargs['client_index'] = self.transport.socket_index
723         except AttributeError:
724             pass
725         self.validate_args(msgdef, kwargs)
726
727         s = 'Calling {}({})'.format(msgdef.name,
728             ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
729         self.logger.debug(s)
730
731         b = msgdef.pack(kwargs)
732         self.transport.suspend()
733
734         self.transport.write(b)
735
736         msgreply = service['reply']
737         stream = True if 'stream' in service else False
738         if stream:
739             if 'stream_msg' in service:
740                 # New service['reply'] = _reply and service['stream_message'] = _details
741                 stream_message = service['stream_msg']
742                 modern =True
743             else:
744                 # Old  service['reply'] = _details
745                 stream_message = msgreply
746                 msgreply = 'control_ping_reply'
747                 modern = False
748                 # Send a ping after the request - we use its response
749                 # to detect that we have seen all results.
750                 self._control_ping(context)
751
752         # Block until we get a reply.
753         rl = []
754         while (True):
755             r = self.read_blocking(no_type_conversion, timeout)
756             if r is None:
757                 raise VPPIOError(2, 'VPP API client: read failed')
758             msgname = type(r).__name__
759             if context not in r or r.context == 0 or context != r.context:
760                 # Message being queued
761                 self.message_queue.put_nowait(r)
762                 continue
763             if msgname != msgreply and (stream and (msgname != stream_message)):
764                 print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
765             if not stream:
766                 rl = r
767                 break
768             if msgname == msgreply:
769                 if modern: # Return both reply and list
770                     rl = r, rl
771                 break
772
773             rl.append(r)
774
775         self.transport.resume()
776
777         s = 'Return value: {!r}'.format(r)
778         if len(s) > 80:
779             s = s[:80] + "..."
780         self.logger.debug(s)
781         te = time.time()
782         self._add_stat(msgdef.name, (te - ts) * 1000)
783         return rl
784
785     def _call_vpp_async(self, i, msg, **kwargs):
786         """Given a message, send the message and return the context.
787
788         msgdef - the message packing definition
789         i - the message type index
790         context - context number - chosen at random if not
791         supplied.
792         The remainder of the kwargs are the arguments to the API call.
793
794         The reply message(s) will be delivered later to the registered callback.
795         The returned context will help with assigning which call
796         the reply belongs to.
797         """
798         if 'context' not in kwargs:
799             context = self.get_context()
800             kwargs['context'] = context
801         else:
802             context = kwargs['context']
803         try:
804             if self.transport.socket_index:
805                 kwargs['client_index'] = self.transport.socket_index
806         except AttributeError:
807             kwargs['client_index'] = 0
808         kwargs['_vl_msg_id'] = i
809         b = msg.pack(kwargs)
810
811         self.transport.write(b)
812         return context
813
814     def read_blocking(self, no_type_conversion=False, timeout=None):
815         """Get next received message from transport within timeout, decoded.
816
817         Note that notifications have context zero
818         and are not put into receive queue (at least for socket transport),
819         use async_thread with registered callback for processing them.
820
821         If no message appears in the queue within timeout, return None.
822
823         Optionally, type conversion can be skipped,
824         as some of conversions are into less precise types.
825
826         When r is the return value of this, the caller can get message name as:
827             msgname = type(r).__name__
828         and context number (type long) as:
829             context = r.context
830
831         :param no_type_conversion: If false, type conversions are applied.
832         :type no_type_conversion: bool
833         :returns: Decoded message, or None if no message (within timeout).
834         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
835         :raises VppTransportShmemIOError if timed out.
836         """
837         msg = self.transport.read(timeout=timeout)
838         if not msg:
839             return None
840         return self.decode_incoming_msg(msg, no_type_conversion)
841
842     def register_event_callback(self, callback):
843         """Register a callback for async messages.
844
845         This will be called for async notifications in sync mode,
846         and all messages in async mode.  In sync mode, replies to
847         requests will not come here.
848
849         callback is a fn(msg_type_name, msg_type) that will be
850         called when a message comes in.  While this function is
851         executing, note that (a) you are in a background thread and
852         may wish to use threading.Lock to protect your datastructures,
853         and (b) message processing from VPP will stop (so if you take
854         a long while about it you may provoke reply timeouts or cause
855         VPP to fill the RX buffer).  Passing None will disable the
856         callback.
857         """
858         self.event_callback = callback
859
860     def thread_msg_handler(self):
861         """Python thread calling the user registered message handler.
862
863         This is to emulate the old style event callback scheme. Modern
864         clients should provide their own thread to poll the event
865         queue.
866         """
867         while True:
868             r = self.message_queue.get()
869             if r == "terminate event thread":
870                 break
871             msgname = type(r).__name__
872             if self.event_callback:
873                 self.event_callback(msgname, r)
874
875     def validate_message_table(self, namecrctable):
876         """Take a dictionary of name_crc message names
877         and returns an array of missing messages"""
878
879         missing_table = []
880         for name_crc in namecrctable:
881             i = self.transport.get_msg_index(name_crc)
882             if i <= 0:
883                 missing_table.append(name_crc)
884         return missing_table
885
886     def dump_message_table(self):
887         """Return VPPs API message table as name_crc dictionary"""
888         return self.transport.message_table
889
890     def dump_message_table_filtered(self, msglist):
891         """Return VPPs API message table as name_crc dictionary,
892         filtered by message name list."""
893
894         replies = [self.services[n]['reply'] for n in msglist]
895         message_table_filtered = {}
896         for name in msglist + replies:
897             for k,v in self.transport.message_table.items():
898                 if k.startswith(name):
899                     message_table_filtered[k] = v
900                     break
901         return message_table_filtered
902
903     def __repr__(self):
904         return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
905                "logger=%s, read_timeout=%s, " \
906                "server_address='%s'>" % (
907                    self._apifiles, self.testmode, self.async_thread,
908                    self.logger, self.read_timeout, self.server_address)
909
910     def details_iter(self, f, **kwargs):
911         cursor = 0
912         while True:
913             kwargs['cursor'] = cursor
914             rv, details = f(**kwargs)
915             for d in details:
916                 yield d
917             if rv.retval == 0 or rv.retval != -165:
918                 break
919             cursor = rv.cursor