vpp_papi: remove legacy way of calling VPP APIs
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import fnmatch
26 import weakref
27 import atexit
28 from cffi import FFI
29 import cffi
30
31 if sys.version[0] == '2':
32     import Queue as queue
33 else:
34     import queue as queue
35
36 ffi = FFI()
37 ffi.cdef("""
38 typedef void (*vac_callback_t)(unsigned char * data, int len);
39 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
40 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
41     int rx_qlen);
42 int vac_disconnect(void);
43 int vac_read(char **data, int *l, unsigned short timeout);
44 int vac_write(char *data, int len);
45 void vac_free(void * msg);
46
47 int vac_get_msg_index(unsigned char * name);
48 int vac_msg_table_size(void);
49 int vac_msg_table_max_index(void);
50
51 void vac_rx_suspend (void);
52 void vac_rx_resume (void);
53 void vac_set_error_handler(vac_error_callback_t);
54  """)
55
56 # Barfs on failure, no need to check success.
57 vpp_api = ffi.dlopen('libvppapiclient.so')
58
59 def vpp_atexit(vpp_weakref):
60     """Clean up VPP connection on shutdown."""
61     vpp_instance = vpp_weakref()
62     if vpp_instance.connected:
63         vpp_instance.logger.debug('Cleaning up VPP on exit')
64         vpp_instance.disconnect()
65
66 vpp_object = None
67
68
69 def vpp_iterator(d):
70     if sys.version[0] == '2':
71         return d.iteritems()
72     else:
73         return d.items()
74
75
76 @ffi.callback("void(unsigned char *, int)")
77 def vac_callback_sync(data, len):
78     vpp_object.msg_handler_sync(ffi.buffer(data, len))
79
80
81 @ffi.callback("void(unsigned char *, int)")
82 def vac_callback_async(data, len):
83     vpp_object.msg_handler_async(ffi.buffer(data, len))
84
85
86 @ffi.callback("void(void *, unsigned char *, int)")
87 def vac_error_handler(arg, msg, msg_len):
88     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
89
90
91 class Empty(object):
92     pass
93
94
95 class FuncWrapper(object):
96     def __init__(self, func):
97         self._func = func
98         self.__name__ = func.__name__
99
100     def __call__(self, **kwargs):
101         return self._func(**kwargs)
102
103
104 class VPP():
105     """VPP interface.
106
107     This class provides the APIs to VPP.  The APIs are loaded
108     from provided .api.json files and makes functions accordingly.
109     These functions are documented in the VPP .api files, as they
110     are dynamically created.
111
112     Additionally, VPP can send callback messages; this class
113     provides a means to register a callback function to receive
114     these messages in a background thread.
115     """
116     def __init__(self, apifiles=None, testmode=False, async_thread=True,
117                  logger=logging.getLogger('vpp_papi'), loglevel='debug', read_timeout=0):
118         """Create a VPP API object.
119
120         apifiles is a list of files containing API
121         descriptions that will be loaded - methods will be
122         dynamically created reflecting these APIs.  If not
123         provided this will load the API files from VPP's
124         default install location.
125         """
126         global vpp_object
127         vpp_object = self
128         self.logger = logger
129         logging.basicConfig(level=getattr(logging, loglevel.upper()))
130
131         self.messages = {}
132         self.id_names = []
133         self.id_msgdef = []
134         self.connected = False
135         self.header = struct.Struct('>HI')
136         self.apifiles = []
137         self.event_callback = None
138         self.message_queue = queue.Queue()
139         self.read_timeout = read_timeout
140         self.vpp_api = vpp_api
141         self.async_thread = async_thread
142
143         if not apifiles:
144             # Pick up API definitions from default directory
145             try:
146                 apifiles = self.find_api_files()
147             except RuntimeError:
148                 # In test mode we don't care that we can't find the API files
149                 if testmode:
150                     apifiles = []
151                 else:
152                     raise
153
154         for file in apifiles:
155             with open(file) as apidef_file:
156                 api = json.load(apidef_file)
157                 for t in api['types']:
158                     self.add_type(t[0], t[1:])
159
160                 for m in api['messages']:
161                     self.add_message(m[0], m[1:])
162         self.apifiles = apifiles
163
164         # Basic sanity check
165         if len(self.messages) == 0 and not testmode:
166             raise ValueError(1, 'Missing JSON message definitions')
167
168         # Make sure we allow VPP to clean up the message rings.
169         atexit.register(vpp_atexit, weakref.ref(self))
170
171         # Register error handler
172         vpp_api.vac_set_error_handler(vac_error_handler)
173
174         # Support legacy CFFI
175         # from_buffer supported from 1.8.0
176         (major, minor, patch) = [int(s) for s in cffi.__version__.split('.', 3)]
177         if major >= 1 and minor >= 8:
178             self._write = self._write_new_cffi
179         else:
180             self._write = self._write_legacy_cffi
181
182     class ContextId(object):
183         """Thread-safe provider of unique context IDs."""
184         def __init__(self):
185             self.context = 0
186             self.lock = threading.Lock()
187
188         def __call__(self):
189             """Get a new unique (or, at least, not recently used) context."""
190             with self.lock:
191                 self.context += 1
192                 return self.context
193     get_context = ContextId()
194
195     @classmethod
196     def find_api_dir(cls):
197         """Attempt to find the best directory in which API definition
198         files may reside. If the value VPP_API_DIR exists in the environment
199         then it is first on the search list. If we're inside a recognized
200         location in a VPP source tree (src/scripts and src/vpp-api/python)
201         then entries from there to the likely locations in build-root are
202         added. Finally the location used by system packages is added.
203
204         :returns: A single directory name, or None if no such directory
205             could be found.
206         """
207         dirs = []
208
209         if 'VPP_API_DIR' in os.environ:
210             dirs.append(os.environ['VPP_API_DIR'])
211
212         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
213         # in which case, plot a course to likely places in the src tree
214         import __main__ as main
215         if hasattr(main, '__file__'):
216             # get the path of the calling script
217             localdir = os.path.dirname(os.path.realpath(main.__file__))
218         else:
219             # use cwd if there is no calling script
220             localdir = os.cwd()
221         localdir_s = localdir.split(os.path.sep)
222
223         def dmatch(dir):
224             """Match dir against right-hand components of the script dir"""
225             d = dir.split('/')  # param 'dir' assumes a / separator
226             l = len(d)
227             return len(localdir_s) > l and localdir_s[-l:] == d
228
229         def sdir(srcdir, variant):
230             """Build a path from srcdir to the staged API files of
231             'variant'  (typically '' or '_debug')"""
232             # Since 'core' and 'plugin' files are staged
233             # in separate directories, we target the parent dir.
234             return os.path.sep.join((
235                 srcdir,
236                 'build-root',
237                 'install-vpp%s-native' % variant,
238                 'vpp',
239                 'share',
240                 'vpp',
241                 'api',
242             ))
243
244         srcdir = None
245         if dmatch('src/scripts'):
246             srcdir = os.path.sep.join(localdir_s[:-2])
247         elif dmatch('src/vpp-api/python'):
248             srcdir = os.path.sep.join(localdir_s[:-3])
249         elif dmatch('test'):
250             # we're apparently running tests
251             srcdir = os.path.sep.join(localdir_s[:-1])
252
253         if srcdir:
254             # we're in the source tree, try both the debug and release
255             # variants.
256             x = 'vpp/share/vpp/api'
257             dirs.append(sdir(srcdir, '_debug'))
258             dirs.append(sdir(srcdir, ''))
259
260         # Test for staged copies of the scripts
261         # For these, since we explicitly know if we're running a debug versus
262         # release variant, target only the relevant directory
263         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
264             srcdir = os.path.sep.join(localdir_s[:-4])
265             dirs.append(sdir(srcdir, '_debug'))
266         if dmatch('build-root/install-vpp-native/vpp/bin'):
267             srcdir = os.path.sep.join(localdir_s[:-4])
268             dirs.append(sdir(srcdir, ''))
269
270         # finally, try the location system packages typically install into
271         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
272
273         # check the directories for existance; first one wins
274         for dir in dirs:
275             if os.path.isdir(dir):
276                 return dir
277
278         return None
279
280     @classmethod
281     def find_api_files(cls, api_dir=None, patterns='*'):
282         """Find API definition files from the given directory tree with the
283         given pattern. If no directory is given then find_api_dir() is used
284         to locate one. If no pattern is given then all definition files found
285         in the directory tree are used.
286
287         :param api_dir: A directory tree in which to locate API definition
288             files; subdirectories are descended into.
289             If this is None then find_api_dir() is called to discover it.
290         :param patterns: A list of patterns to use in each visited directory
291             when looking for files.
292             This can be a list/tuple object or a comma-separated string of
293             patterns. Each value in the list will have leading/trialing
294             whitespace stripped.
295             The pattern specifies the first part of the filename, '.api.json'
296             is appended.
297             The results are de-duplicated, thus overlapping patterns are fine.
298             If this is None it defaults to '*' meaning "all API files".
299         :returns: A list of file paths for the API files found.
300         """
301         if api_dir is None:
302             api_dir = cls.find_api_dir()
303             if api_dir is None:
304                 raise RuntimeError("api_dir cannot be located")
305
306         if isinstance(patterns, list) or isinstance(patterns, tuple):
307             patterns = [p.strip() + '.api.json' for p in patterns]
308         else:
309             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
310
311         api_files = []
312         for root, dirnames, files in os.walk(api_dir):
313             # iterate all given patterns and de-dup the result
314             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
315             for filename in files:
316                 api_files.append(os.path.join(root, filename))
317
318         return api_files
319
320     def status(self):
321         """Debug function: report current VPP API status to stdout."""
322         print('Connected') if self.connected else print('Not Connected')
323         print('Read API definitions from', ', '.join(self.apifiles))
324
325     def __struct(self, t, n=None, e=-1, vl=None):
326         """Create a packing structure for a message."""
327         base_types = {'u8': 'B',
328                       'u16': 'H',
329                       'u32': 'I',
330                       'i32': 'i',
331                       'u64': 'Q',
332                       'f64': 'd', }
333         pack = None
334         if t in base_types:
335             pack = base_types[t]
336             if not vl:
337                 if e > 0 and t == 'u8':
338                     # Fixed byte array
339                     s = struct.Struct('>' + str(e) + 's')
340                     return s.size, s
341                 if e > 0:
342                     # Fixed array of base type
343                     s = struct.Struct('>' + base_types[t])
344                     return s.size, [e, s]
345                 elif e == 0:
346                     # Old style variable array
347                     s = struct.Struct('>' + base_types[t])
348                     return s.size, [-1, s]
349             else:
350                 # Variable length array
351                 if t == 'u8':
352                     s = struct.Struct('>s')
353                     return s.size, [vl, s]
354                 else:
355                     s = struct.Struct('>' + base_types[t])
356                 return s.size, [vl, s]
357
358             s = struct.Struct('>' + base_types[t])
359             return s.size, s
360
361         if t in self.messages:
362             size = self.messages[t]['sizes'][0]
363
364             # Return a list in case of array
365             if e > 0 and not vl:
366                 return size, [e, lambda self, encode, buf, offset, args: (
367                     self.__struct_type(encode, self.messages[t], buf, offset,
368                                        args))]
369             if vl:
370                 return size, [vl, lambda self, encode, buf, offset, args: (
371                     self.__struct_type(encode, self.messages[t], buf, offset,
372                                        args))]
373             elif e == 0:
374                 # Old style VLA
375                 raise NotImplementedError(1,
376                                           'No support for compound types ' + t)
377             return size, lambda self, encode, buf, offset, args: (
378                 self.__struct_type(encode, self.messages[t], buf, offset, args)
379             )
380
381         raise ValueError(1, 'Invalid message type: ' + t)
382
383     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
384         """Get a message packer or unpacker."""
385         if encode:
386             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
387         else:
388             return self.__struct_type_decode(msgdef, buf, offset)
389
390     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
391         off = offset
392         size = 0
393
394         for k in kwargs:
395             if k not in msgdef['args']:
396                 raise ValueError(1,'Non existing argument [' + k + ']' + \
397                                  ' used in call to: ' + \
398                                  self.id_names[kwargs['_vl_msg_id']] + '()' )
399
400         for k, v in vpp_iterator(msgdef['args']):
401             off += size
402             if k in kwargs:
403                 if type(v) is list:
404                     if callable(v[1]):
405                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
406                         if e != len(kwargs[k]):
407                             raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' %  (k, e, len(kwargs[k]))))
408                         size = 0
409                         for i in range(e):
410                             size += v[1](self, True, buf, off + size,
411                                          kwargs[k][i])
412                     else:
413                         if v[0] in kwargs:
414                             l = kwargs[v[0]]
415                             if l != len(kwargs[k]):
416                                 raise ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
417                         else:
418                             l = len(kwargs[k])
419                         if v[1].size == 1:
420                             buf[off:off + l] = bytearray(kwargs[k])
421                             size = l
422                         else:
423                             size = 0
424                             for i in kwargs[k]:
425                                 v[1].pack_into(buf, off + size, i)
426                                 size += v[1].size
427                 else:
428                     if callable(v):
429                         size = v(self, True, buf, off, kwargs[k])
430                     else:
431                         if type(kwargs[k]) is str and v.size < len(kwargs[k]):
432                             raise ValueError(1, 'Input list length mismatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
433                         v.pack_into(buf, off, kwargs[k])
434                         size = v.size
435             else:
436                 size = v.size if not type(v) is list else 0
437
438         return off + size - offset
439
440     def __getitem__(self, name):
441         if name in self.messages:
442             return self.messages[name]
443         return None
444
445     def get_size(self, sizes, kwargs):
446         total_size = sizes[0]
447         for e in sizes[1]:
448             if e in kwargs and type(kwargs[e]) is list:
449                 total_size += len(kwargs[e]) * sizes[1][e]
450         return total_size
451
452     def encode(self, msgdef, kwargs):
453         # Make suitably large buffer
454         size = self.get_size(msgdef['sizes'], kwargs)
455         buf = bytearray(size)
456         offset = 0
457         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
458         return buf[:offset + size]
459
460     def decode(self, msgdef, buf):
461         return self.__struct_type(False, msgdef, buf, 0, None)[1]
462
463     def __struct_type_decode(self, msgdef, buf, offset):
464         res = []
465         off = offset
466         size = 0
467         for k, v in vpp_iterator(msgdef['args']):
468             off += size
469             if type(v) is list:
470                 lst = []
471                 if callable(v[1]):  # compound type
472                     size = 0
473                     if v[0] in msgdef['args']:  # vla
474                         e = res[v[2]]
475                     else:  # fixed array
476                         e = v[0]
477                     res.append(lst)
478                     for i in range(e):
479                         (s, l) = v[1](self, False, buf, off + size, None)
480                         lst.append(l)
481                         size += s
482                     continue
483                 if v[1].size == 1:
484                     if type(v[0]) is int:
485                         size = len(buf) - off
486                     else:
487                         size = res[v[2]]
488                     res.append(buf[off:off + size])
489                 else:
490                     e = v[0] if type(v[0]) is int else res[v[2]]
491                     if e == -1:
492                         e = (len(buf) - off) / v[1].size
493                     lst = []
494                     res.append(lst)
495                     size = 0
496                     for i in range(e):
497                         lst.append(v[1].unpack_from(buf, off + size)[0])
498                         size += v[1].size
499             else:
500                 if callable(v):
501                     size = 0
502                     (s, l) = v(self, False, buf, off, None)
503                     res.append(l)
504                     size += s
505                 else:
506                     res.append(v.unpack_from(buf, off)[0])
507                     size = v.size
508
509         return off + size - offset, msgdef['return_tuple']._make(res)
510
511     def ret_tup(self, name):
512         if name in self.messages and 'return_tuple' in self.messages[name]:
513             return self.messages[name]['return_tuple']
514         return None
515
516     def duplicate_check_ok(self, name, msgdef):
517         crc = None
518         for c in msgdef:
519             if type(c) is dict and 'crc' in c:
520                 crc = c['crc']
521                 break
522         if crc:
523             # We can get duplicates because of imports
524             if crc == self.messages[name]['crc']:
525                 return True
526         return False
527
528     def add_message(self, name, msgdef, typeonly=False):
529         if name in self.messages:
530             if typeonly:
531                 if self.duplicate_check_ok(name, msgdef):
532                     return
533             raise ValueError('Duplicate message name: ' + name)
534
535         args = collections.OrderedDict()
536         argtypes = collections.OrderedDict()
537         fields = []
538         msg = {}
539         total_size = 0
540         sizes = {}
541         for i, f in enumerate(msgdef):
542             if type(f) is dict and 'crc' in f:
543                 msg['crc'] = f['crc']
544                 continue
545             field_type = f[0]
546             field_name = f[1]
547             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
548                 raise ValueError('Variable Length Array must be last: ' + name)
549             size, s = self.__struct(*f)
550             args[field_name] = s
551             if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
552                 if s[0] < 0:
553                     sizes[field_name] = size
554                 else:
555                     sizes[field_name] = size
556                     total_size += s[0] * size
557             else:
558                 sizes[field_name] = size
559                 total_size += size
560
561             argtypes[field_name] = field_type
562             if len(f) == 4:  # Find offset to # elements field
563                 idx = list(args.keys()).index(f[3]) - i
564                 args[field_name].append(idx)
565             fields.append(field_name)
566         msg['return_tuple'] = collections.namedtuple(name, fields,
567                                                      rename=True)
568         self.messages[name] = msg
569         self.messages[name]['args'] = args
570         self.messages[name]['argtypes'] = argtypes
571         self.messages[name]['typeonly'] = typeonly
572         self.messages[name]['sizes'] = [total_size, sizes]
573         return self.messages[name]
574
575     def add_type(self, name, typedef):
576         return self.add_message('vl_api_' + name + '_t', typedef,
577                                 typeonly=True)
578
579     def make_function(self, name, i, msgdef, multipart, async):
580         if (async):
581             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
582         else:
583             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
584                                                  **kwargs))
585         args = self.messages[name]['args']
586         argtypes = self.messages[name]['argtypes']
587         f.__name__ = str(name)
588         f.__doc__ = ", ".join(["%s %s" %
589                                (argtypes[k], k) for k in args.keys()])
590         return f
591
592     @property
593     def api(self):
594         if not hasattr(self, "_api"):
595             raise Exception("Not connected, api definitions not available")
596         return self._api
597
598     def _register_functions(self, async=False):
599         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
600         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
601         self._api = Empty()
602         for name, msgdef in vpp_iterator(self.messages):
603             if self.messages[name]['typeonly']:
604                 continue
605             crc = self.messages[name]['crc']
606             n = name + '_' + crc[2:]
607             i = vpp_api.vac_get_msg_index(n.encode())
608             if i > 0:
609                 self.id_msgdef[i] = msgdef
610                 self.id_names[i] = name
611                 multipart = True if name.find('_dump') > 0 else False
612                 f = self.make_function(name, i, msgdef, multipart, async)
613                 setattr(self._api, name, FuncWrapper(f))
614             else:
615                 self.logger.debug(
616                     'No such message type or failed CRC checksum: %s', n)
617
618     def _write_new_cffi(self, buf):
619         """Send a binary-packed message to VPP."""
620         if not self.connected:
621             raise IOError(1, 'Not connected')
622         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
623
624     def _write_legacy_cffi(self, buf):
625         """Send a binary-packed message to VPP."""
626         if not self.connected:
627             raise IOError(1, 'Not connected')
628         return vpp_api.vac_write(str(buf), len(buf))
629
630     def _read(self):
631         if not self.connected:
632             raise IOError(1, 'Not connected')
633         mem = ffi.new("char **")
634         size = ffi.new("int *")
635         rv = vpp_api.vac_read(mem, size, self.read_timeout)
636         if rv:
637             raise IOError(rv, 'vac_read failed')
638         msg = bytes(ffi.buffer(mem[0], size[0]))
639         vpp_api.vac_free(mem[0])
640         return msg
641
642     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
643                          async):
644         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
645         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
646         if rv != 0:
647             raise IOError(2, 'Connect failed')
648         self.connected = True
649
650         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
651         self._register_functions(async=async)
652
653         # Initialise control ping
654         crc = self.messages['control_ping']['crc']
655         self.control_ping_index = vpp_api.vac_get_msg_index(
656             ('control_ping' + '_' + crc[2:]).encode())
657         self.control_ping_msgdef = self.messages['control_ping']
658         if self.async_thread:
659             self.event_thread = threading.Thread(
660                 target=self.thread_msg_handler)
661             self.event_thread.daemon = True
662             self.event_thread.start()
663         return rv
664
665     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
666         """Attach to VPP.
667
668         name - the name of the client.
669         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
670         async - if true, messages are sent without waiting for a reply
671         rx_qlen - the length of the VPP message receive queue between
672         client and server.
673         """
674         msg_handler = vac_callback_sync if not async else vac_callback_async
675         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
676                                      async)
677
678     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
679         """Attach to VPP in synchronous mode. Application must poll for events.
680
681         name - the name of the client.
682         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
683         rx_qlen - the length of the VPP message receive queue between
684         client and server.
685         """
686
687         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
688                                      async=False)
689
690     def disconnect(self):
691         """Detach from VPP."""
692         rv = vpp_api.vac_disconnect()
693         self.connected = False
694         self.message_queue.put("terminate event thread")
695         return rv
696
697     def msg_handler_sync(self, msg):
698         """Process an incoming message from VPP in sync mode.
699
700         The message may be a reply or it may be an async notification.
701         """
702         r = self.decode_incoming_msg(msg)
703         if r is None:
704             return
705
706         # If we have a context, then use the context to find any
707         # request waiting for a reply
708         context = 0
709         if hasattr(r, 'context') and r.context > 0:
710             context = r.context
711
712         if context == 0:
713             # No context -> async notification that we feed to the callback
714             self.message_queue.put_nowait(r)
715         else:
716             raise IOError(2, 'RPC reply message received in event handler')
717
718     def decode_incoming_msg(self, msg):
719         if not msg:
720             self.logger.warning('vpp_api.read failed')
721             return
722
723         i, ci = self.header.unpack_from(msg, 0)
724         if self.id_names[i] == 'rx_thread_exit':
725             return
726
727         #
728         # Decode message and returns a tuple.
729         #
730         msgdef = self.id_msgdef[i]
731         if not msgdef:
732             raise IOError(2, 'Reply message undefined')
733
734         r = self.decode(msgdef, msg)
735
736         return r
737
738     def msg_handler_async(self, msg):
739         """Process a message from VPP in async mode.
740
741         In async mode, all messages are returned to the callback.
742         """
743         r = self.decode_incoming_msg(msg)
744         if r is None:
745             return
746
747         msgname = type(r).__name__
748
749         if self.event_callback:
750             self.event_callback(msgname, r)
751
752     def _control_ping(self, context):
753         """Send a ping command."""
754         self._call_vpp_async(self.control_ping_index,
755                              self.control_ping_msgdef,
756                              context=context)
757
758     def _call_vpp(self, i, msgdef, multipart, **kwargs):
759         """Given a message, send the message and await a reply.
760
761         msgdef - the message packing definition
762         i - the message type index
763         multipart - True if the message returns multiple
764         messages in return.
765         context - context number - chosen at random if not
766         supplied.
767         The remainder of the kwargs are the arguments to the API call.
768
769         The return value is the message or message array containing
770         the response.  It will raise an IOError exception if there was
771         no response within the timeout window.
772         """
773
774         if 'context' not in kwargs:
775             context = self.get_context()
776             kwargs['context'] = context
777         else:
778             context = kwargs['context']
779         kwargs['_vl_msg_id'] = i
780         b = self.encode(msgdef, kwargs)
781
782         vpp_api.vac_rx_suspend()
783         self._write(b)
784
785         if multipart:
786             # Send a ping after the request - we use its response
787             # to detect that we have seen all results.
788             self._control_ping(context)
789
790         # Block until we get a reply.
791         rl = []
792         while (True):
793             msg = self._read()
794             if not msg:
795                 raise IOError(2, 'VPP API client: read failed')
796
797             r = self.decode_incoming_msg(msg)
798             msgname = type(r).__name__
799             if context not in r or r.context == 0 or context != r.context:
800                 self.message_queue.put_nowait(r)
801                 continue
802
803             if not multipart:
804                 rl = r
805                 break
806             if msgname == 'control_ping_reply':
807                 break
808
809             rl.append(r)
810
811         vpp_api.vac_rx_resume()
812
813         return rl
814
815     def _call_vpp_async(self, i, msgdef, **kwargs):
816         """Given a message, send the message and await a reply.
817
818         msgdef - the message packing definition
819         i - the message type index
820         context - context number - chosen at random if not
821         supplied.
822         The remainder of the kwargs are the arguments to the API call.
823         """
824         if 'context' not in kwargs:
825             context = self.get_context()
826             kwargs['context'] = context
827         else:
828             context = kwargs['context']
829         kwargs['_vl_msg_id'] = i
830         b = self.encode(msgdef, kwargs)
831
832         self._write(b)
833
834     def register_event_callback(self, callback):
835         """Register a callback for async messages.
836
837         This will be called for async notifications in sync mode,
838         and all messages in async mode.  In sync mode, replies to
839         requests will not come here.
840
841         callback is a fn(msg_type_name, msg_type) that will be
842         called when a message comes in.  While this function is
843         executing, note that (a) you are in a background thread and
844         may wish to use threading.Lock to protect your datastructures,
845         and (b) message processing from VPP will stop (so if you take
846         a long while about it you may provoke reply timeouts or cause
847         VPP to fill the RX buffer).  Passing None will disable the
848         callback.
849         """
850         self.event_callback = callback
851
852     def thread_msg_handler(self):
853         """Python thread calling the user registerd message handler.
854
855         This is to emulate the old style event callback scheme. Modern
856         clients should provide their own thread to poll the event
857         queue.
858         """
859         while True:
860             r = self.message_queue.get()
861             if r == "terminate event thread":
862                 break
863             msgname = type(r).__name__
864             if self.event_callback:
865                 self.event_callback(msgname, r)
866
867
868 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4