papi: add method to retrieve field options
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import queue
25 import logging
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 import time
33 from . vpp_format import verify_enum_hint
34 from . vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36
37 try:
38     import VppTransport
39 except ModuleNotFoundError:
40     class V:
41         """placeholder for VppTransport as the implementation is dependent on
42         VPPAPIClient's initialization values
43         """
44
45     VppTransport = V
46
47 logger = logging.getLogger('vpp_papi')
48 logger.addHandler(logging.NullHandler())
49
50 __all__ = ('FuncWrapper', 'VppApiDynamicMethodHolder',
51            'VppEnum', 'VppEnumType', 'VppEnumFlag',
52            'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
53            'VPPApiClient', )
54
55
56 def metaclass(metaclass):
57     @functools.wraps(metaclass)
58     def wrapper(cls):
59         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
60
61     return wrapper
62
63
64 class VppEnumType(type):
65     def __getattr__(cls, name):
66         t = vpp_get_type(name)
67         return t.enum
68
69
70 @metaclass(VppEnumType)
71 class VppEnum:
72     pass
73
74
75 @metaclass(VppEnumType)
76 class VppEnumFlag:
77     pass
78
79
80 def vpp_atexit(vpp_weakref):
81     """Clean up VPP connection on shutdown."""
82     vpp_instance = vpp_weakref()
83     if vpp_instance and vpp_instance.transport.connected:
84         logger.debug('Cleaning up VPP on exit')
85         vpp_instance.disconnect()
86
87
88 def add_convenience_methods():
89     # provide convenience methods to IP[46]Address.vapi_af
90     def _vapi_af(self):
91         if 6 == self._version:
92             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
93         if 4 == self._version:
94             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
95         raise ValueError("Invalid _version.")
96
97     def _vapi_af_name(self):
98         if 6 == self._version:
99             return 'ip6'
100         if 4 == self._version:
101             return 'ip4'
102         raise ValueError("Invalid _version.")
103
104     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
105     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
106
107
108 class VppApiDynamicMethodHolder:
109     pass
110
111
112 class FuncWrapper:
113     def __init__(self, func):
114         self._func = func
115         self.__name__ = func.__name__
116         self.__doc__ = func.__doc__
117
118     def __call__(self, **kwargs):
119         return self._func(**kwargs)
120
121     def __repr__(self):
122         return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
123
124
125 class VPPApiError(Exception):
126     pass
127
128
129 class VPPNotImplementedError(NotImplementedError):
130     pass
131
132
133 class VPPIOError(IOError):
134     pass
135
136
137 class VPPRuntimeError(RuntimeError):
138     pass
139
140
141 class VPPValueError(ValueError):
142     pass
143
144
145 class VPPApiJSONFiles:
146     @classmethod
147     def find_api_dir(cls, dirs):
148         """Attempt to find the best directory in which API definition
149         files may reside. If the value VPP_API_DIR exists in the environment
150         then it is first on the search list. If we're inside a recognized
151         location in a VPP source tree (src/scripts and src/vpp-api/python)
152         then entries from there to the likely locations in build-root are
153         added. Finally the location used by system packages is added.
154
155         :returns: A single directory name, or None if no such directory
156             could be found.
157         """
158
159         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
160         # in which case, plot a course to likely places in the src tree
161         import __main__ as main
162         if hasattr(main, '__file__'):
163             # get the path of the calling script
164             localdir = os.path.dirname(os.path.realpath(main.__file__))
165         else:
166             # use cwd if there is no calling script
167             localdir = os.getcwd()
168         localdir_s = localdir.split(os.path.sep)
169
170         def dmatch(dir):
171             """Match dir against right-hand components of the script dir"""
172             d = dir.split('/')  # param 'dir' assumes a / separator
173             length = len(d)
174             return len(localdir_s) > length and localdir_s[-length:] == d
175
176         def sdir(srcdir, variant):
177             """Build a path from srcdir to the staged API files of
178             'variant'  (typically '' or '_debug')"""
179             # Since 'core' and 'plugin' files are staged
180             # in separate directories, we target the parent dir.
181             return os.path.sep.join((
182                 srcdir,
183                 'build-root',
184                 'install-vpp%s-native' % variant,
185                 'vpp',
186                 'share',
187                 'vpp',
188                 'api',
189             ))
190
191         srcdir = None
192         if dmatch('src/scripts'):
193             srcdir = os.path.sep.join(localdir_s[:-2])
194         elif dmatch('src/vpp-api/python'):
195             srcdir = os.path.sep.join(localdir_s[:-3])
196         elif dmatch('test'):
197             # we're apparently running tests
198             srcdir = os.path.sep.join(localdir_s[:-1])
199
200         if srcdir:
201             # we're in the source tree, try both the debug and release
202             # variants.
203             dirs.append(sdir(srcdir, '_debug'))
204             dirs.append(sdir(srcdir, ''))
205
206         # Test for staged copies of the scripts
207         # For these, since we explicitly know if we're running a debug versus
208         # release variant, target only the relevant directory
209         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
210             srcdir = os.path.sep.join(localdir_s[:-4])
211             dirs.append(sdir(srcdir, '_debug'))
212         if dmatch('build-root/install-vpp-native/vpp/bin'):
213             srcdir = os.path.sep.join(localdir_s[:-4])
214             dirs.append(sdir(srcdir, ''))
215
216         # finally, try the location system packages typically install into
217         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
218
219         # check the directories for existence; first one wins
220         for dir in dirs:
221             if os.path.isdir(dir):
222                 return dir
223
224         return None
225
226     @classmethod
227     def find_api_files(cls, api_dir=None, patterns='*'):  # -> list
228         """Find API definition files from the given directory tree with the
229         given pattern. If no directory is given then find_api_dir() is used
230         to locate one. If no pattern is given then all definition files found
231         in the directory tree are used.
232
233         :param api_dir: A directory tree in which to locate API definition
234             files; subdirectories are descended into.
235             If this is None then find_api_dir() is called to discover it.
236         :param patterns: A list of patterns to use in each visited directory
237             when looking for files.
238             This can be a list/tuple object or a comma-separated string of
239             patterns. Each value in the list will have leading/trialing
240             whitespace stripped.
241             The pattern specifies the first part of the filename, '.api.json'
242             is appended.
243             The results are de-duplicated, thus overlapping patterns are fine.
244             If this is None it defaults to '*' meaning "all API files".
245         :returns: A list of file paths for the API files found.
246         """
247         if api_dir is None:
248             api_dir = cls.find_api_dir([])
249             if api_dir is None:
250                 raise VPPApiError("api_dir cannot be located")
251
252         if isinstance(patterns, list) or isinstance(patterns, tuple):
253             patterns = [p.strip() + '.api.json' for p in patterns]
254         else:
255             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
256
257         api_files = []
258         for root, dirnames, files in os.walk(api_dir):
259             # iterate all given patterns and de-dup the result
260             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
261             for filename in files:
262                 api_files.append(os.path.join(root, filename))
263
264         return api_files
265
266     @classmethod
267     def process_json_file(self, apidef_file):
268         api = json.load(apidef_file)
269         return self._process_json(api)
270
271     @classmethod
272     def process_json_str(self, json_str):
273         api = json.loads(json_str)
274         return self._process_json(api)
275
276     @staticmethod
277     def _process_json(api):  # -> Tuple[Dict, Dict]
278         types = {}
279         services = {}
280         messages = {}
281         try:
282             for t in api['enums']:
283                 t[0] = 'vl_api_' + t[0] + '_t'
284                 types[t[0]] = {'type': 'enum', 'data': t}
285         except KeyError:
286             pass
287         try:
288             for t in api['enumflags']:
289                 t[0] = 'vl_api_' + t[0] + '_t'
290                 types[t[0]] = {'type': 'enum', 'data': t}
291         except KeyError:
292             pass
293         try:
294             for t in api['unions']:
295                 t[0] = 'vl_api_' + t[0] + '_t'
296                 types[t[0]] = {'type': 'union', 'data': t}
297         except KeyError:
298             pass
299
300         try:
301             for t in api['types']:
302                 t[0] = 'vl_api_' + t[0] + '_t'
303                 types[t[0]] = {'type': 'type', 'data': t}
304         except KeyError:
305             pass
306
307         try:
308             for t, v in api['aliases'].items():
309                 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
310         except KeyError:
311             pass
312
313         try:
314             services.update(api['services'])
315         except KeyError:
316             pass
317
318         i = 0
319         while True:
320             unresolved = {}
321             for k, v in types.items():
322                 t = v['data']
323                 if not vpp_get_type(k):
324                     if v['type'] == 'enum':
325                         try:
326                             VPPEnumType(t[0], t[1:])
327                         except ValueError:
328                             unresolved[k] = v
329                 if not vpp_get_type(k):
330                     if v['type'] == 'enumflag':
331                         try:
332                             VPPEnumFlagType(t[0], t[1:])
333                         except ValueError:
334                             unresolved[k] = v
335                     elif v['type'] == 'union':
336                         try:
337                             VPPUnionType(t[0], t[1:])
338                         except ValueError:
339                             unresolved[k] = v
340                     elif v['type'] == 'type':
341                         try:
342                             VPPType(t[0], t[1:])
343                         except ValueError:
344                             unresolved[k] = v
345                     elif v['type'] == 'alias':
346                         try:
347                             VPPTypeAlias(k, t)
348                         except ValueError:
349                             unresolved[k] = v
350             if len(unresolved) == 0:
351                 break
352             if i > 3:
353                 raise VPPValueError('Unresolved type definitions {}'
354                                     .format(unresolved))
355             types = unresolved
356             i += 1
357         try:
358             for m in api['messages']:
359                 try:
360                     messages[m[0]] = VPPMessage(m[0], m[1:])
361                 except VPPNotImplementedError:
362                     ### OLE FIXME
363                     logger.error('Not implemented error for {}'.format(m[0]))
364         except KeyError:
365             pass
366         return messages, services
367
368
369 class VPPApiClient:
370     """VPP interface.
371
372     This class provides the APIs to VPP.  The APIs are loaded
373     from provided .api.json files and makes functions accordingly.
374     These functions are documented in the VPP .api files, as they
375     are dynamically created.
376
377     Additionally, VPP can send callback messages; this class
378     provides a means to register a callback function to receive
379     these messages in a background thread.
380     """
381     apidir = None
382     VPPApiError = VPPApiError
383     VPPRuntimeError = VPPRuntimeError
384     VPPValueError = VPPValueError
385     VPPNotImplementedError = VPPNotImplementedError
386     VPPIOError = VPPIOError
387
388
389     def __init__(self, apifiles=None, testmode=False, async_thread=True,
390                  logger=None, loglevel=None,
391                  read_timeout=5, use_socket=False,
392                  server_address='/run/vpp/api.sock'):
393         """Create a VPP API object.
394
395         apifiles is a list of files containing API
396         descriptions that will be loaded - methods will be
397         dynamically created reflecting these APIs.  If not
398         provided this will load the API files from VPP's
399         default install location.
400
401         logger, if supplied, is the logging logger object to log to.
402         loglevel, if supplied, is the log level this logger is set
403         to report at (from the loglevels in the logging module).
404         """
405         if logger is None:
406             logger = logging.getLogger(
407                 "{}.{}".format(__name__, self.__class__.__name__))
408             if loglevel is not None:
409                 logger.setLevel(loglevel)
410         self.logger = logger
411
412         self.messages = {}
413         self.services = {}
414         self.id_names = []
415         self.id_msgdef = []
416         self.header = VPPType('header', [['u16', 'msgid'],
417                                          ['u32', 'client_index']])
418         self.apifiles = []
419         self.event_callback = None
420         self.message_queue = queue.Queue()
421         self.read_timeout = read_timeout
422         self.async_thread = async_thread
423         self.event_thread = None
424         self.testmode = testmode
425         self.use_socket = use_socket
426         self.server_address = server_address
427         self._apifiles = apifiles
428         self.stats = {}
429
430         if use_socket:
431             from . vpp_transport_socket import VppTransport
432         else:
433             from . vpp_transport_shmem import VppTransport
434
435         if not apifiles:
436             # Pick up API definitions from default directory
437             try:
438                 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
439             except (RuntimeError, VPPApiError):
440                 # In test mode we don't care that we can't find the API files
441                 if testmode:
442                     apifiles = []
443                 else:
444                     raise VPPRuntimeError
445
446         for file in apifiles:
447             with open(file) as apidef_file:
448                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
449                 self.messages.update(m)
450                 self.services.update(s)
451
452         self.apifiles = apifiles
453
454         # Basic sanity check
455         if len(self.messages) == 0 and not testmode:
456             raise VPPValueError(1, 'Missing JSON message definitions')
457         if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
458             raise VPPRuntimeError("Invalid address family hints. "
459                                   "Cannot continue.")
460
461         self.transport = VppTransport(self, read_timeout=read_timeout,
462                                       server_address=server_address)
463         # Make sure we allow VPP to clean up the message rings.
464         atexit.register(vpp_atexit, weakref.ref(self))
465
466         add_convenience_methods()
467
468     def get_function(self, name):
469         return getattr(self._api, name)
470
471     class ContextId:
472         """Multiprocessing-safe provider of unique context IDs."""
473         def __init__(self):
474             self.context = mp.Value(ctypes.c_uint, 0)
475             self.lock = mp.Lock()
476
477         def __call__(self):
478             """Get a new unique (or, at least, not recently used) context."""
479             with self.lock:
480                 self.context.value += 1
481                 return self.context.value
482     get_context = ContextId()
483
484     def get_type(self, name):
485         return vpp_get_type(name)
486
487     @property
488     def api(self):
489         if not hasattr(self, "_api"):
490             raise VPPApiError("Not connected, api definitions not available")
491         return self._api
492
493     def make_function(self, msg, i, multipart, do_async):
494         if (do_async):
495             def f(**kwargs):
496                 return self._call_vpp_async(i, msg, **kwargs)
497         else:
498             def f(**kwargs):
499                 return self._call_vpp(i, msg, multipart, **kwargs)
500
501         f.__name__ = str(msg.name)
502         f.__doc__ = ", ".join(["%s %s" %
503                                (msg.fieldtypes[j], k)
504                                for j, k in enumerate(msg.fields)])
505         f.msg = msg
506
507         return f
508
509     def _register_functions(self, do_async=False):
510         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
511         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
512         self._api = VppApiDynamicMethodHolder()
513         for name, msg in self.messages.items():
514             n = name + '_' + msg.crc[2:]
515             i = self.transport.get_msg_index(n)
516             if i > 0:
517                 self.id_msgdef[i] = msg
518                 self.id_names[i] = name
519
520                 # Create function for client side messages.
521                 if name in self.services:
522                     f = self.make_function(msg, i, self.services[name], do_async)
523                     setattr(self._api, name, FuncWrapper(f))
524             else:
525                 self.logger.debug(
526                     'No such message type or failed CRC checksum: %s', n)
527
528     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
529                          do_async):
530         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
531
532         rv = self.transport.connect(name, pfx,
533                                     msg_handler, rx_qlen)
534         if rv != 0:
535             raise VPPIOError(2, 'Connect failed')
536         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
537         self._register_functions(do_async=do_async)
538
539         # Initialise control ping
540         crc = self.messages['control_ping'].crc
541         self.control_ping_index = self.transport.get_msg_index(
542             ('control_ping' + '_' + crc[2:]))
543         self.control_ping_msgdef = self.messages['control_ping']
544         if self.async_thread:
545             self.event_thread = threading.Thread(
546                 target=self.thread_msg_handler)
547             self.event_thread.daemon = True
548             self.event_thread.start()
549         else:
550             self.event_thread = None
551         return rv
552
553     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
554         """Attach to VPP.
555
556         name - the name of the client.
557         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
558         do_async - if true, messages are sent without waiting for a reply
559         rx_qlen - the length of the VPP message receive queue between
560         client and server.
561         """
562         msg_handler = self.transport.get_callback(do_async)
563         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
564                                      do_async)
565
566     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
567         """Attach to VPP in synchronous mode. Application must poll for events.
568
569         name - the name of the client.
570         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
571         rx_qlen - the length of the VPP message receive queue between
572         client and server.
573         """
574
575         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
576                                      do_async=False)
577
578     def disconnect(self):
579         """Detach from VPP."""
580         rv = self.transport.disconnect()
581         if self.event_thread is not None:
582             self.message_queue.put("terminate event thread")
583         return rv
584
585     def msg_handler_sync(self, msg):
586         """Process an incoming message from VPP in sync mode.
587
588         The message may be a reply or it may be an async notification.
589         """
590         r = self.decode_incoming_msg(msg)
591         if r is None:
592             return
593
594         # If we have a context, then use the context to find any
595         # request waiting for a reply
596         context = 0
597         if hasattr(r, 'context') and r.context > 0:
598             context = r.context
599
600         if context == 0:
601             # No context -> async notification that we feed to the callback
602             self.message_queue.put_nowait(r)
603         else:
604             raise VPPIOError(2, 'RPC reply message received in event handler')
605
606     def has_context(self, msg):
607         if len(msg) < 10:
608             return False
609
610         header = VPPType('header_with_context', [['u16', 'msgid'],
611                                                  ['u32', 'client_index'],
612                                                  ['u32', 'context']])
613
614         (i, ci, context), size = header.unpack(msg, 0)
615         if self.id_names[i] == 'rx_thread_exit':
616             return
617
618         #
619         # Decode message and returns a tuple.
620         #
621         msgobj = self.id_msgdef[i]
622         if 'context' in msgobj.field_by_name and context >= 0:
623             return True
624         return False
625
626     def decode_incoming_msg(self, msg, no_type_conversion=False):
627         if not msg:
628             logger.warning('vpp_api.read failed')
629             return
630
631         (i, ci), size = self.header.unpack(msg, 0)
632         if self.id_names[i] == 'rx_thread_exit':
633             return
634
635         #
636         # Decode message and returns a tuple.
637         #
638         msgobj = self.id_msgdef[i]
639         if not msgobj:
640             raise VPPIOError(2, 'Reply message undefined')
641
642         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
643         return r
644
645     def msg_handler_async(self, msg):
646         """Process a message from VPP in async mode.
647
648         In async mode, all messages are returned to the callback.
649         """
650         r = self.decode_incoming_msg(msg)
651         if r is None:
652             return
653
654         msgname = type(r).__name__
655
656         if self.event_callback:
657             self.event_callback(msgname, r)
658
659     def _control_ping(self, context):
660         """Send a ping command."""
661         self._call_vpp_async(self.control_ping_index,
662                              self.control_ping_msgdef,
663                              context=context)
664
665     def validate_args(self, msg, kwargs):
666         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
667         if d:
668             raise VPPValueError('Invalid argument {} to {}'
669                                 .format(list(d), msg.name))
670
671     def _add_stat(self, name, ms):
672         if not name in self.stats:
673             self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
674         else:
675             if ms > self.stats[name]['max']:
676                 self.stats[name]['max'] = ms
677             self.stats[name]['count'] += 1
678             n = self.stats[name]['count']
679             self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
680
681     def get_stats(self):
682         s = '\n=== API PAPI STATISTICS ===\n'
683         s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
684         for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
685             s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
686                                                            n[1]['avg'], n[1]['max'])
687         return s
688
689     def get_field_options(self, msg, fld_name):
690         # when there is an option, the msgdef has 3 elements.
691         # ['u32', 'ring_size', {'default': 1024}]
692         for _def in self.messages[msg].msgdef:
693             if isinstance(_def, list) and \
694                     len(_def) == 3 and \
695                     _def[1] == fld_name:
696                 return _def[2]
697
698     def _call_vpp(self, i, msgdef, service, **kwargs):
699         """Given a message, send the message and await a reply.
700
701         msgdef - the message packing definition
702         i - the message type index
703         multipart - True if the message returns multiple
704         messages in return.
705         context - context number - chosen at random if not
706         supplied.
707         The remainder of the kwargs are the arguments to the API call.
708
709         The return value is the message or message array containing
710         the response.  It will raise an IOError exception if there was
711         no response within the timeout window.
712         """
713         ts = time.time()
714         if 'context' not in kwargs:
715             context = self.get_context()
716             kwargs['context'] = context
717         else:
718             context = kwargs['context']
719         kwargs['_vl_msg_id'] = i
720
721         no_type_conversion = kwargs.pop('_no_type_conversion', False)
722         timeout = kwargs.pop('_timeout', None)
723
724         try:
725             if self.transport.socket_index:
726                 kwargs['client_index'] = self.transport.socket_index
727         except AttributeError:
728             pass
729         self.validate_args(msgdef, kwargs)
730
731         s = 'Calling {}({})'.format(msgdef.name,
732             ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
733         self.logger.debug(s)
734
735         b = msgdef.pack(kwargs)
736         self.transport.suspend()
737
738         self.transport.write(b)
739
740         msgreply = service['reply']
741         stream = True if 'stream' in service else False
742         if stream:
743             if 'stream_msg' in service:
744                 # New service['reply'] = _reply and service['stream_message'] = _details
745                 stream_message = service['stream_msg']
746                 modern =True
747             else:
748                 # Old  service['reply'] = _details
749                 stream_message = msgreply
750                 msgreply = 'control_ping_reply'
751                 modern = False
752                 # Send a ping after the request - we use its response
753                 # to detect that we have seen all results.
754                 self._control_ping(context)
755
756         # Block until we get a reply.
757         rl = []
758         while (True):
759             r = self.read_blocking(no_type_conversion, timeout)
760             if r is None:
761                 raise VPPIOError(2, 'VPP API client: read failed')
762             msgname = type(r).__name__
763             if context not in r or r.context == 0 or context != r.context:
764                 # Message being queued
765                 self.message_queue.put_nowait(r)
766                 continue
767             if msgname != msgreply and (stream and (msgname != stream_message)):
768                 print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
769             if not stream:
770                 rl = r
771                 break
772             if msgname == msgreply:
773                 if modern: # Return both reply and list
774                     rl = r, rl
775                 break
776
777             rl.append(r)
778
779         self.transport.resume()
780
781         s = 'Return value: {!r}'.format(r)
782         if len(s) > 80:
783             s = s[:80] + "..."
784         self.logger.debug(s)
785         te = time.time()
786         self._add_stat(msgdef.name, (te - ts) * 1000)
787         return rl
788
789     def _call_vpp_async(self, i, msg, **kwargs):
790         """Given a message, send the message and return the context.
791
792         msgdef - the message packing definition
793         i - the message type index
794         context - context number - chosen at random if not
795         supplied.
796         The remainder of the kwargs are the arguments to the API call.
797
798         The reply message(s) will be delivered later to the registered callback.
799         The returned context will help with assigning which call
800         the reply belongs to.
801         """
802         if 'context' not in kwargs:
803             context = self.get_context()
804             kwargs['context'] = context
805         else:
806             context = kwargs['context']
807         try:
808             if self.transport.socket_index:
809                 kwargs['client_index'] = self.transport.socket_index
810         except AttributeError:
811             kwargs['client_index'] = 0
812         kwargs['_vl_msg_id'] = i
813         b = msg.pack(kwargs)
814
815         self.transport.write(b)
816         return context
817
818     def read_blocking(self, no_type_conversion=False, timeout=None):
819         """Get next received message from transport within timeout, decoded.
820
821         Note that notifications have context zero
822         and are not put into receive queue (at least for socket transport),
823         use async_thread with registered callback for processing them.
824
825         If no message appears in the queue within timeout, return None.
826
827         Optionally, type conversion can be skipped,
828         as some of conversions are into less precise types.
829
830         When r is the return value of this, the caller can get message name as:
831             msgname = type(r).__name__
832         and context number (type long) as:
833             context = r.context
834
835         :param no_type_conversion: If false, type conversions are applied.
836         :type no_type_conversion: bool
837         :returns: Decoded message, or None if no message (within timeout).
838         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
839         :raises VppTransportShmemIOError if timed out.
840         """
841         msg = self.transport.read(timeout=timeout)
842         if not msg:
843             return None
844         return self.decode_incoming_msg(msg, no_type_conversion)
845
846     def register_event_callback(self, callback):
847         """Register a callback for async messages.
848
849         This will be called for async notifications in sync mode,
850         and all messages in async mode.  In sync mode, replies to
851         requests will not come here.
852
853         callback is a fn(msg_type_name, msg_type) that will be
854         called when a message comes in.  While this function is
855         executing, note that (a) you are in a background thread and
856         may wish to use threading.Lock to protect your datastructures,
857         and (b) message processing from VPP will stop (so if you take
858         a long while about it you may provoke reply timeouts or cause
859         VPP to fill the RX buffer).  Passing None will disable the
860         callback.
861         """
862         self.event_callback = callback
863
864     def thread_msg_handler(self):
865         """Python thread calling the user registered message handler.
866
867         This is to emulate the old style event callback scheme. Modern
868         clients should provide their own thread to poll the event
869         queue.
870         """
871         while True:
872             r = self.message_queue.get()
873             if r == "terminate event thread":
874                 break
875             msgname = type(r).__name__
876             if self.event_callback:
877                 self.event_callback(msgname, r)
878
879     def validate_message_table(self, namecrctable):
880         """Take a dictionary of name_crc message names
881         and returns an array of missing messages"""
882
883         missing_table = []
884         for name_crc in namecrctable:
885             i = self.transport.get_msg_index(name_crc)
886             if i <= 0:
887                 missing_table.append(name_crc)
888         return missing_table
889
890     def dump_message_table(self):
891         """Return VPPs API message table as name_crc dictionary"""
892         return self.transport.message_table
893
894     def dump_message_table_filtered(self, msglist):
895         """Return VPPs API message table as name_crc dictionary,
896         filtered by message name list."""
897
898         replies = [self.services[n]['reply'] for n in msglist]
899         message_table_filtered = {}
900         for name in msglist + replies:
901             for k,v in self.transport.message_table.items():
902                 if k.startswith(name):
903                     message_table_filtered[k] = v
904                     break
905         return message_table_filtered
906
907     def __repr__(self):
908         return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
909                "logger=%s, read_timeout=%s, use_socket=%s, " \
910                "server_address='%s'>" % (
911                    self._apifiles, self.testmode, self.async_thread,
912                    self.logger, self.read_timeout, self.use_socket,
913                    self.server_address)
914
915     def details_iter(self, f, **kwargs):
916         cursor = 0
917         while True:
918             kwargs['cursor'] = cursor
919             rv, details = f(**kwargs)
920             #
921             # Convert to yield from details when we only support python 3
922             #
923             for d in details:
924                 yield d
925             if rv.retval == 0 or rv.retval != -165:
926                 break
927             cursor = rv.cursor
928
929 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4