5c375df199d85b5eedfed874962ee09f18fdcfd2
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import logging
25 import functools
26 import json
27 import threading
28 import fnmatch
29 import weakref
30 import atexit
31 import time
32 from . vpp_format import verify_enum_hint
33 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
34 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
35
36 try:
37     import VppTransport
38 except ModuleNotFoundError:
39     class V:
40         """placeholder for VppTransport as the implementation is dependent on
41         VPPAPIClient's initialization values
42         """
43
44     VppTransport = V
45
46 logger = logging.getLogger('vpp_papi')
47 logger.addHandler(logging.NullHandler())
48
49 if sys.version[0] == '2':
50     import Queue as queue
51 else:
52     import queue as queue
53
54 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
55            'VppEnum', 'VppEnumType',
56            'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
57            'VPPApiClient', )
58
59
60 def metaclass(metaclass):
61     @functools.wraps(metaclass)
62     def wrapper(cls):
63         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
64
65     return wrapper
66
67
68 class VppEnumType(type):
69     def __getattr__(cls, name):
70         t = vpp_get_type(name)
71         return t.enum
72
73
74 @metaclass(VppEnumType)
75 class VppEnum(object):
76     pass
77
78
79 def vpp_atexit(vpp_weakref):
80     """Clean up VPP connection on shutdown."""
81     vpp_instance = vpp_weakref()
82     if vpp_instance and vpp_instance.transport.connected:
83         logger.debug('Cleaning up VPP on exit')
84         vpp_instance.disconnect()
85
86
87 if sys.version[0] == '2':
88     def vpp_iterator(d):
89         return d.iteritems()
90 else:
91     def vpp_iterator(d):
92         return d.items()
93
94
95 def add_convenience_methods():
96     # provide convenience methods to IP[46]Address.vapi_af
97     def _vapi_af(self):
98         if 6 == self._version:
99             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
100         if 4 == self._version:
101             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
102         raise ValueError("Invalid _version.")
103
104     def _vapi_af_name(self):
105         if 6 == self._version:
106             return 'ip6'
107         if 4 == self._version:
108             return 'ip4'
109         raise ValueError("Invalid _version.")
110
111     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
112     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
113
114
115 class VppApiDynamicMethodHolder(object):
116     pass
117
118
119 class FuncWrapper(object):
120     def __init__(self, func):
121         self._func = func
122         self.__name__ = func.__name__
123         self.__doc__ = func.__doc__
124
125     def __call__(self, **kwargs):
126         return self._func(**kwargs)
127
128     def __repr__(self):
129         return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
130
131
132 class VPPApiError(Exception):
133     pass
134
135
136 class VPPNotImplementedError(NotImplementedError):
137     pass
138
139
140 class VPPIOError(IOError):
141     pass
142
143
144 class VPPRuntimeError(RuntimeError):
145     pass
146
147
148 class VPPValueError(ValueError):
149     pass
150
151
152 class VPPApiJSONFiles(object):
153     @classmethod
154     def find_api_dir(cls, dirs):
155         """Attempt to find the best directory in which API definition
156         files may reside. If the value VPP_API_DIR exists in the environment
157         then it is first on the search list. If we're inside a recognized
158         location in a VPP source tree (src/scripts and src/vpp-api/python)
159         then entries from there to the likely locations in build-root are
160         added. Finally the location used by system packages is added.
161
162         :returns: A single directory name, or None if no such directory
163             could be found.
164         """
165
166         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
167         # in which case, plot a course to likely places in the src tree
168         import __main__ as main
169         if hasattr(main, '__file__'):
170             # get the path of the calling script
171             localdir = os.path.dirname(os.path.realpath(main.__file__))
172         else:
173             # use cwd if there is no calling script
174             localdir = os.getcwd()
175         localdir_s = localdir.split(os.path.sep)
176
177         def dmatch(dir):
178             """Match dir against right-hand components of the script dir"""
179             d = dir.split('/')  # param 'dir' assumes a / separator
180             length = len(d)
181             return len(localdir_s) > length and localdir_s[-length:] == d
182
183         def sdir(srcdir, variant):
184             """Build a path from srcdir to the staged API files of
185             'variant'  (typically '' or '_debug')"""
186             # Since 'core' and 'plugin' files are staged
187             # in separate directories, we target the parent dir.
188             return os.path.sep.join((
189                 srcdir,
190                 'build-root',
191                 'install-vpp%s-native' % variant,
192                 'vpp',
193                 'share',
194                 'vpp',
195                 'api',
196             ))
197
198         srcdir = None
199         if dmatch('src/scripts'):
200             srcdir = os.path.sep.join(localdir_s[:-2])
201         elif dmatch('src/vpp-api/python'):
202             srcdir = os.path.sep.join(localdir_s[:-3])
203         elif dmatch('test'):
204             # we're apparently running tests
205             srcdir = os.path.sep.join(localdir_s[:-1])
206
207         if srcdir:
208             # we're in the source tree, try both the debug and release
209             # variants.
210             dirs.append(sdir(srcdir, '_debug'))
211             dirs.append(sdir(srcdir, ''))
212
213         # Test for staged copies of the scripts
214         # For these, since we explicitly know if we're running a debug versus
215         # release variant, target only the relevant directory
216         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
217             srcdir = os.path.sep.join(localdir_s[:-4])
218             dirs.append(sdir(srcdir, '_debug'))
219         if dmatch('build-root/install-vpp-native/vpp/bin'):
220             srcdir = os.path.sep.join(localdir_s[:-4])
221             dirs.append(sdir(srcdir, ''))
222
223         # finally, try the location system packages typically install into
224         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
225
226         # check the directories for existence; first one wins
227         for dir in dirs:
228             if os.path.isdir(dir):
229                 return dir
230
231         return None
232
233     @classmethod
234     def find_api_files(cls, api_dir=None, patterns='*'):  # -> list
235         """Find API definition files from the given directory tree with the
236         given pattern. If no directory is given then find_api_dir() is used
237         to locate one. If no pattern is given then all definition files found
238         in the directory tree are used.
239
240         :param api_dir: A directory tree in which to locate API definition
241             files; subdirectories are descended into.
242             If this is None then find_api_dir() is called to discover it.
243         :param patterns: A list of patterns to use in each visited directory
244             when looking for files.
245             This can be a list/tuple object or a comma-separated string of
246             patterns. Each value in the list will have leading/trialing
247             whitespace stripped.
248             The pattern specifies the first part of the filename, '.api.json'
249             is appended.
250             The results are de-duplicated, thus overlapping patterns are fine.
251             If this is None it defaults to '*' meaning "all API files".
252         :returns: A list of file paths for the API files found.
253         """
254         if api_dir is None:
255             api_dir = cls.find_api_dir([])
256             if api_dir is None:
257                 raise VPPApiError("api_dir cannot be located")
258
259         if isinstance(patterns, list) or isinstance(patterns, tuple):
260             patterns = [p.strip() + '.api.json' for p in patterns]
261         else:
262             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
263
264         api_files = []
265         for root, dirnames, files in os.walk(api_dir):
266             # iterate all given patterns and de-dup the result
267             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
268             for filename in files:
269                 api_files.append(os.path.join(root, filename))
270
271         return api_files
272
273     @classmethod
274     def process_json_file(self, apidef_file):
275         api = json.load(apidef_file)
276         return self._process_json(api)
277
278     @classmethod
279     def process_json_str(self, json_str):
280         api = json.loads(json_str)
281         return self._process_json(api)
282
283     @staticmethod
284     def _process_json(api):  # -> Tuple[Dict, Dict]
285         types = {}
286         services = {}
287         messages = {}
288         try:
289             for t in api['enums']:
290                 t[0] = 'vl_api_' + t[0] + '_t'
291                 types[t[0]] = {'type': 'enum', 'data': t}
292         except KeyError:
293             pass
294
295         try:
296             for t in api['unions']:
297                 t[0] = 'vl_api_' + t[0] + '_t'
298                 types[t[0]] = {'type': 'union', 'data': t}
299         except KeyError:
300             pass
301
302         try:
303             for t in api['types']:
304                 t[0] = 'vl_api_' + t[0] + '_t'
305                 types[t[0]] = {'type': 'type', 'data': t}
306         except KeyError:
307             pass
308
309         try:
310             for t, v in api['aliases'].items():
311                 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
312         except KeyError:
313             pass
314
315         try:
316             services.update(api['services'])
317         except KeyError:
318             pass
319
320         i = 0
321         while True:
322             unresolved = {}
323             for k, v in types.items():
324                 t = v['data']
325                 if not vpp_get_type(k):
326                     if v['type'] == 'enum':
327                         try:
328                             VPPEnumType(t[0], t[1:])
329                         except ValueError:
330                             unresolved[k] = v
331                     elif v['type'] == 'union':
332                         try:
333                             VPPUnionType(t[0], t[1:])
334                         except ValueError:
335                             unresolved[k] = v
336                     elif v['type'] == 'type':
337                         try:
338                             VPPType(t[0], t[1:])
339                         except ValueError:
340                             unresolved[k] = v
341                     elif v['type'] == 'alias':
342                         try:
343                             VPPTypeAlias(k, t)
344                         except ValueError:
345                             unresolved[k] = v
346             if len(unresolved) == 0:
347                 break
348             if i > 3:
349                 raise VPPValueError('Unresolved type definitions {}'
350                                     .format(unresolved))
351             types = unresolved
352             i += 1
353         try:
354             for m in api['messages']:
355                 try:
356                     messages[m[0]] = VPPMessage(m[0], m[1:])
357                 except VPPNotImplementedError:
358                     ### OLE FIXME
359                     logger.error('Not implemented error for {}'.format(m[0]))
360         except KeyError:
361             pass
362         return messages, services
363
364
365 class VPPApiClient(object):
366     """VPP interface.
367
368     This class provides the APIs to VPP.  The APIs are loaded
369     from provided .api.json files and makes functions accordingly.
370     These functions are documented in the VPP .api files, as they
371     are dynamically created.
372
373     Additionally, VPP can send callback messages; this class
374     provides a means to register a callback function to receive
375     these messages in a background thread.
376     """
377     apidir = None
378     VPPApiError = VPPApiError
379     VPPRuntimeError = VPPRuntimeError
380     VPPValueError = VPPValueError
381     VPPNotImplementedError = VPPNotImplementedError
382     VPPIOError = VPPIOError
383
384
385     def __init__(self, apifiles=None, testmode=False, async_thread=True,
386                  logger=None, loglevel=None,
387                  read_timeout=5, use_socket=False,
388                  server_address='/run/vpp/api.sock'):
389         """Create a VPP API object.
390
391         apifiles is a list of files containing API
392         descriptions that will be loaded - methods will be
393         dynamically created reflecting these APIs.  If not
394         provided this will load the API files from VPP's
395         default install location.
396
397         logger, if supplied, is the logging logger object to log to.
398         loglevel, if supplied, is the log level this logger is set
399         to report at (from the loglevels in the logging module).
400         """
401         if logger is None:
402             logger = logging.getLogger(
403                 "{}.{}".format(__name__, self.__class__.__name__))
404             if loglevel is not None:
405                 logger.setLevel(loglevel)
406         self.logger = logger
407
408         self.messages = {}
409         self.services = {}
410         self.id_names = []
411         self.id_msgdef = []
412         self.header = VPPType('header', [['u16', 'msgid'],
413                                          ['u32', 'client_index']])
414         self.apifiles = []
415         self.event_callback = None
416         self.message_queue = queue.Queue()
417         self.read_timeout = read_timeout
418         self.async_thread = async_thread
419         self.event_thread = None
420         self.testmode = testmode
421         self.use_socket = use_socket
422         self.server_address = server_address
423         self._apifiles = apifiles
424         self.stats = {}
425
426         if use_socket:
427             from . vpp_transport_socket import VppTransport
428         else:
429             from . vpp_transport_shmem import VppTransport
430
431         if not apifiles:
432             # Pick up API definitions from default directory
433             try:
434                 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
435             except (RuntimeError, VPPApiError):
436                 # In test mode we don't care that we can't find the API files
437                 if testmode:
438                     apifiles = []
439                 else:
440                     raise VPPRuntimeError
441
442         for file in apifiles:
443             with open(file) as apidef_file:
444                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
445                 self.messages.update(m)
446                 self.services.update(s)
447
448         self.apifiles = apifiles
449
450         # Basic sanity check
451         if len(self.messages) == 0 and not testmode:
452             raise VPPValueError(1, 'Missing JSON message definitions')
453         if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
454             raise VPPRuntimeError("Invalid address family hints. "
455                                   "Cannot continue.")
456
457         self.transport = VppTransport(self, read_timeout=read_timeout,
458                                       server_address=server_address)
459         # Make sure we allow VPP to clean up the message rings.
460         atexit.register(vpp_atexit, weakref.ref(self))
461
462         add_convenience_methods()
463
464     def get_function(self, name):
465         return getattr(self._api, name)
466
467     class ContextId(object):
468         """Multiprocessing-safe provider of unique context IDs."""
469         def __init__(self):
470             self.context = mp.Value(ctypes.c_uint, 0)
471             self.lock = mp.Lock()
472
473         def __call__(self):
474             """Get a new unique (or, at least, not recently used) context."""
475             with self.lock:
476                 self.context.value += 1
477                 return self.context.value
478     get_context = ContextId()
479
480     def get_type(self, name):
481         return vpp_get_type(name)
482
483     @property
484     def api(self):
485         if not hasattr(self, "_api"):
486             raise VPPApiError("Not connected, api definitions not available")
487         return self._api
488
489     def make_function(self, msg, i, multipart, do_async):
490         if (do_async):
491             def f(**kwargs):
492                 return self._call_vpp_async(i, msg, **kwargs)
493         else:
494             def f(**kwargs):
495                 return self._call_vpp(i, msg, multipart, **kwargs)
496
497         f.__name__ = str(msg.name)
498         f.__doc__ = ", ".join(["%s %s" %
499                                (msg.fieldtypes[j], k)
500                                for j, k in enumerate(msg.fields)])
501         f.msg = msg
502
503         return f
504
505     def _register_functions(self, do_async=False):
506         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
507         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
508         self._api = VppApiDynamicMethodHolder()
509         for name, msg in vpp_iterator(self.messages):
510             n = name + '_' + msg.crc[2:]
511             i = self.transport.get_msg_index(n)
512             if i > 0:
513                 self.id_msgdef[i] = msg
514                 self.id_names[i] = name
515
516                 # Create function for client side messages.
517                 if name in self.services:
518                     f = self.make_function(msg, i, self.services[name], do_async)
519                     setattr(self._api, name, FuncWrapper(f))
520             else:
521                 self.logger.debug(
522                     'No such message type or failed CRC checksum: %s', n)
523
524     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
525                          do_async):
526         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
527
528         rv = self.transport.connect(name, pfx,
529                                     msg_handler, rx_qlen)
530         if rv != 0:
531             raise VPPIOError(2, 'Connect failed')
532         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
533         self._register_functions(do_async=do_async)
534
535         # Initialise control ping
536         crc = self.messages['control_ping'].crc
537         self.control_ping_index = self.transport.get_msg_index(
538             ('control_ping' + '_' + crc[2:]))
539         self.control_ping_msgdef = self.messages['control_ping']
540         if self.async_thread:
541             self.event_thread = threading.Thread(
542                 target=self.thread_msg_handler)
543             self.event_thread.daemon = True
544             self.event_thread.start()
545         else:
546             self.event_thread = None
547         return rv
548
549     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
550         """Attach to VPP.
551
552         name - the name of the client.
553         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
554         do_async - if true, messages are sent without waiting for a reply
555         rx_qlen - the length of the VPP message receive queue between
556         client and server.
557         """
558         msg_handler = self.transport.get_callback(do_async)
559         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
560                                      do_async)
561
562     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
563         """Attach to VPP in synchronous mode. Application must poll for events.
564
565         name - the name of the client.
566         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
567         rx_qlen - the length of the VPP message receive queue between
568         client and server.
569         """
570
571         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
572                                      do_async=False)
573
574     def disconnect(self):
575         """Detach from VPP."""
576         rv = self.transport.disconnect()
577         if self.event_thread is not None:
578             self.message_queue.put("terminate event thread")
579         return rv
580
581     def msg_handler_sync(self, msg):
582         """Process an incoming message from VPP in sync mode.
583
584         The message may be a reply or it may be an async notification.
585         """
586         r = self.decode_incoming_msg(msg)
587         if r is None:
588             return
589
590         # If we have a context, then use the context to find any
591         # request waiting for a reply
592         context = 0
593         if hasattr(r, 'context') and r.context > 0:
594             context = r.context
595
596         if context == 0:
597             # No context -> async notification that we feed to the callback
598             self.message_queue.put_nowait(r)
599         else:
600             raise VPPIOError(2, 'RPC reply message received in event handler')
601
602     def has_context(self, msg):
603         if len(msg) < 10:
604             return False
605
606         header = VPPType('header_with_context', [['u16', 'msgid'],
607                                                  ['u32', 'client_index'],
608                                                  ['u32', 'context']])
609
610         (i, ci, context), size = header.unpack(msg, 0)
611         if self.id_names[i] == 'rx_thread_exit':
612             return
613
614         #
615         # Decode message and returns a tuple.
616         #
617         msgobj = self.id_msgdef[i]
618         if 'context' in msgobj.field_by_name and context >= 0:
619             return True
620         return False
621
622     def decode_incoming_msg(self, msg, no_type_conversion=False):
623         if not msg:
624             logger.warning('vpp_api.read failed')
625             return
626
627         (i, ci), size = self.header.unpack(msg, 0)
628         if self.id_names[i] == 'rx_thread_exit':
629             return
630
631         #
632         # Decode message and returns a tuple.
633         #
634         msgobj = self.id_msgdef[i]
635         if not msgobj:
636             raise VPPIOError(2, 'Reply message undefined')
637
638         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
639         return r
640
641     def msg_handler_async(self, msg):
642         """Process a message from VPP in async mode.
643
644         In async mode, all messages are returned to the callback.
645         """
646         r = self.decode_incoming_msg(msg)
647         if r is None:
648             return
649
650         msgname = type(r).__name__
651
652         if self.event_callback:
653             self.event_callback(msgname, r)
654
655     def _control_ping(self, context):
656         """Send a ping command."""
657         self._call_vpp_async(self.control_ping_index,
658                              self.control_ping_msgdef,
659                              context=context)
660
661     def validate_args(self, msg, kwargs):
662         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
663         if d:
664             raise VPPValueError('Invalid argument {} to {}'
665                                 .format(list(d), msg.name))
666
667     def _add_stat(self, name, ms):
668         if not name in self.stats:
669             self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
670         else:
671             if ms > self.stats[name]['max']:
672                 self.stats[name]['max'] = ms
673             self.stats[name]['count'] += 1
674             n = self.stats[name]['count']
675             self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
676
677     def get_stats(self):
678         s = '\n=== API PAPI STATISTICS ===\n'
679         s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
680         for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
681             s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
682                                                            n[1]['avg'], n[1]['max'])
683         return s
684
685     def _call_vpp(self, i, msgdef, service, **kwargs):
686         """Given a message, send the message and await a reply.
687
688         msgdef - the message packing definition
689         i - the message type index
690         multipart - True if the message returns multiple
691         messages in return.
692         context - context number - chosen at random if not
693         supplied.
694         The remainder of the kwargs are the arguments to the API call.
695
696         The return value is the message or message array containing
697         the response.  It will raise an IOError exception if there was
698         no response within the timeout window.
699         """
700         ts = time.time()
701         if 'context' not in kwargs:
702             context = self.get_context()
703             kwargs['context'] = context
704         else:
705             context = kwargs['context']
706         kwargs['_vl_msg_id'] = i
707
708         no_type_conversion = kwargs.pop('_no_type_conversion', False)
709         timeout = kwargs.pop('_timeout', None)
710
711         try:
712             if self.transport.socket_index:
713                 kwargs['client_index'] = self.transport.socket_index
714         except AttributeError:
715             pass
716         self.validate_args(msgdef, kwargs)
717
718         s = 'Calling {}({})'.format(msgdef.name,
719             ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
720         self.logger.debug(s)
721
722         b = msgdef.pack(kwargs)
723         self.transport.suspend()
724
725         self.transport.write(b)
726
727         msgreply = service['reply']
728         stream = True if 'stream' in service else False
729         if stream:
730             if 'stream_msg' in service:
731                 # New service['reply'] = _reply and service['stream_message'] = _details
732                 stream_message = service['stream_msg']
733                 modern =True
734             else:
735                 # Old  service['reply'] = _details
736                 stream_message = msgreply
737                 msgreply = 'control_ping_reply'
738                 modern = False
739                 # Send a ping after the request - we use its response
740                 # to detect that we have seen all results.
741                 self._control_ping(context)
742
743         # Block until we get a reply.
744         rl = []
745         while (True):
746             r = self.read_blocking(no_type_conversion, timeout)
747             if r is None:
748                 raise VPPIOError(2, 'VPP API client: read failed')
749             msgname = type(r).__name__
750             if context not in r or r.context == 0 or context != r.context:
751                 # Message being queued
752                 self.message_queue.put_nowait(r)
753                 continue
754             if msgname != msgreply and (stream and (msgname != stream_message)):
755                 print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
756             if not stream:
757                 rl = r
758                 break
759             if msgname == msgreply:
760                 if modern: # Return both reply and list
761                     rl = r, rl
762                 break
763
764             rl.append(r)
765
766         self.transport.resume()
767
768         s = 'Return value: {!r}'.format(r)
769         if len(s) > 80:
770             s = s[:80] + "..."
771         self.logger.debug(s)
772         te = time.time()
773         self._add_stat(msgdef.name, (te - ts) * 1000)
774         return rl
775
776     def _call_vpp_async(self, i, msg, **kwargs):
777         """Given a message, send the message and return the context.
778
779         msgdef - the message packing definition
780         i - the message type index
781         context - context number - chosen at random if not
782         supplied.
783         The remainder of the kwargs are the arguments to the API call.
784
785         The reply message(s) will be delivered later to the registered callback.
786         The returned context will help with assigning which call
787         the reply belongs to.
788         """
789         if 'context' not in kwargs:
790             context = self.get_context()
791             kwargs['context'] = context
792         else:
793             context = kwargs['context']
794         try:
795             if self.transport.socket_index:
796                 kwargs['client_index'] = self.transport.socket_index
797         except AttributeError:
798             kwargs['client_index'] = 0
799         kwargs['_vl_msg_id'] = i
800         b = msg.pack(kwargs)
801
802         self.transport.write(b)
803         return context
804
805     def read_blocking(self, no_type_conversion=False, timeout=None):
806         """Get next received message from transport within timeout, decoded.
807
808         Note that notifications have context zero
809         and are not put into receive queue (at least for socket transport),
810         use async_thread with registered callback for processing them.
811
812         If no message appears in the queue within timeout, return None.
813
814         Optionally, type conversion can be skipped,
815         as some of conversions are into less precise types.
816
817         When r is the return value of this, the caller can get message name as:
818             msgname = type(r).__name__
819         and context number (type long) as:
820             context = r.context
821
822         :param no_type_conversion: If false, type conversions are applied.
823         :type no_type_conversion: bool
824         :returns: Decoded message, or None if no message (within timeout).
825         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
826         :raises VppTransportShmemIOError if timed out.
827         """
828         msg = self.transport.read(timeout=timeout)
829         if not msg:
830             return None
831         return self.decode_incoming_msg(msg, no_type_conversion)
832
833     def register_event_callback(self, callback):
834         """Register a callback for async messages.
835
836         This will be called for async notifications in sync mode,
837         and all messages in async mode.  In sync mode, replies to
838         requests will not come here.
839
840         callback is a fn(msg_type_name, msg_type) that will be
841         called when a message comes in.  While this function is
842         executing, note that (a) you are in a background thread and
843         may wish to use threading.Lock to protect your datastructures,
844         and (b) message processing from VPP will stop (so if you take
845         a long while about it you may provoke reply timeouts or cause
846         VPP to fill the RX buffer).  Passing None will disable the
847         callback.
848         """
849         self.event_callback = callback
850
851     def thread_msg_handler(self):
852         """Python thread calling the user registered message handler.
853
854         This is to emulate the old style event callback scheme. Modern
855         clients should provide their own thread to poll the event
856         queue.
857         """
858         while True:
859             r = self.message_queue.get()
860             if r == "terminate event thread":
861                 break
862             msgname = type(r).__name__
863             if self.event_callback:
864                 self.event_callback(msgname, r)
865
866     def validate_message_table(self, namecrctable):
867         """Take a dictionary of name_crc message names
868         and returns an array of missing messages"""
869
870         missing_table = []
871         for name_crc in namecrctable:
872             i = self.transport.get_msg_index(name_crc)
873             if i <= 0:
874                 missing_table.append(name_crc)
875         return missing_table
876
877     def dump_message_table(self):
878         """Return VPPs API message table as name_crc dictionary"""
879         return self.transport.message_table
880
881     def dump_message_table_filtered(self, msglist):
882         """Return VPPs API message table as name_crc dictionary,
883         filtered by message name list."""
884
885         replies = [self.services[n]['reply'] for n in msglist]
886         message_table_filtered = {}
887         for name in msglist + replies:
888             for k,v in self.transport.message_table.items():
889                 if k.startswith(name):
890                     message_table_filtered[k] = v
891                     break
892         return message_table_filtered
893
894     def __repr__(self):
895         return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
896                "logger=%s, read_timeout=%s, use_socket=%s, " \
897                "server_address='%s'>" % (
898                    self._apifiles, self.testmode, self.async_thread,
899                    self.logger, self.read_timeout, self.use_socket,
900                    self.server_address)
901
902     def details_iter(self, f, **kwargs):
903         cursor = 0
904         while True:
905             kwargs['cursor'] = cursor
906             rv, details = f(**kwargs)
907             #
908             # Convert to yield from details when we only support python 3
909             #
910             for d in details:
911                 yield d
912             if rv.retval == 0 or rv.retval != -165:
913                 break
914             cursor = rv.cursor
915
916 # Provide the old name for backward compatibility.
917 VPP = VPPApiClient
918
919 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4