api: add new stream message convention
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import ipaddress
21 import sys
22 import multiprocessing as mp
23 import os
24 import logging
25 import functools
26 import json
27 import threading
28 import fnmatch
29 import weakref
30 import atexit
31 import time
32 from . vpp_format import verify_enum_hint
33 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
34 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
35
36 if sys.version[0] == '2':
37     import Queue as queue
38 else:
39     import queue as queue
40
41 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
42            'VppEnum', 'VppEnumType',
43            'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
44            'VPPApiClient', )
45
46
47 def metaclass(metaclass):
48     @functools.wraps(metaclass)
49     def wrapper(cls):
50         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
51
52     return wrapper
53
54
55 class VppEnumType(type):
56     def __getattr__(cls, name):
57         t = vpp_get_type(name)
58         return t.enum
59
60
61 @metaclass(VppEnumType)
62 class VppEnum(object):
63     pass
64
65
66 def vpp_atexit(vpp_weakref):
67     """Clean up VPP connection on shutdown."""
68     vpp_instance = vpp_weakref()
69     if vpp_instance and vpp_instance.transport.connected:
70         vpp_instance.logger.debug('Cleaning up VPP on exit')
71         vpp_instance.disconnect()
72
73
74 if sys.version[0] == '2':
75     def vpp_iterator(d):
76         return d.iteritems()
77 else:
78     def vpp_iterator(d):
79         return d.items()
80
81
82 def add_convenience_methods():
83     # provide convenience methods to IP[46]Address.vapi_af
84     def _vapi_af(self):
85         if 6 == self._version:
86             return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
87         if 4 == self._version:
88             return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
89         raise ValueError("Invalid _version.")
90
91     def _vapi_af_name(self):
92         if 6 == self._version:
93             return 'ip6'
94         if 4 == self._version:
95             return 'ip4'
96         raise ValueError("Invalid _version.")
97
98     ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
99     ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
100
101
102 class VppApiDynamicMethodHolder(object):
103     pass
104
105
106 class FuncWrapper(object):
107     def __init__(self, func):
108         self._func = func
109         self.__name__ = func.__name__
110         self.__doc__ = func.__doc__
111
112     def __call__(self, **kwargs):
113         return self._func(**kwargs)
114
115     def __repr__(self):
116         return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
117
118
119 class VPPApiError(Exception):
120     pass
121
122
123 class VPPNotImplementedError(NotImplementedError):
124     pass
125
126
127 class VPPIOError(IOError):
128     pass
129
130
131 class VPPRuntimeError(RuntimeError):
132     pass
133
134
135 class VPPValueError(ValueError):
136     pass
137
138
139 class VPPApiJSONFiles(object):
140     @classmethod
141     def find_api_dir(cls, dirs):
142         """Attempt to find the best directory in which API definition
143         files may reside. If the value VPP_API_DIR exists in the environment
144         then it is first on the search list. If we're inside a recognized
145         location in a VPP source tree (src/scripts and src/vpp-api/python)
146         then entries from there to the likely locations in build-root are
147         added. Finally the location used by system packages is added.
148
149         :returns: A single directory name, or None if no such directory
150             could be found.
151         """
152
153         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
154         # in which case, plot a course to likely places in the src tree
155         import __main__ as main
156         if hasattr(main, '__file__'):
157             # get the path of the calling script
158             localdir = os.path.dirname(os.path.realpath(main.__file__))
159         else:
160             # use cwd if there is no calling script
161             localdir = os.getcwd()
162         localdir_s = localdir.split(os.path.sep)
163
164         def dmatch(dir):
165             """Match dir against right-hand components of the script dir"""
166             d = dir.split('/')  # param 'dir' assumes a / separator
167             length = len(d)
168             return len(localdir_s) > length and localdir_s[-length:] == d
169
170         def sdir(srcdir, variant):
171             """Build a path from srcdir to the staged API files of
172             'variant'  (typically '' or '_debug')"""
173             # Since 'core' and 'plugin' files are staged
174             # in separate directories, we target the parent dir.
175             return os.path.sep.join((
176                 srcdir,
177                 'build-root',
178                 'install-vpp%s-native' % variant,
179                 'vpp',
180                 'share',
181                 'vpp',
182                 'api',
183             ))
184
185         srcdir = None
186         if dmatch('src/scripts'):
187             srcdir = os.path.sep.join(localdir_s[:-2])
188         elif dmatch('src/vpp-api/python'):
189             srcdir = os.path.sep.join(localdir_s[:-3])
190         elif dmatch('test'):
191             # we're apparently running tests
192             srcdir = os.path.sep.join(localdir_s[:-1])
193
194         if srcdir:
195             # we're in the source tree, try both the debug and release
196             # variants.
197             dirs.append(sdir(srcdir, '_debug'))
198             dirs.append(sdir(srcdir, ''))
199
200         # Test for staged copies of the scripts
201         # For these, since we explicitly know if we're running a debug versus
202         # release variant, target only the relevant directory
203         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
204             srcdir = os.path.sep.join(localdir_s[:-4])
205             dirs.append(sdir(srcdir, '_debug'))
206         if dmatch('build-root/install-vpp-native/vpp/bin'):
207             srcdir = os.path.sep.join(localdir_s[:-4])
208             dirs.append(sdir(srcdir, ''))
209
210         # finally, try the location system packages typically install into
211         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
212
213         # check the directories for existence; first one wins
214         for dir in dirs:
215             if os.path.isdir(dir):
216                 return dir
217
218         return None
219
220     @classmethod
221     def find_api_files(cls, api_dir=None, patterns='*'):
222         """Find API definition files from the given directory tree with the
223         given pattern. If no directory is given then find_api_dir() is used
224         to locate one. If no pattern is given then all definition files found
225         in the directory tree are used.
226
227         :param api_dir: A directory tree in which to locate API definition
228             files; subdirectories are descended into.
229             If this is None then find_api_dir() is called to discover it.
230         :param patterns: A list of patterns to use in each visited directory
231             when looking for files.
232             This can be a list/tuple object or a comma-separated string of
233             patterns. Each value in the list will have leading/trialing
234             whitespace stripped.
235             The pattern specifies the first part of the filename, '.api.json'
236             is appended.
237             The results are de-duplicated, thus overlapping patterns are fine.
238             If this is None it defaults to '*' meaning "all API files".
239         :returns: A list of file paths for the API files found.
240         """
241         if api_dir is None:
242             api_dir = cls.find_api_dir([])
243             if api_dir is None:
244                 raise VPPApiError("api_dir cannot be located")
245
246         if isinstance(patterns, list) or isinstance(patterns, tuple):
247             patterns = [p.strip() + '.api.json' for p in patterns]
248         else:
249             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
250
251         api_files = []
252         for root, dirnames, files in os.walk(api_dir):
253             # iterate all given patterns and de-dup the result
254             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
255             for filename in files:
256                 api_files.append(os.path.join(root, filename))
257
258         return api_files
259
260     @classmethod
261     def process_json_file(self, apidef_file):
262         api = json.load(apidef_file)
263         types = {}
264         services = {}
265         messages = {}
266         for t in api['enums']:
267             t[0] = 'vl_api_' + t[0] + '_t'
268             types[t[0]] = {'type': 'enum', 'data': t}
269         for t in api['unions']:
270             t[0] = 'vl_api_' + t[0] + '_t'
271             types[t[0]] = {'type': 'union', 'data': t}
272         for t in api['types']:
273             t[0] = 'vl_api_' + t[0] + '_t'
274             types[t[0]] = {'type': 'type', 'data': t}
275         for t, v in api['aliases'].items():
276             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
277         services.update(api['services'])
278
279         i = 0
280         while True:
281             unresolved = {}
282             for k, v in types.items():
283                 t = v['data']
284                 if not vpp_get_type(k):
285                     if v['type'] == 'enum':
286                         try:
287                             VPPEnumType(t[0], t[1:])
288                         except ValueError:
289                             unresolved[k] = v
290                     elif v['type'] == 'union':
291                         try:
292                             VPPUnionType(t[0], t[1:])
293                         except ValueError:
294                             unresolved[k] = v
295                     elif v['type'] == 'type':
296                         try:
297                             VPPType(t[0], t[1:])
298                         except ValueError:
299                             unresolved[k] = v
300                     elif v['type'] == 'alias':
301                         try:
302                             VPPTypeAlias(k, t)
303                         except ValueError:
304                             unresolved[k] = v
305             if len(unresolved) == 0:
306                 break
307             if i > 3:
308                 raise VPPValueError('Unresolved type definitions {}'
309                                     .format(unresolved))
310             types = unresolved
311             i += 1
312
313         for m in api['messages']:
314             try:
315                 messages[m[0]] = VPPMessage(m[0], m[1:])
316             except VPPNotImplementedError:
317                 ### OLE FIXME
318                 self.logger.error('Not implemented error for {}'.format(m[0]))
319         return messages, services
320
321
322 class VPPApiClient(object):
323     """VPP interface.
324
325     This class provides the APIs to VPP.  The APIs are loaded
326     from provided .api.json files and makes functions accordingly.
327     These functions are documented in the VPP .api files, as they
328     are dynamically created.
329
330     Additionally, VPP can send callback messages; this class
331     provides a means to register a callback function to receive
332     these messages in a background thread.
333     """
334     apidir = None
335     VPPApiError = VPPApiError
336     VPPRuntimeError = VPPRuntimeError
337     VPPValueError = VPPValueError
338     VPPNotImplementedError = VPPNotImplementedError
339     VPPIOError = VPPIOError
340
341
342     def __init__(self, apifiles=None, testmode=False, async_thread=True,
343                  logger=None, loglevel=None,
344                  read_timeout=5, use_socket=False,
345                  server_address='/run/vpp/api.sock'):
346         """Create a VPP API object.
347
348         apifiles is a list of files containing API
349         descriptions that will be loaded - methods will be
350         dynamically created reflecting these APIs.  If not
351         provided this will load the API files from VPP's
352         default install location.
353
354         logger, if supplied, is the logging logger object to log to.
355         loglevel, if supplied, is the log level this logger is set
356         to report at (from the loglevels in the logging module).
357         """
358         if logger is None:
359             logger = logging.getLogger(
360                 "{}.{}".format(__name__, self.__class__.__name__))
361             if loglevel is not None:
362                 logger.setLevel(loglevel)
363         self.logger = logger
364
365         self.messages = {}
366         self.services = {}
367         self.id_names = []
368         self.id_msgdef = []
369         self.header = VPPType('header', [['u16', 'msgid'],
370                                          ['u32', 'client_index']])
371         self.apifiles = []
372         self.event_callback = None
373         self.message_queue = queue.Queue()
374         self.read_timeout = read_timeout
375         self.async_thread = async_thread
376         self.event_thread = None
377         self.testmode = testmode
378         self.use_socket = use_socket
379         self.server_address = server_address
380         self._apifiles = apifiles
381         self.stats = {}
382
383         if use_socket:
384             from . vpp_transport_socket import VppTransport
385         else:
386             from . vpp_transport_shmem import VppTransport
387
388         if not apifiles:
389             # Pick up API definitions from default directory
390             try:
391                 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
392             except RuntimeError:
393                 # In test mode we don't care that we can't find the API files
394                 if testmode:
395                     apifiles = []
396                 else:
397                     raise VPPRuntimeError
398
399         for file in apifiles:
400             with open(file) as apidef_file:
401                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
402                 self.messages.update(m)
403                 self.services.update(s)
404
405         self.apifiles = apifiles
406
407         # Basic sanity check
408         if len(self.messages) == 0 and not testmode:
409             raise VPPValueError(1, 'Missing JSON message definitions')
410         if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
411             raise VPPRuntimeError("Invalid address family hints. "
412                                   "Cannot continue.")
413
414         self.transport = VppTransport(self, read_timeout=read_timeout,
415                                       server_address=server_address)
416         # Make sure we allow VPP to clean up the message rings.
417         atexit.register(vpp_atexit, weakref.ref(self))
418
419         add_convenience_methods()
420
421     def get_function(self, name):
422         return getattr(self._api, name)
423
424     class ContextId(object):
425         """Multiprocessing-safe provider of unique context IDs."""
426         def __init__(self):
427             self.context = mp.Value(ctypes.c_uint, 0)
428             self.lock = mp.Lock()
429
430         def __call__(self):
431             """Get a new unique (or, at least, not recently used) context."""
432             with self.lock:
433                 self.context.value += 1
434                 return self.context.value
435     get_context = ContextId()
436
437     def get_type(self, name):
438         return vpp_get_type(name)
439
440     @property
441     def api(self):
442         if not hasattr(self, "_api"):
443             raise VPPApiError("Not connected, api definitions not available")
444         return self._api
445
446     def make_function(self, msg, i, multipart, do_async):
447         if (do_async):
448             def f(**kwargs):
449                 return self._call_vpp_async(i, msg, **kwargs)
450         else:
451             def f(**kwargs):
452                 return self._call_vpp(i, msg, multipart, **kwargs)
453
454         f.__name__ = str(msg.name)
455         f.__doc__ = ", ".join(["%s %s" %
456                                (msg.fieldtypes[j], k)
457                                for j, k in enumerate(msg.fields)])
458         f.msg = msg
459
460         return f
461
462     def _register_functions(self, do_async=False):
463         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
464         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
465         self._api = VppApiDynamicMethodHolder()
466         for name, msg in vpp_iterator(self.messages):
467             n = name + '_' + msg.crc[2:]
468             i = self.transport.get_msg_index(n)
469             if i > 0:
470                 self.id_msgdef[i] = msg
471                 self.id_names[i] = name
472
473                 # Create function for client side messages.
474                 if name in self.services:
475                     f = self.make_function(msg, i, self.services[name], do_async)
476                     setattr(self._api, name, FuncWrapper(f))
477             else:
478                 self.logger.debug(
479                     'No such message type or failed CRC checksum: %s', n)
480
481     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
482                          do_async):
483         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
484
485         rv = self.transport.connect(name, pfx,
486                                     msg_handler, rx_qlen)
487         if rv != 0:
488             raise VPPIOError(2, 'Connect failed')
489         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
490         self._register_functions(do_async=do_async)
491
492         # Initialise control ping
493         crc = self.messages['control_ping'].crc
494         self.control_ping_index = self.transport.get_msg_index(
495             ('control_ping' + '_' + crc[2:]))
496         self.control_ping_msgdef = self.messages['control_ping']
497         if self.async_thread:
498             self.event_thread = threading.Thread(
499                 target=self.thread_msg_handler)
500             self.event_thread.daemon = True
501             self.event_thread.start()
502         else:
503             self.event_thread = None
504         return rv
505
506     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
507         """Attach to VPP.
508
509         name - the name of the client.
510         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
511         do_async - if true, messages are sent without waiting for a reply
512         rx_qlen - the length of the VPP message receive queue between
513         client and server.
514         """
515         msg_handler = self.transport.get_callback(do_async)
516         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
517                                      do_async)
518
519     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
520         """Attach to VPP in synchronous mode. Application must poll for events.
521
522         name - the name of the client.
523         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
524         rx_qlen - the length of the VPP message receive queue between
525         client and server.
526         """
527
528         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
529                                      do_async=False)
530
531     def disconnect(self):
532         """Detach from VPP."""
533         rv = self.transport.disconnect()
534         if self.event_thread is not None:
535             self.message_queue.put("terminate event thread")
536         return rv
537
538     def msg_handler_sync(self, msg):
539         """Process an incoming message from VPP in sync mode.
540
541         The message may be a reply or it may be an async notification.
542         """
543         r = self.decode_incoming_msg(msg)
544         if r is None:
545             return
546
547         # If we have a context, then use the context to find any
548         # request waiting for a reply
549         context = 0
550         if hasattr(r, 'context') and r.context > 0:
551             context = r.context
552
553         if context == 0:
554             # No context -> async notification that we feed to the callback
555             self.message_queue.put_nowait(r)
556         else:
557             raise VPPIOError(2, 'RPC reply message received in event handler')
558
559     def has_context(self, msg):
560         if len(msg) < 10:
561             return False
562
563         header = VPPType('header_with_context', [['u16', 'msgid'],
564                                                  ['u32', 'client_index'],
565                                                  ['u32', 'context']])
566
567         (i, ci, context), size = header.unpack(msg, 0)
568         if self.id_names[i] == 'rx_thread_exit':
569             return
570
571         #
572         # Decode message and returns a tuple.
573         #
574         msgobj = self.id_msgdef[i]
575         if 'context' in msgobj.field_by_name and context >= 0:
576             return True
577         return False
578
579     def decode_incoming_msg(self, msg, no_type_conversion=False):
580         if not msg:
581             self.logger.warning('vpp_api.read failed')
582             return
583
584         (i, ci), size = self.header.unpack(msg, 0)
585         if self.id_names[i] == 'rx_thread_exit':
586             return
587
588         #
589         # Decode message and returns a tuple.
590         #
591         msgobj = self.id_msgdef[i]
592         if not msgobj:
593             raise VPPIOError(2, 'Reply message undefined')
594
595         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
596         return r
597
598     def msg_handler_async(self, msg):
599         """Process a message from VPP in async mode.
600
601         In async mode, all messages are returned to the callback.
602         """
603         r = self.decode_incoming_msg(msg)
604         if r is None:
605             return
606
607         msgname = type(r).__name__
608
609         if self.event_callback:
610             self.event_callback(msgname, r)
611
612     def _control_ping(self, context):
613         """Send a ping command."""
614         self._call_vpp_async(self.control_ping_index,
615                              self.control_ping_msgdef,
616                              context=context)
617
618     def validate_args(self, msg, kwargs):
619         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
620         if d:
621             raise VPPValueError('Invalid argument {} to {}'
622                                 .format(list(d), msg.name))
623
624     def _add_stat(self, name, ms):
625         if not name in self.stats:
626             self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
627         else:
628             if ms > self.stats[name]['max']:
629                 self.stats[name]['max'] = ms
630             self.stats[name]['count'] += 1
631             n = self.stats[name]['count']
632             self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
633
634     def get_stats(self):
635         s = '\n=== API PAPI STATISTICS ===\n'
636         s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
637         for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
638             s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
639                                                            n[1]['avg'], n[1]['max'])
640         return s
641
642     def _call_vpp(self, i, msgdef, service, **kwargs):
643         """Given a message, send the message and await a reply.
644
645         msgdef - the message packing definition
646         i - the message type index
647         multipart - True if the message returns multiple
648         messages in return.
649         context - context number - chosen at random if not
650         supplied.
651         The remainder of the kwargs are the arguments to the API call.
652
653         The return value is the message or message array containing
654         the response.  It will raise an IOError exception if there was
655         no response within the timeout window.
656         """
657         ts = time.time()
658         if 'context' not in kwargs:
659             context = self.get_context()
660             kwargs['context'] = context
661         else:
662             context = kwargs['context']
663         kwargs['_vl_msg_id'] = i
664
665         no_type_conversion = kwargs.pop('_no_type_conversion', False)
666         timeout = kwargs.pop('_timeout', None)
667
668         try:
669             if self.transport.socket_index:
670                 kwargs['client_index'] = self.transport.socket_index
671         except AttributeError:
672             pass
673         self.validate_args(msgdef, kwargs)
674
675         s = 'Calling {}({})'.format(msgdef.name,
676             ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
677         self.logger.debug(s)
678
679         b = msgdef.pack(kwargs)
680         self.transport.suspend()
681
682         self.transport.write(b)
683
684         msgreply = service['reply']
685         stream = True if 'stream' in service else False
686         if stream:
687             if 'stream_msg' in service:
688                 # New service['reply'] = _reply and service['stream_message'] = _details
689                 stream_message = service['stream_msg']
690                 modern =True
691             else:
692                 # Old  service['reply'] = _details
693                 stream_message = msgreply
694                 msgreply = 'control_ping_reply'
695                 modern = False
696                 # Send a ping after the request - we use its response
697                 # to detect that we have seen all results.
698                 self._control_ping(context)
699
700         # Block until we get a reply.
701         rl = []
702         while (True):
703             r = self.read_blocking(no_type_conversion, timeout)
704             if r is None:
705                 raise VPPIOError(2, 'VPP API client: read failed')
706             msgname = type(r).__name__
707             if context not in r or r.context == 0 or context != r.context:
708                 # Message being queued
709                 self.message_queue.put_nowait(r)
710                 continue
711             if msgname != msgreply and (stream and (msgname != stream_message)):
712                 print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
713             if not stream:
714                 rl = r
715                 break
716             if msgname == msgreply:
717                 if modern: # Return both reply and list
718                     rl = r, rl
719                 break
720
721             rl.append(r)
722
723         self.transport.resume()
724
725         s = 'Return value: {!r}'.format(r)
726         if len(s) > 80:
727             s = s[:80] + "..."
728         self.logger.debug(s)
729         te = time.time()
730         self._add_stat(msgdef.name, (te - ts) * 1000)
731         return rl
732
733     def _call_vpp_async(self, i, msg, **kwargs):
734         """Given a message, send the message and return the context.
735
736         msgdef - the message packing definition
737         i - the message type index
738         context - context number - chosen at random if not
739         supplied.
740         The remainder of the kwargs are the arguments to the API call.
741
742         The reply message(s) will be delivered later to the registered callback.
743         The returned context will help with assigning which call
744         the reply belongs to.
745         """
746         if 'context' not in kwargs:
747             context = self.get_context()
748             kwargs['context'] = context
749         else:
750             context = kwargs['context']
751         try:
752             if self.transport.socket_index:
753                 kwargs['client_index'] = self.transport.socket_index
754         except AttributeError:
755             kwargs['client_index'] = 0
756         kwargs['_vl_msg_id'] = i
757         b = msg.pack(kwargs)
758
759         self.transport.write(b)
760         return context
761
762     def read_blocking(self, no_type_conversion=False, timeout=None):
763         """Get next received message from transport within timeout, decoded.
764
765         Note that notifications have context zero
766         and are not put into receive queue (at least for socket transport),
767         use async_thread with registered callback for processing them.
768
769         If no message appears in the queue within timeout, return None.
770
771         Optionally, type conversion can be skipped,
772         as some of conversions are into less precise types.
773
774         When r is the return value of this, the caller can get message name as:
775             msgname = type(r).__name__
776         and context number (type long) as:
777             context = r.context
778
779         :param no_type_conversion: If false, type conversions are applied.
780         :type no_type_conversion: bool
781         :returns: Decoded message, or None if no message (within timeout).
782         :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
783         :raises VppTransportShmemIOError if timed out.
784         """
785         msg = self.transport.read(timeout=timeout)
786         if not msg:
787             return None
788         return self.decode_incoming_msg(msg, no_type_conversion)
789
790     def register_event_callback(self, callback):
791         """Register a callback for async messages.
792
793         This will be called for async notifications in sync mode,
794         and all messages in async mode.  In sync mode, replies to
795         requests will not come here.
796
797         callback is a fn(msg_type_name, msg_type) that will be
798         called when a message comes in.  While this function is
799         executing, note that (a) you are in a background thread and
800         may wish to use threading.Lock to protect your datastructures,
801         and (b) message processing from VPP will stop (so if you take
802         a long while about it you may provoke reply timeouts or cause
803         VPP to fill the RX buffer).  Passing None will disable the
804         callback.
805         """
806         self.event_callback = callback
807
808     def thread_msg_handler(self):
809         """Python thread calling the user registered message handler.
810
811         This is to emulate the old style event callback scheme. Modern
812         clients should provide their own thread to poll the event
813         queue.
814         """
815         while True:
816             r = self.message_queue.get()
817             if r == "terminate event thread":
818                 break
819             msgname = type(r).__name__
820             if self.event_callback:
821                 self.event_callback(msgname, r)
822
823     def validate_message_table(self, namecrctable):
824         """Take a dictionary of name_crc message names
825         and returns an array of missing messages"""
826
827         missing_table = []
828         for name_crc in namecrctable:
829             i = self.transport.get_msg_index(name_crc)
830             if i <= 0:
831                 missing_table.append(name_crc)
832         return missing_table
833
834     def dump_message_table(self):
835         """Return VPPs API message table as name_crc dictionary"""
836         return self.transport.message_table
837
838     def dump_message_table_filtered(self, msglist):
839         """Return VPPs API message table as name_crc dictionary,
840         filtered by message name list."""
841
842         replies = [self.services[n]['reply'] for n in msglist]
843         message_table_filtered = {}
844         for name in msglist + replies:
845             for k,v in self.transport.message_table.items():
846                 if k.startswith(name):
847                     message_table_filtered[k] = v
848                     break
849         return message_table_filtered
850
851     def __repr__(self):
852         return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
853                "logger=%s, read_timeout=%s, use_socket=%s, " \
854                "server_address='%s'>" % (
855                    self._apifiles, self.testmode, self.async_thread,
856                    self.logger, self.read_timeout, self.use_socket,
857                    self.server_address)
858
859     def details_iter(self, f, **kwargs):
860         cursor = 0
861         while True:
862             kwargs['cursor'] = cursor
863             rv, details = f(**kwargs)
864             #
865             # Convert to yield from details when we only support python 3
866             #
867             for d in details:
868                 yield d
869             if rv.retval == 0 or rv.retval != -165:
870                 break
871             cursor = rv.cursor
872
873 # Provide the old name for backward compatibility.
874 VPP = VPPApiClient
875
876 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4