VPP-1144: PAPI Import leads to duplicate type definition.
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import fnmatch
26 import atexit
27 from cffi import FFI
28 import cffi
29
30 if sys.version[0] == '2':
31     import Queue as queue
32 else:
33     import queue as queue
34
35 ffi = FFI()
36 ffi.cdef("""
37 typedef void (*vac_callback_t)(unsigned char * data, int len);
38 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
39 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
40     int rx_qlen);
41 int vac_disconnect(void);
42 int vac_read(char **data, int *l, unsigned short timeout);
43 int vac_write(char *data, int len);
44 void vac_free(void * msg);
45
46 int vac_get_msg_index(unsigned char * name);
47 int vac_msg_table_size(void);
48 int vac_msg_table_max_index(void);
49
50 void vac_rx_suspend (void);
51 void vac_rx_resume (void);
52 void vac_set_error_handler(vac_error_callback_t);
53  """)
54
55 # Barfs on failure, no need to check success.
56 vpp_api = ffi.dlopen('libvppapiclient.so')
57
58 def vpp_atexit(self):
59     """Clean up VPP connection on shutdown."""
60     if self.connected:
61         self.logger.debug('Cleaning up VPP on exit')
62         self.disconnect()
63
64 vpp_object = None
65
66
67 def vpp_iterator(d):
68     if sys.version[0] == '2':
69         return d.iteritems()
70     else:
71         return d.items()
72
73
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76     vpp_object.msg_handler_sync(ffi.buffer(data, len))
77
78
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81     vpp_object.msg_handler_async(ffi.buffer(data, len))
82
83
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
87
88
89 class Empty(object):
90     pass
91
92
93 class FuncWrapper(object):
94     def __init__(self, func):
95         self._func = func
96         self.__name__ = func.__name__
97
98     def __call__(self, **kwargs):
99         return self._func(**kwargs)
100
101
102 class VPP():
103     """VPP interface.
104
105     This class provides the APIs to VPP.  The APIs are loaded
106     from provided .api.json files and makes functions accordingly.
107     These functions are documented in the VPP .api files, as they
108     are dynamically created.
109
110     Additionally, VPP can send callback messages; this class
111     provides a means to register a callback function to receive
112     these messages in a background thread.
113     """
114     def __init__(self, apifiles=None, testmode=False, async_thread=True,
115                  logger=logging.getLogger('vpp_papi'), loglevel='debug', read_timeout=0):
116         """Create a VPP API object.
117
118         apifiles is a list of files containing API
119         descriptions that will be loaded - methods will be
120         dynamically created reflecting these APIs.  If not
121         provided this will load the API files from VPP's
122         default install location.
123         """
124         global vpp_object
125         vpp_object = self
126         self.logger = logger
127         logging.basicConfig(level=getattr(logging, loglevel.upper()))
128
129         self.messages = {}
130         self.id_names = []
131         self.id_msgdef = []
132         self.connected = False
133         self.header = struct.Struct('>HI')
134         self.apifiles = []
135         self.event_callback = None
136         self.message_queue = queue.Queue()
137         self.read_timeout = read_timeout
138         self.vpp_api = vpp_api
139         if async_thread:
140             self.event_thread = threading.Thread(
141                 target=self.thread_msg_handler)
142             self.event_thread.daemon = True
143             self.event_thread.start()
144
145         if not apifiles:
146             # Pick up API definitions from default directory
147             try:
148                 apifiles = self.find_api_files()
149             except RuntimeError:
150                 # In test mode we don't care that we can't find the API files
151                 if testmode:
152                     apifiles = []
153                 else:
154                     raise
155
156         for file in apifiles:
157             with open(file) as apidef_file:
158                 api = json.load(apidef_file)
159                 for t in api['types']:
160                     self.add_type(t[0], t[1:])
161
162                 for m in api['messages']:
163                     self.add_message(m[0], m[1:])
164         self.apifiles = apifiles
165
166         # Basic sanity check
167         if len(self.messages) == 0 and not testmode:
168             raise ValueError(1, 'Missing JSON message definitions')
169
170         # Make sure we allow VPP to clean up the message rings.
171         atexit.register(vpp_atexit, self)
172
173         # Register error handler
174         vpp_api.vac_set_error_handler(vac_error_handler)
175
176         # Support legacy CFFI
177         # from_buffer supported from 1.8.0
178         (major, minor, patch) = [int(s) for s in cffi.__version__.split('.', 3)]
179         if major >= 1 and minor >= 8:
180             self._write = self._write_new_cffi
181         else:
182             self._write = self._write_legacy_cffi
183
184     class ContextId(object):
185         """Thread-safe provider of unique context IDs."""
186         def __init__(self):
187             self.context = 0
188             self.lock = threading.Lock()
189
190         def __call__(self):
191             """Get a new unique (or, at least, not recently used) context."""
192             with self.lock:
193                 self.context += 1
194                 return self.context
195     get_context = ContextId()
196
197     @classmethod
198     def find_api_dir(cls):
199         """Attempt to find the best directory in which API definition
200         files may reside. If the value VPP_API_DIR exists in the environment
201         then it is first on the search list. If we're inside a recognized
202         location in a VPP source tree (src/scripts and src/vpp-api/python)
203         then entries from there to the likely locations in build-root are
204         added. Finally the location used by system packages is added.
205
206         :returns: A single directory name, or None if no such directory
207             could be found.
208         """
209         dirs = []
210
211         if 'VPP_API_DIR' in os.environ:
212             dirs.append(os.environ['VPP_API_DIR'])
213
214         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
215         # in which case, plot a course to likely places in the src tree
216         import __main__ as main
217         if hasattr(main, '__file__'):
218             # get the path of the calling script
219             localdir = os.path.dirname(os.path.realpath(main.__file__))
220         else:
221             # use cwd if there is no calling script
222             localdir = os.cwd()
223         localdir_s = localdir.split(os.path.sep)
224
225         def dmatch(dir):
226             """Match dir against right-hand components of the script dir"""
227             d = dir.split('/')  # param 'dir' assumes a / separator
228             l = len(d)
229             return len(localdir_s) > l and localdir_s[-l:] == d
230
231         def sdir(srcdir, variant):
232             """Build a path from srcdir to the staged API files of
233             'variant'  (typically '' or '_debug')"""
234             # Since 'core' and 'plugin' files are staged
235             # in separate directories, we target the parent dir.
236             return os.path.sep.join((
237                 srcdir,
238                 'build-root',
239                 'install-vpp%s-native' % variant,
240                 'vpp',
241                 'share',
242                 'vpp',
243                 'api',
244             ))
245
246         srcdir = None
247         if dmatch('src/scripts'):
248             srcdir = os.path.sep.join(localdir_s[:-2])
249         elif dmatch('src/vpp-api/python'):
250             srcdir = os.path.sep.join(localdir_s[:-3])
251         elif dmatch('test'):
252             # we're apparently running tests
253             srcdir = os.path.sep.join(localdir_s[:-1])
254
255         if srcdir:
256             # we're in the source tree, try both the debug and release
257             # variants.
258             x = 'vpp/share/vpp/api'
259             dirs.append(sdir(srcdir, '_debug'))
260             dirs.append(sdir(srcdir, ''))
261
262         # Test for staged copies of the scripts
263         # For these, since we explicitly know if we're running a debug versus
264         # release variant, target only the relevant directory
265         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
266             srcdir = os.path.sep.join(localdir_s[:-4])
267             dirs.append(sdir(srcdir, '_debug'))
268         if dmatch('build-root/install-vpp-native/vpp/bin'):
269             srcdir = os.path.sep.join(localdir_s[:-4])
270             dirs.append(sdir(srcdir, ''))
271
272         # finally, try the location system packages typically install into
273         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
274
275         # check the directories for existance; first one wins
276         for dir in dirs:
277             if os.path.isdir(dir):
278                 return dir
279
280         return None
281
282     @classmethod
283     def find_api_files(cls, api_dir=None, patterns='*'):
284         """Find API definition files from the given directory tree with the
285         given pattern. If no directory is given then find_api_dir() is used
286         to locate one. If no pattern is given then all definition files found
287         in the directory tree are used.
288
289         :param api_dir: A directory tree in which to locate API definition
290             files; subdirectories are descended into.
291             If this is None then find_api_dir() is called to discover it.
292         :param patterns: A list of patterns to use in each visited directory
293             when looking for files.
294             This can be a list/tuple object or a comma-separated string of
295             patterns. Each value in the list will have leading/trialing
296             whitespace stripped.
297             The pattern specifies the first part of the filename, '.api.json'
298             is appended.
299             The results are de-duplicated, thus overlapping patterns are fine.
300             If this is None it defaults to '*' meaning "all API files".
301         :returns: A list of file paths for the API files found.
302         """
303         if api_dir is None:
304             api_dir = cls.find_api_dir()
305             if api_dir is None:
306                 raise RuntimeError("api_dir cannot be located")
307
308         if isinstance(patterns, list) or isinstance(patterns, tuple):
309             patterns = [p.strip() + '.api.json' for p in patterns]
310         else:
311             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
312
313         api_files = []
314         for root, dirnames, files in os.walk(api_dir):
315             # iterate all given patterns and de-dup the result
316             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
317             for filename in files:
318                 api_files.append(os.path.join(root, filename))
319
320         return api_files
321
322     def status(self):
323         """Debug function: report current VPP API status to stdout."""
324         print('Connected') if self.connected else print('Not Connected')
325         print('Read API definitions from', ', '.join(self.apifiles))
326
327     def __struct(self, t, n=None, e=-1, vl=None):
328         """Create a packing structure for a message."""
329         base_types = {'u8': 'B',
330                       'u16': 'H',
331                       'u32': 'I',
332                       'i32': 'i',
333                       'u64': 'Q',
334                       'f64': 'd', }
335         pack = None
336         if t in base_types:
337             pack = base_types[t]
338             if not vl:
339                 if e > 0 and t == 'u8':
340                     # Fixed byte array
341                     s = struct.Struct('>' + str(e) + 's')
342                     return s.size, s
343                 if e > 0:
344                     # Fixed array of base type
345                     s = struct.Struct('>' + base_types[t])
346                     return s.size, [e, s]
347                 elif e == 0:
348                     # Old style variable array
349                     s = struct.Struct('>' + base_types[t])
350                     return s.size, [-1, s]
351             else:
352                 # Variable length array
353                 if t == 'u8':
354                     s = struct.Struct('>s')
355                     return s.size, [vl, s]
356                 else:
357                     s = struct.Struct('>' + base_types[t])
358                 return s.size, [vl, s]
359
360             s = struct.Struct('>' + base_types[t])
361             return s.size, s
362
363         if t in self.messages:
364             size = self.messages[t]['sizes'][0]
365
366             # Return a list in case of array
367             if e > 0 and not vl:
368                 return size, [e, lambda self, encode, buf, offset, args: (
369                     self.__struct_type(encode, self.messages[t], buf, offset,
370                                        args))]
371             if vl:
372                 return size, [vl, lambda self, encode, buf, offset, args: (
373                     self.__struct_type(encode, self.messages[t], buf, offset,
374                                        args))]
375             elif e == 0:
376                 # Old style VLA
377                 raise NotImplementedError(1,
378                                           'No support for compound types ' + t)
379             return size, lambda self, encode, buf, offset, args: (
380                 self.__struct_type(encode, self.messages[t], buf, offset, args)
381             )
382
383         raise ValueError(1, 'Invalid message type: ' + t)
384
385     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
386         """Get a message packer or unpacker."""
387         if encode:
388             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
389         else:
390             return self.__struct_type_decode(msgdef, buf, offset)
391
392     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
393         off = offset
394         size = 0
395
396         for k in kwargs:
397             if k not in msgdef['args']:
398                 raise ValueError(1,'Non existing argument [' + k + ']' + \
399                                  ' used in call to: ' + \
400                                  self.id_names[kwargs['_vl_msg_id']] + '()' )
401
402         for k, v in vpp_iterator(msgdef['args']):
403             off += size
404             if k in kwargs:
405                 if type(v) is list:
406                     if callable(v[1]):
407                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
408                         if e != len(kwargs[k]):
409                             raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' %  (k, e, len(kwargs[k]))))
410                         size = 0
411                         for i in range(e):
412                             size += v[1](self, True, buf, off + size,
413                                          kwargs[k][i])
414                     else:
415                         if v[0] in kwargs:
416                             l = kwargs[v[0]]
417                             if l != len(kwargs[k]):
418                                 raise ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
419                         else:
420                             l = len(kwargs[k])
421                         if v[1].size == 1:
422                             buf[off:off + l] = bytearray(kwargs[k])
423                             size = l
424                         else:
425                             size = 0
426                             for i in kwargs[k]:
427                                 v[1].pack_into(buf, off + size, i)
428                                 size += v[1].size
429                 else:
430                     if callable(v):
431                         size = v(self, True, buf, off, kwargs[k])
432                     else:
433                         if type(kwargs[k]) is str and v.size < len(kwargs[k]):
434                             raise ValueError(1, 'Input list length mismatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
435                         v.pack_into(buf, off, kwargs[k])
436                         size = v.size
437             else:
438                 size = v.size if not type(v) is list else 0
439
440         return off + size - offset
441
442     def __getitem__(self, name):
443         if name in self.messages:
444             return self.messages[name]
445         return None
446
447     def get_size(self, sizes, kwargs):
448         total_size = sizes[0]
449         for e in sizes[1]:
450             if e in kwargs and type(kwargs[e]) is list:
451                 total_size += len(kwargs[e]) * sizes[1][e]
452         return total_size
453
454     def encode(self, msgdef, kwargs):
455         # Make suitably large buffer
456         size = self.get_size(msgdef['sizes'], kwargs)
457         buf = bytearray(size)
458         offset = 0
459         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
460         return buf[:offset + size]
461
462     def decode(self, msgdef, buf):
463         return self.__struct_type(False, msgdef, buf, 0, None)[1]
464
465     def __struct_type_decode(self, msgdef, buf, offset):
466         res = []
467         off = offset
468         size = 0
469         for k, v in vpp_iterator(msgdef['args']):
470             off += size
471             if type(v) is list:
472                 lst = []
473                 if callable(v[1]):  # compound type
474                     size = 0
475                     if v[0] in msgdef['args']:  # vla
476                         e = res[v[2]]
477                     else:  # fixed array
478                         e = v[0]
479                     res.append(lst)
480                     for i in range(e):
481                         (s, l) = v[1](self, False, buf, off + size, None)
482                         lst.append(l)
483                         size += s
484                     continue
485                 if v[1].size == 1:
486                     if type(v[0]) is int:
487                         size = len(buf) - off
488                     else:
489                         size = res[v[2]]
490                     res.append(buf[off:off + size])
491                 else:
492                     e = v[0] if type(v[0]) is int else res[v[2]]
493                     if e == -1:
494                         e = (len(buf) - off) / v[1].size
495                     lst = []
496                     res.append(lst)
497                     size = 0
498                     for i in range(e):
499                         lst.append(v[1].unpack_from(buf, off + size)[0])
500                         size += v[1].size
501             else:
502                 if callable(v):
503                     size = 0
504                     (s, l) = v(self, False, buf, off, None)
505                     res.append(l)
506                     size += s
507                 else:
508                     res.append(v.unpack_from(buf, off)[0])
509                     size = v.size
510
511         return off + size - offset, msgdef['return_tuple']._make(res)
512
513     def ret_tup(self, name):
514         if name in self.messages and 'return_tuple' in self.messages[name]:
515             return self.messages[name]['return_tuple']
516         return None
517
518     def duplicate_check_ok(self, name, msgdef):
519         crc = None
520         for c in msgdef:
521             if type(c) is dict and 'crc' in c:
522                 crc = c['crc']
523                 break
524         if crc:
525             # We can get duplicates because of imports
526             if crc == self.messages[name]['crc']:
527                 return True
528         return False
529
530     def add_message(self, name, msgdef, typeonly=False):
531         if name in self.messages:
532             if typeonly:
533                 if self.duplicate_check_ok(name, msgdef):
534                     return
535             raise ValueError('Duplicate message name: ' + name)
536
537         args = collections.OrderedDict()
538         argtypes = collections.OrderedDict()
539         fields = []
540         msg = {}
541         total_size = 0
542         sizes = {}
543         for i, f in enumerate(msgdef):
544             if type(f) is dict and 'crc' in f:
545                 msg['crc'] = f['crc']
546                 continue
547             field_type = f[0]
548             field_name = f[1]
549             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
550                 raise ValueError('Variable Length Array must be last: ' + name)
551             size, s = self.__struct(*f)
552             args[field_name] = s
553             if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
554                 if s[0] < 0:
555                     sizes[field_name] = size
556                 else:
557                     sizes[field_name] = size
558                     total_size += s[0] * size
559             else:
560                 sizes[field_name] = size
561                 total_size += size
562
563             argtypes[field_name] = field_type
564             if len(f) == 4:  # Find offset to # elements field
565                 idx = list(args.keys()).index(f[3]) - i
566                 args[field_name].append(idx)
567             fields.append(field_name)
568         msg['return_tuple'] = collections.namedtuple(name, fields,
569                                                      rename=True)
570         self.messages[name] = msg
571         self.messages[name]['args'] = args
572         self.messages[name]['argtypes'] = argtypes
573         self.messages[name]['typeonly'] = typeonly
574         self.messages[name]['sizes'] = [total_size, sizes]
575         return self.messages[name]
576
577     def add_type(self, name, typedef):
578         return self.add_message('vl_api_' + name + '_t', typedef,
579                                 typeonly=True)
580
581     def make_function(self, name, i, msgdef, multipart, async):
582         if (async):
583             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
584         else:
585             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
586                                                  **kwargs))
587         args = self.messages[name]['args']
588         argtypes = self.messages[name]['argtypes']
589         f.__name__ = str(name)
590         f.__doc__ = ", ".join(["%s %s" %
591                                (argtypes[k], k) for k in args.keys()])
592         return f
593
594     @property
595     def api(self):
596         if not hasattr(self, "_api"):
597             raise Exception("Not connected, api definitions not available")
598         return self._api
599
600     def _register_functions(self, async=False):
601         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
602         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
603         self._api = Empty()
604         for name, msgdef in vpp_iterator(self.messages):
605             if self.messages[name]['typeonly']:
606                 continue
607             crc = self.messages[name]['crc']
608             n = name + '_' + crc[2:]
609             i = vpp_api.vac_get_msg_index(n.encode())
610             if i > 0:
611                 self.id_msgdef[i] = msgdef
612                 self.id_names[i] = name
613                 multipart = True if name.find('_dump') > 0 else False
614                 f = self.make_function(name, i, msgdef, multipart, async)
615                 setattr(self._api, name, FuncWrapper(f))
616
617                 # old API stuff starts here - will be removed in 17.07
618                 if hasattr(self, name):
619                     raise NameError(
620                         3, "Conflicting name in JSON definition: `%s'" % name)
621                 setattr(self, name, f)
622                 # old API stuff ends here
623             else:
624                 self.logger.debug(
625                     'No such message type or failed CRC checksum: %s', n)
626
627     def _write_new_cffi(self, buf):
628         """Send a binary-packed message to VPP."""
629         if not self.connected:
630             raise IOError(1, 'Not connected')
631         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
632
633     def _write_legacy_cffi(self, buf):
634         """Send a binary-packed message to VPP."""
635         if not self.connected:
636             raise IOError(1, 'Not connected')
637         return vpp_api.vac_write(str(buf), len(buf))
638
639     def _read(self):
640         if not self.connected:
641             raise IOError(1, 'Not connected')
642         mem = ffi.new("char **")
643         size = ffi.new("int *")
644         rv = vpp_api.vac_read(mem, size, self.read_timeout)
645         if rv:
646             raise IOError(rv, 'vac_read failed')
647         msg = bytes(ffi.buffer(mem[0], size[0]))
648         vpp_api.vac_free(mem[0])
649         return msg
650
651     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
652                          async):
653         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
654         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
655         if rv != 0:
656             raise IOError(2, 'Connect failed')
657         self.connected = True
658
659         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
660         self._register_functions(async=async)
661
662         # Initialise control ping
663         crc = self.messages['control_ping']['crc']
664         self.control_ping_index = vpp_api.vac_get_msg_index(
665             ('control_ping' + '_' + crc[2:]).encode())
666         self.control_ping_msgdef = self.messages['control_ping']
667         return rv
668
669     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
670         """Attach to VPP.
671
672         name - the name of the client.
673         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
674         async - if true, messages are sent without waiting for a reply
675         rx_qlen - the length of the VPP message receive queue between
676         client and server.
677         """
678         msg_handler = vac_callback_sync if not async else vac_callback_async
679         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
680                                      async)
681
682     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
683         """Attach to VPP in synchronous mode. Application must poll for events.
684
685         name - the name of the client.
686         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
687         rx_qlen - the length of the VPP message receive queue between
688         client and server.
689         """
690
691         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
692                                      async=False)
693
694     def disconnect(self):
695         """Detach from VPP."""
696         rv = vpp_api.vac_disconnect()
697         self.connected = False
698         return rv
699
700     def msg_handler_sync(self, msg):
701         """Process an incoming message from VPP in sync mode.
702
703         The message may be a reply or it may be an async notification.
704         """
705         r = self.decode_incoming_msg(msg)
706         if r is None:
707             return
708
709         # If we have a context, then use the context to find any
710         # request waiting for a reply
711         context = 0
712         if hasattr(r, 'context') and r.context > 0:
713             context = r.context
714
715         msgname = type(r).__name__
716
717         if context == 0:
718             # No context -> async notification that we feed to the callback
719             self.message_queue.put_nowait(r)
720         else:
721             raise IOError(2, 'RPC reply message received in event handler')
722
723     def decode_incoming_msg(self, msg):
724         if not msg:
725             self.logger.warning('vpp_api.read failed')
726             return
727
728         i, ci = self.header.unpack_from(msg, 0)
729         if self.id_names[i] == 'rx_thread_exit':
730             return
731
732         #
733         # Decode message and returns a tuple.
734         #
735         msgdef = self.id_msgdef[i]
736         if not msgdef:
737             raise IOError(2, 'Reply message undefined')
738
739         r = self.decode(msgdef, msg)
740
741         return r
742
743     def msg_handler_async(self, msg):
744         """Process a message from VPP in async mode.
745
746         In async mode, all messages are returned to the callback.
747         """
748         r = self.decode_incoming_msg(msg)
749         if r is None:
750             return
751
752         msgname = type(r).__name__
753
754         if self.event_callback:
755             self.event_callback(msgname, r)
756
757     def _control_ping(self, context):
758         """Send a ping command."""
759         self._call_vpp_async(self.control_ping_index,
760                              self.control_ping_msgdef,
761                              context=context)
762
763     def _call_vpp(self, i, msgdef, multipart, **kwargs):
764         """Given a message, send the message and await a reply.
765
766         msgdef - the message packing definition
767         i - the message type index
768         multipart - True if the message returns multiple
769         messages in return.
770         context - context number - chosen at random if not
771         supplied.
772         The remainder of the kwargs are the arguments to the API call.
773
774         The return value is the message or message array containing
775         the response.  It will raise an IOError exception if there was
776         no response within the timeout window.
777         """
778
779         if 'context' not in kwargs:
780             context = self.get_context()
781             kwargs['context'] = context
782         else:
783             context = kwargs['context']
784         kwargs['_vl_msg_id'] = i
785         b = self.encode(msgdef, kwargs)
786
787         vpp_api.vac_rx_suspend()
788         self._write(b)
789
790         if multipart:
791             # Send a ping after the request - we use its response
792             # to detect that we have seen all results.
793             self._control_ping(context)
794
795         # Block until we get a reply.
796         rl = []
797         while (True):
798             msg = self._read()
799             if not msg:
800                 raise IOError(2, 'VPP API client: read failed')
801
802             r = self.decode_incoming_msg(msg)
803             msgname = type(r).__name__
804             if context not in r or r.context == 0 or context != r.context:
805                 self.message_queue.put_nowait(r)
806                 continue
807
808             if not multipart:
809                 rl = r
810                 break
811             if msgname == 'control_ping_reply':
812                 break
813
814             rl.append(r)
815
816         vpp_api.vac_rx_resume()
817
818         return rl
819
820     def _call_vpp_async(self, i, msgdef, **kwargs):
821         """Given a message, send the message and await a reply.
822
823         msgdef - the message packing definition
824         i - the message type index
825         context - context number - chosen at random if not
826         supplied.
827         The remainder of the kwargs are the arguments to the API call.
828         """
829         if 'context' not in kwargs:
830             context = self.get_context()
831             kwargs['context'] = context
832         else:
833             context = kwargs['context']
834         kwargs['_vl_msg_id'] = i
835         b = self.encode(msgdef, kwargs)
836
837         self._write(b)
838
839     def register_event_callback(self, callback):
840         """Register a callback for async messages.
841
842         This will be called for async notifications in sync mode,
843         and all messages in async mode.  In sync mode, replies to
844         requests will not come here.
845
846         callback is a fn(msg_type_name, msg_type) that will be
847         called when a message comes in.  While this function is
848         executing, note that (a) you are in a background thread and
849         may wish to use threading.Lock to protect your datastructures,
850         and (b) message processing from VPP will stop (so if you take
851         a long while about it you may provoke reply timeouts or cause
852         VPP to fill the RX buffer).  Passing None will disable the
853         callback.
854         """
855         self.event_callback = callback
856
857     def thread_msg_handler(self):
858         """Python thread calling the user registerd message handler.
859
860         This is to emulate the old style event callback scheme. Modern
861         clients should provide their own thread to poll the event
862         queue.
863         """
864         while True:
865             r = self.message_queue.get()
866             msgname = type(r).__name__
867             if self.event_callback:
868                 self.event_callback(msgname, r)
869
870
871 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4