papi: add support for enumflag part 1 of 2
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import queue
25 import logging
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 import time
33 from . vpp_format import verify_enum_hint
34 from . vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40     class V:
41         """placeholder for VppTransport as the implementation is dependent on
42         VPPAPIClient's initialization values
43         """
44
45     VppTransport = V
46
47 logger = logging.getLogger('vpp_papi')
48 logger.addHandler(logging.NullHandler())
49
50 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
51            'VppEnum', 'VppEnumType', 'VppEnumFlag',
52            'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
53            'VPPApiClient', )
54
55
56 def metaclass(metaclass):
57     @functools.wraps(metaclass)
58     def wrapper(cls):
59         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
60
61     return wrapper
62
63
64 class VppEnumType(type):
65     def __getattr__(cls, name):
66         t = vpp_get_type(name)
67         return t.enum
68
69
70 @metaclass(VppEnumType)
71 class VppEnum:
72     pass
73
74
75 @metaclass(VppEnumType)
76 class VppEnumFlag:
77     pass
78
79
80 def vpp_atexit(vpp_weakref):
81     """Clean up VPP connection on shutdown."""
82     vpp_instance = vpp_weakref()
83     if vpp_instance and vpp_instance.transport.connected:
84         logger.debug('Cleaning up VPP on exit')
85         vpp_instance.disconnect()
86
87
88 def add_convenience_methods():
89     # provide convenience methods to IP[46]Address.vapi_af
90     def _vapi_af(self):
91         if 6 == self._version:
92             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
93         if 4 == self._version:
94             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
95         raise ValueError("Invalid _version.")
96
97     def _vapi_af_name(self):
98         if 6 == self._version:
99             return 'ip6'
100         if 4 == self._version:
101             return 'ip4'
102         raise ValueError("Invalid _version.")
103
104     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
105     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
106
107
108 class VppApiDynamicMethodHolder:
109     pass
110
111
112 class FuncWrapper:
113     def __init__(self, func):
114         self._func = func
115         self.__name__ = func.__name__
116         self.__doc__ = func.__doc__
117
118     def __call__(self, **kwargs):
119         return self._func(**kwargs)
120
121     def __repr__(self):
122         return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
123
124
125 class VPPApiError(Exception):
126     pass
127
128
129 class VPPNotImplementedError(NotImplementedError):
130     pass
131
132
133 class VPPIOError(IOError):
134     pass
135
136
137 class VPPRuntimeError(RuntimeError):
138     pass
139
140
141 class VPPValueError(ValueError):
142     pass
143
144
145 class VPPApiJSONFiles:
146     @classmethod
147     def find_api_dir(cls, dirs):
148         """Attempt to find the best directory in which API definition
149         files may reside. If the value VPP_API_DIR exists in the environment
150         then it is first on the search list. If we're inside a recognized
151         location in a VPP source tree (src/scripts and src/vpp-api/python)
152         then entries from there to the likely locations in build-root are
153         added. Finally the location used by system packages is added.
154
155         :returns: A single directory name, or None if no such directory
156             could be found.
157         """
158
159         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
160         # in which case, plot a course to likely places in the src tree
161         import __main__ as main
162         if hasattr(main, '__file__'):
163             # get the path of the calling script
164             localdir = os.path.dirname(os.path.realpath(main.__file__))
165         else:
166             # use cwd if there is no calling script
167             localdir = os.getcwd()
168         localdir_s = localdir.split(os.path.sep)
169
170         def dmatch(dir):
171             """Match dir against right-hand components of the script dir"""
172             d = dir.split('/')  # param 'dir' assumes a / separator
173             length = len(d)
174             return len(localdir_s) > length and localdir_s[-length:] == d
175
176         def sdir(srcdir, variant):
177             """Build a path from srcdir to the staged API files of
178             'variant'  (typically '' or '_debug')"""
179             # Since 'core' and 'plugin' files are staged
180             # in separate directories, we target the parent dir.
181             return os.path.sep.join((
182                 srcdir,
183                 'build-root',
184                 'install-vpp%s-native' % variant,
185                 'vpp',
186                 'share',
187                 'vpp',
188                 'api',
189             ))
190
191         srcdir = None
192         if dmatch('src/scripts'):
193             srcdir = os.path.sep.join(localdir_s[:-2])
194         elif dmatch('src/vpp-api/python'):
195             srcdir = os.path.sep.join(localdir_s[:-3])
196         elif dmatch('test'):
197             # we're apparently running tests
198             srcdir = os.path.sep.join(localdir_s[:-1])
199
200         if srcdir:
201             # we're in the source tree, try both the debug and release
202             # variants.
203             dirs.append(sdir(srcdir, '_debug'))
204             dirs.append(sdir(srcdir, ''))
205
206         # Test for staged copies of the scripts
207         # For these, since we explicitly know if we're running a debug versus
208         # release variant, target only the relevant directory
209         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
210             srcdir = os.path.sep.join(localdir_s[:-4])
211             dirs.append(sdir(srcdir, '_debug'))
212         if dmatch('build-root/install-vpp-native/vpp/bin'):
213             srcdir = os.path.sep.join(localdir_s[:-4])
214             dirs.append(sdir(srcdir, ''))
215
216         # finally, try the location system packages typically install into
217         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
218
219         # check the directories for existence; first one wins
220         for dir in dirs:
221             if os.path.isdir(dir):
222                 return dir
223
224         return None
225
226     @classmethod
227     def find_api_files(cls, api_dir=None, patterns='*'):  # -> list
228         """Find API definition files from the given directory tree with the
229         given pattern. If no directory is given then find_api_dir() is used
230         to locate one. If no pattern is given then all definition files found
231         in the directory tree are used.
232
233         :param api_dir: A directory tree in which to locate API definition
234             files; subdirectories are descended into.
235             If this is None then find_api_dir() is called to discover it.
236         :param patterns: A list of patterns to use in each visited directory
237             when looking for files.
238             This can be a list/tuple object or a comma-separated string of
239             patterns. Each value in the list will have leading/trialing
240             whitespace stripped.
241             The pattern specifies the first part of the filename, '.api.json'
242             is appended.
243             The results are de-duplicated, thus overlapping patterns are fine.
244             If this is None it defaults to '*' meaning "all API files".
245         :returns: A list of file paths for the API files found.
246         """
247         if api_dir is None:
248             api_dir = cls.find_api_dir([])
249             if api_dir is None:
250                 raise VPPApiError("api_dir cannot be located")
251
252         if isinstance(patterns, list) or isinstance(patterns, tuple):
253             patterns = [p.strip() + '.api.json' for p in patterns]
254         else:
255             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
256
257         api_files = []
258         for root, dirnames, files in os.walk(api_dir):
259             # iterate all given patterns and de-dup the result
260             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
261             for filename in files:
262                 api_files.append(os.path.join(root, filename))
263
264         return api_files
265
266     @classmethod
267     def process_json_file(self, apidef_file):
268         api = json.load(apidef_file)
269         return self._process_json(api)
270
271     @classmethod
272     def process_json_str(self, json_str):
273         api = json.loads(json_str)
274         return self._process_json(api)
275
276     @staticmethod
277     def _process_json(api):  # -> Tuple[Dict, Dict]
278         types = {}
279         services = {}
280         messages = {}
281         try:
282             for t in api['enums']:
283                 t[0] = 'vl_api_' + t[0] + '_t'
284                 types[t[0]] = {'type': 'enum', 'data': t}
285         except KeyError:
286             pass
287         try:
288             for t in api['enumflags']:
289                 t[0] = 'vl_api_' + t[0] + '_t'
290                 types[t[0]] = {'type': 'enum', 'data': t}
291         except KeyError:
292             pass
293         try:
294             for t in api['unions']:
295                 t[0] = 'vl_api_' + t[0] + '_t'
296                 types[t[0]] = {'type': 'union', 'data': t}
297         except KeyError:
298             pass
299
300         try:
301             for t in api['types']:
302                 t[0] = 'vl_api_' + t[0] + '_t'
303                 types[t[0]] = {'type': 'type', 'data': t}
304         except KeyError:
305             pass
306
307         try:
308             for t, v in api['aliases'].items():
309                 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
310         except KeyError:
311             pass
312
313         try:
314             services.update(api['services'])
315         except KeyError:
316             pass
317
318         i = 0
319         while True:
320             unresolved = {}
321             for k, v in types.items():
322                 t = v['data']
323                 if not vpp_get_type(k):
324                     if v['type'] == 'enum':
325                         try:
326                             VPPEnumType(t[0], t[1:])
327                         except ValueError:
328                             unresolved[k] = v
329                 if not vpp_get_type(k):
330                     if v['type'] == 'enumflag':
331                         try:
332                             VPPEnumFlagType(t[0], t[1:])
333                         except ValueError:
334                             unresolved[k] = v
335                     elif v['type'] == 'union':
336                         try:
337                             VPPUnionType(t[0], t[1:])
338                         except ValueError:
339                             unresolved[k] = v
340                     elif v['type'] == 'type':
341                         try:
342                             VPPType(t[0], t[1:])
343                         except ValueError:
344                             unresolved[k] = v
345                     elif v['type'] == 'alias':
346                         try:
347                             VPPTypeAlias(k, t)
348                         except ValueError:
349                             unresolved[k] = v
350             if len(unresolved) == 0:
351                 break
352             if i > 3:
353                 raise VPPValueError('Unresolved type definitions {}'
354                                     .format(unresolved))
355             types = unresolved
356             i += 1
357         try:
358             for m in api['messages']:
359                 try:
360                     messages[m[0]] = VPPMessage(m[0], m[1:])
361                 except VPPNotImplementedError:
362                     ### OLE FIXME
363                     logger.error('Not implemented error for {}'.format(m[0]))
364         except KeyError:
365             pass
366         return messages, services
367
368
369 class VPPApiClient:
370     """VPP interface.
371
372     This class provides the APIs to VPP.  The APIs are loaded
373     from provided .api.json files and makes functions accordingly.
374     These functions are documented in the VPP .api files, as they
375     are dynamically created.
376
377     Additionally, VPP can send callback messages; this class
378     provides a means to register a callback function to receive
379     these messages in a background thread.
380     """
381     apidir = None
382     VPPApiError = VPPApiError
383     VPPRuntimeError = VPPRuntimeError
384     VPPValueError = VPPValueError
385     VPPNotImplementedError = VPPNotImplementedError
386     VPPIOError = VPPIOError
387
388
389     def __init__(self, apifiles=None, testmode=False, async_thread=True,
390                  logger=None, loglevel=None,
391                  read_timeout=5, use_socket=False,
392                  server_address='/run/vpp/api.sock'):
393         """Create a VPP API object.
394
395         apifiles is a list of files containing API
396         descriptions that will be loaded - methods will be
397         dynamically created reflecting these APIs.  If not
398         provided this will load the API files from VPP's
399         default install location.
400
401         logger, if supplied, is the logging logger object to log to.
402         loglevel, if supplied, is the log level this logger is set
403         to report at (from the loglevels in the logging module).
404         """
405         if logger is None:
406             logger = logging.getLogger(
407                 "{}.{}".format(__name__, self.__class__.__name__))
408             if loglevel is not None:
409                 logger.setLevel(loglevel)
410         self.logger = logger
411
412         self.messages = {}
413         self.services = {}
414         self.id_names = []
415         self.id_msgdef = []
416         self.header = VPPType('header', [['u16', 'msgid'],
417                                          ['u32', 'client_index']])
418         self.apifiles = []
419         self.event_callback = None
420         self.message_queue = queue.Queue()
421         self.read_timeout = read_timeout
422         self.async_thread = async_thread
423         self.event_thread = None
424         self.testmode = testmode
425         self.use_socket = use_socket
426         self.server_address = server_address
427         self._apifiles = apifiles
428         self.stats = {}
429
430         if use_socket:
431             from . vpp_transport_socket import VppTransport
432         else:
433             from . vpp_transport_shmem import VppTransport
434
435         if not apifiles:
436             # Pick up API definitions from default directory
437             try:
438                 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
439             except (RuntimeError, VPPApiError):
440                 # In test mode we don't care that we can't find the API files
441                 if testmode:
442                     apifiles = []
443                 else:
444                     raise VPPRuntimeError
445
446         for file in apifiles:
447             with open(file) as apidef_file:
448                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
449                 self.messages.update(m)
450                 self.services.update(s)
451
452         self.apifiles = apifiles
453
454         # Basic sanity check
455         if len(self.messages) == 0 and not testmode:
456             raise VPPValueError(1, 'Missing JSON message definitions')
457         if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
458             raise VPPRuntimeError("Invalid address family hints. "
459                                   "Cannot continue.")
460
461         self.transport = VppTransport(self, read_timeout=read_timeout,
462                                       server_address=server_address)
463         # Make sure we allow VPP to clean up the message rings.
464         atexit.register(vpp_atexit, weakref.ref(self))
465
466         add_convenience_methods()
467
468     def get_function(self, name):
469         return getattr(self._api, name)
470
471     class ContextId:
472         """Multiprocessing-safe provider of unique context IDs."""
473         def __init__(self):
474             self.context = mp.Value(ctypes.c_uint, 0)
475             self.lock = mp.Lock()
476
477         def __call__(self):
478             """Get a new unique (or, at least, not recently used) context."""
479             with self.lock:
480                 self.context.value += 1
481                 return self.context.value
482     get_context = ContextId()
483
484     def get_type(self, name):
485         return vpp_get_type(name)
486
487     @property
488     def api(self):
489         if not hasattr(self, "_api"):
490             raise VPPApiError("Not connected, api definitions not available")
491         return self._api
492
493     def make_function(self, msg, i, multipart, do_async):
494         if (do_async):
495             def f(**kwargs):
496                 return self._call_vpp_async(i, msg, **kwargs)
497         else:
498             def f(**kwargs):
499                 return self._call_vpp(i, msg, multipart, **kwargs)
500
501         f.__name__ = str(msg.name)
502         f.__doc__ = ", ".join(["%s %s" %
503                                (msg.fieldtypes[j], k)
504                                for j, k in enumerate(msg.fields)])
505         f.msg = msg
506
507         return f
508
509     def _register_functions(self, do_async=False):
510         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
511         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
512         self._api = VppApiDynamicMethodHolder()
513         for name, msg in self.messages.items():
514             n = name + '_' + msg.crc[2:]
515             i = self.transport.get_msg_index(n)
516             if i > 0:
517                 self.id_msgdef[i] = msg
518                 self.id_names[i] = name
519
520                 # Create function for client side messages.
521                 if name in self.services:
522                     f = self.make_function(msg, i, self.services[name], do_async)
523                     setattr(self._api, name, FuncWrapper(f))
524             else:
525                 self.logger.debug(
526                     'No such message type or failed CRC checksum: %s', n)
527
528     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
529                          do_async):
530         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
531
532         rv = self.transport.connect(name, pfx,
533                                     msg_handler, rx_qlen)
534         if rv != 0:
535             raise VPPIOError(2, 'Connect failed')
536         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
537         self._register_functions(do_async=do_async)
538
539         # Initialise control ping
540         crc = self.messages['control_ping'].crc
541         self.control_ping_index = self.transport.get_msg_index(
542             ('control_ping' + '_' + crc[2:]))
543         self.control_ping_msgdef = self.messages['control_ping']
544         if self.async_thread:
545             self.event_thread = threading.Thread(
546                 target=self.thread_msg_handler)
547             self.event_thread.daemon = True
548             self.event_thread.start()
549         else:
550             self.event_thread = None
551         return rv
552
553     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
554         """Attach to VPP.
555
556         name - the name of the client.
557         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
558         do_async - if true, messages are sent without waiting for a reply
559         rx_qlen - the length of the VPP message receive queue between
560         client and server.
561         """
562         msg_handler = self.transport.get_callback(do_async)
563         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
564                                      do_async)
565
566     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
567         """Attach to VPP in synchronous mode. Application must poll for events.
568
569         name - the name of the client.
570         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
571         rx_qlen - the length of the VPP message receive queue between
572         client and server.
573         """
574
575         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
576                                      do_async=False)
577
578     def disconnect(self):
579         """Detach from VPP."""
580         rv = self.transport.disconnect()
581         if self.event_thread is not None:
582             self.message_queue.put("terminate event thread")
583         return rv
584
585     def msg_handler_sync(self, msg):
586         """Process an incoming message from VPP in sync mode.
587
588         The message may be a reply or it may be an async notification.
589         """
590         r = self.decode_incoming_msg(msg)
591         if r is None:
592             return
593
594         # If we have a context, then use the context to find any
595         # request waiting for a reply
596         context = 0
597         if hasattr(r, 'context') and r.context > 0:
598             context = r.context
599
600         if context == 0:
601             # No context -> async notification that we feed to the callback
602             self.message_queue.put_nowait(r)
603         else:
604             raise VPPIOError(2, 'RPC reply message received in event handler')
605
606     def has_context(self, msg):
607         if len(msg) < 10:
608             return False
609
610         header = VPPType('header_with_context', [['u16', 'msgid'],
611                                                  ['u32', 'client_index'],
612                                                  ['u32', 'context']])
613
614         (i, ci, context), size = header.unpack(msg, 0)
615         if self.id_names[i] == 'rx_thread_exit':
616             return
617
618         #
619         # Decode message and returns a tuple.
620         #
621         msgobj = self.id_msgdef[i]
622         if 'context' in msgobj.field_by_name and context >= 0:
623             return True
624         return False
625
626     def decode_incoming_msg(self, msg, no_type_conversion=False):
627         if not msg:
628             logger.warning('vpp_api.read failed')
629             return
630
631         (i, ci), size = self.header.unpack(msg, 0)
632         if self.id_names[i] == 'rx_thread_exit':
633             return
634
635         #
636         # Decode message and returns a tuple.
637         #
638         msgobj = self.id_msgdef[i]
639         if not msgobj:
640             raise VPPIOError(2, 'Reply message undefined')
641
642         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
643         return r
644
645     def msg_handler_async(self, msg):
646         """Process a message from VPP in async mode.
647
648         In async mode, all messages are returned to the callback.
649         """
650         r = self.decode_incoming_msg(msg)
651         if r is None:
652             return
653
654         msgname = type(r).__name__
655
656         if self.event_callback:
657             self.event_callback(msgname, r)
658
659     def _control_ping(self, context):
660         """Send a ping command."""
661         self._call_vpp_async(self.control_ping_index,
662                              self.control_ping_msgdef,
663                              context=context)
664
665     def validate_args(self, msg, kwargs):
666         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
667         if d:
668             raise VPPValueError('Invalid argument {} to {}'
669                                 .format(list(d), msg.name))
670
671     def _add_stat(self, name, ms):
672         if not name in self.stats:
673             self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
674         else:
675             if ms > self.stats[name]['max']:
676                 self.stats[name]['max'] = ms
677             self.stats[name]['count'] += 1
678             n = self.stats[name]['count']
679             self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
680
681     def get_stats(self):
682         s = '\n=== API PAPI STATISTICS ===\n'
683         s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
684         for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
685             s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
686                                                            n[1]['avg'], n[1]['max'])
687         return s
688
689     def _call_vpp(self, i, msgdef, service, **kwargs):
690         """Given a message, send the message and await a reply.
691
692         msgdef - the message packing definition
693         i - the message type index
694         multipart - True if the message returns multiple
695         messages in return.
696         context - context number - chosen at random if not
697         supplied.
698         The remainder of the kwargs are the arguments to the API call.
699
700         The return value is the message or message array containing
701         the response.  It will raise an IOError exception if there was
702         no response within the timeout window.
703         """
704         ts = time.time()
705         if 'context' not in kwargs:
706             context = self.get_context()
707             kwargs['context'] = context
708         else:
709             context = kwargs['context']
710         kwargs['_vl_msg_id'] = i
711
712         no_type_conversion = kwargs.pop('_no_type_conversion', False)
713         timeout = kwargs.pop('_timeout', None)
714
715         try:
716             if self.transport.socket_index:
717                 kwargs['client_index'] = self.transport.socket_index
718         except AttributeError:
719             pass
720         self.validate_args(msgdef, kwargs)
721
722         s = 'Calling {}({})'.format(msgdef.name,
723             ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
724         self.logger.debug(s)
725
726         b = msgdef.pack(kwargs)
727         self.transport.suspend()
728
729         self.transport.write(b)
730
731         msgreply = service['reply']
732         stream = True if 'stream' in service else False
733         if stream:
734             if 'stream_msg' in service:
735                 # New service['reply'] = _reply and service['stream_message'] = _details
736                 stream_message = service['stream_msg']
737                 modern =True
738             else:
739                 # Old  service['reply'] = _details
740                 stream_message = msgreply
741                 msgreply = 'control_ping_reply'
742                 modern = False
743                 # Send a ping after the request - we use its response
744                 # to detect that we have seen all results.
745                 self._control_ping(context)
746
747         # Block until we get a reply.
748         rl = []
749         while (True):
750             r = self.read_blocking(no_type_conversion, timeout)
751             if r is None:
752                 raise VPPIOError(2, 'VPP API client: read failed')
753             msgname = type(r).__name__
754             if context not in r or r.context == 0 or context != r.context:
755                 # Message being queued
756                 self.message_queue.put_nowait(r)
757                 continue
758             if msgname != msgreply and (stream and (msgname != stream_message)):
759                 print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
760             if not stream:
761                 rl = r
762                 break
763             if msgname == msgreply:
764                 if modern: # Return both reply and list
765                     rl = r, rl
766                 break
767
768             rl.append(r)
769
770         self.transport.resume()
771
772         s = 'Return value: {!r}'.format(r)
773         if len(s) > 80:
774             s = s[:80] + "..."
775         self.logger.debug(s)
776         te = time.time()
777         self._add_stat(msgdef.name, (te - ts) * 1000)
778         return rl
779
780     def _call_vpp_async(self, i, msg, **kwargs):
781         """Given a message, send the message and return the context.
782
783         msgdef - the message packing definition
784         i - the message type index
785         context - context number - chosen at random if not
786         supplied.
787         The remainder of the kwargs are the arguments to the API call.
788
789         The reply message(s) will be delivered later to the registered callback.
790         The returned context will help with assigning which call
791         the reply belongs to.
792         """
793         if 'context' not in kwargs:
794             context = self.get_context()
795             kwargs['context'] = context
796         else:
797             context = kwargs['context']
798         try:
799             if self.transport.socket_index:
800                 kwargs['client_index'] = self.transport.socket_index
801         except AttributeError:
802             kwargs['client_index'] = 0
803         kwargs['_vl_msg_id'] = i
804         b = msg.pack(kwargs)
805
806         self.transport.write(b)
807         return context
808
809     def read_blocking(self, no_type_conversion=False, timeout=None):
810         """Get next received message from transport within timeout, decoded.
811
812         Note that notifications have context zero
813         and are not put into receive queue (at least for socket transport),
814         use async_thread with registered callback for processing them.
815
816         If no message appears in the queue within timeout, return None.
817
818         Optionally, type conversion can be skipped,
819         as some of conversions are into less precise types.
820
821         When r is the return value of this, the caller can get message name as:
822             msgname = type(r).__name__
823         and context number (type long) as:
824             context = r.context
825
826         :param no_type_conversion: If false, type conversions are applied.
827         :type no_type_conversion: bool
828         :returns: Decoded message, or None if no message (within timeout).
829         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
830         :raises VppTransportShmemIOError if timed out.
831         """
832         msg = self.transport.read(timeout=timeout)
833         if not msg:
834             return None
835         return self.decode_incoming_msg(msg, no_type_conversion)
836
837     def register_event_callback(self, callback):
838         """Register a callback for async messages.
839
840         This will be called for async notifications in sync mode,
841         and all messages in async mode.  In sync mode, replies to
842         requests will not come here.
843
844         callback is a fn(msg_type_name, msg_type) that will be
845         called when a message comes in.  While this function is
846         executing, note that (a) you are in a background thread and
847         may wish to use threading.Lock to protect your datastructures,
848         and (b) message processing from VPP will stop (so if you take
849         a long while about it you may provoke reply timeouts or cause
850         VPP to fill the RX buffer).  Passing None will disable the
851         callback.
852         """
853         self.event_callback = callback
854
855     def thread_msg_handler(self):
856         """Python thread calling the user registered message handler.
857
858         This is to emulate the old style event callback scheme. Modern
859         clients should provide their own thread to poll the event
860         queue.
861         """
862         while True:
863             r = self.message_queue.get()
864             if r == "terminate event thread":
865                 break
866             msgname = type(r).__name__
867             if self.event_callback:
868                 self.event_callback(msgname, r)
869
870     def validate_message_table(self, namecrctable):
871         """Take a dictionary of name_crc message names
872         and returns an array of missing messages"""
873
874         missing_table = []
875         for name_crc in namecrctable:
876             i = self.transport.get_msg_index(name_crc)
877             if i <= 0:
878                 missing_table.append(name_crc)
879         return missing_table
880
881     def dump_message_table(self):
882         """Return VPPs API message table as name_crc dictionary"""
883         return self.transport.message_table
884
885     def dump_message_table_filtered(self, msglist):
886         """Return VPPs API message table as name_crc dictionary,
887         filtered by message name list."""
888
889         replies = [self.services[n]['reply'] for n in msglist]
890         message_table_filtered = {}
891         for name in msglist + replies:
892             for k,v in self.transport.message_table.items():
893                 if k.startswith(name):
894                     message_table_filtered[k] = v
895                     break
896         return message_table_filtered
897
898     def __repr__(self):
899         return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
900                "logger=%s, read_timeout=%s, use_socket=%s, " \
901                "server_address='%s'>" % (
902                    self._apifiles, self.testmode, self.async_thread,
903                    self.logger, self.read_timeout, self.use_socket,
904                    self.server_address)
905
906     def details_iter(self, f, **kwargs):
907         cursor = 0
908         while True:
909             kwargs['cursor'] = cursor
910             rv, details = f(**kwargs)
911             #
912             # Convert to yield from details when we only support python 3
913             #
914             for d in details:
915                 yield d
916             if rv.retval == 0 or rv.retval != -165:
917                 break
918             cursor = rv.cursor
919
920 # Provide the old name for backward compatibility.
921 VPP = VPPApiClient
922
923 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4