vpp_papi: reduce memory leaks
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import fnmatch
26 import weakref
27 import atexit
28 from cffi import FFI
29 import cffi
30
31 if sys.version[0] == '2':
32     import Queue as queue
33 else:
34     import queue as queue
35
36 ffi = FFI()
37 ffi.cdef("""
38 typedef void (*vac_callback_t)(unsigned char * data, int len);
39 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
40 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
41     int rx_qlen);
42 int vac_disconnect(void);
43 int vac_read(char **data, int *l, unsigned short timeout);
44 int vac_write(char *data, int len);
45 void vac_free(void * msg);
46
47 int vac_get_msg_index(unsigned char * name);
48 int vac_msg_table_size(void);
49 int vac_msg_table_max_index(void);
50
51 void vac_rx_suspend (void);
52 void vac_rx_resume (void);
53 void vac_set_error_handler(vac_error_callback_t);
54  """)
55
56 # Barfs on failure, no need to check success.
57 vpp_api = ffi.dlopen('libvppapiclient.so')
58
59 def vpp_atexit(vpp_weakref):
60     """Clean up VPP connection on shutdown."""
61     vpp_instance = vpp_weakref()
62     if vpp_instance.connected:
63         vpp_instance.logger.debug('Cleaning up VPP on exit')
64         vpp_instance.disconnect()
65
66 vpp_object = None
67
68
69 def vpp_iterator(d):
70     if sys.version[0] == '2':
71         return d.iteritems()
72     else:
73         return d.items()
74
75
76 @ffi.callback("void(unsigned char *, int)")
77 def vac_callback_sync(data, len):
78     vpp_object.msg_handler_sync(ffi.buffer(data, len))
79
80
81 @ffi.callback("void(unsigned char *, int)")
82 def vac_callback_async(data, len):
83     vpp_object.msg_handler_async(ffi.buffer(data, len))
84
85
86 @ffi.callback("void(void *, unsigned char *, int)")
87 def vac_error_handler(arg, msg, msg_len):
88     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
89
90
91 class Empty(object):
92     pass
93
94
95 class FuncWrapper(object):
96     def __init__(self, func):
97         self._func = func
98         self.__name__ = func.__name__
99
100     def __call__(self, **kwargs):
101         return self._func(**kwargs)
102
103
104 class VPP():
105     """VPP interface.
106
107     This class provides the APIs to VPP.  The APIs are loaded
108     from provided .api.json files and makes functions accordingly.
109     These functions are documented in the VPP .api files, as they
110     are dynamically created.
111
112     Additionally, VPP can send callback messages; this class
113     provides a means to register a callback function to receive
114     these messages in a background thread.
115     """
116     def __init__(self, apifiles=None, testmode=False, async_thread=True,
117                  logger=logging.getLogger('vpp_papi'), loglevel='debug', read_timeout=0):
118         """Create a VPP API object.
119
120         apifiles is a list of files containing API
121         descriptions that will be loaded - methods will be
122         dynamically created reflecting these APIs.  If not
123         provided this will load the API files from VPP's
124         default install location.
125         """
126         global vpp_object
127         vpp_object = self
128         self.logger = logger
129         logging.basicConfig(level=getattr(logging, loglevel.upper()))
130
131         self.messages = {}
132         self.id_names = []
133         self.id_msgdef = []
134         self.connected = False
135         self.header = struct.Struct('>HI')
136         self.apifiles = []
137         self.event_callback = None
138         self.message_queue = queue.Queue()
139         self.read_timeout = read_timeout
140         self.vpp_api = vpp_api
141         self.async_thread = async_thread
142
143         if not apifiles:
144             # Pick up API definitions from default directory
145             try:
146                 apifiles = self.find_api_files()
147             except RuntimeError:
148                 # In test mode we don't care that we can't find the API files
149                 if testmode:
150                     apifiles = []
151                 else:
152                     raise
153
154         for file in apifiles:
155             with open(file) as apidef_file:
156                 api = json.load(apidef_file)
157                 for t in api['types']:
158                     self.add_type(t[0], t[1:])
159
160                 for m in api['messages']:
161                     self.add_message(m[0], m[1:])
162         self.apifiles = apifiles
163
164         # Basic sanity check
165         if len(self.messages) == 0 and not testmode:
166             raise ValueError(1, 'Missing JSON message definitions')
167
168         # Make sure we allow VPP to clean up the message rings.
169         atexit.register(vpp_atexit, weakref.ref(self))
170
171         # Register error handler
172         vpp_api.vac_set_error_handler(vac_error_handler)
173
174         # Support legacy CFFI
175         # from_buffer supported from 1.8.0
176         (major, minor, patch) = [int(s) for s in cffi.__version__.split('.', 3)]
177         if major >= 1 and minor >= 8:
178             self._write = self._write_new_cffi
179         else:
180             self._write = self._write_legacy_cffi
181
182     class ContextId(object):
183         """Thread-safe provider of unique context IDs."""
184         def __init__(self):
185             self.context = 0
186             self.lock = threading.Lock()
187
188         def __call__(self):
189             """Get a new unique (or, at least, not recently used) context."""
190             with self.lock:
191                 self.context += 1
192                 return self.context
193     get_context = ContextId()
194
195     @classmethod
196     def find_api_dir(cls):
197         """Attempt to find the best directory in which API definition
198         files may reside. If the value VPP_API_DIR exists in the environment
199         then it is first on the search list. If we're inside a recognized
200         location in a VPP source tree (src/scripts and src/vpp-api/python)
201         then entries from there to the likely locations in build-root are
202         added. Finally the location used by system packages is added.
203
204         :returns: A single directory name, or None if no such directory
205             could be found.
206         """
207         dirs = []
208
209         if 'VPP_API_DIR' in os.environ:
210             dirs.append(os.environ['VPP_API_DIR'])
211
212         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
213         # in which case, plot a course to likely places in the src tree
214         import __main__ as main
215         if hasattr(main, '__file__'):
216             # get the path of the calling script
217             localdir = os.path.dirname(os.path.realpath(main.__file__))
218         else:
219             # use cwd if there is no calling script
220             localdir = os.cwd()
221         localdir_s = localdir.split(os.path.sep)
222
223         def dmatch(dir):
224             """Match dir against right-hand components of the script dir"""
225             d = dir.split('/')  # param 'dir' assumes a / separator
226             l = len(d)
227             return len(localdir_s) > l and localdir_s[-l:] == d
228
229         def sdir(srcdir, variant):
230             """Build a path from srcdir to the staged API files of
231             'variant'  (typically '' or '_debug')"""
232             # Since 'core' and 'plugin' files are staged
233             # in separate directories, we target the parent dir.
234             return os.path.sep.join((
235                 srcdir,
236                 'build-root',
237                 'install-vpp%s-native' % variant,
238                 'vpp',
239                 'share',
240                 'vpp',
241                 'api',
242             ))
243
244         srcdir = None
245         if dmatch('src/scripts'):
246             srcdir = os.path.sep.join(localdir_s[:-2])
247         elif dmatch('src/vpp-api/python'):
248             srcdir = os.path.sep.join(localdir_s[:-3])
249         elif dmatch('test'):
250             # we're apparently running tests
251             srcdir = os.path.sep.join(localdir_s[:-1])
252
253         if srcdir:
254             # we're in the source tree, try both the debug and release
255             # variants.
256             x = 'vpp/share/vpp/api'
257             dirs.append(sdir(srcdir, '_debug'))
258             dirs.append(sdir(srcdir, ''))
259
260         # Test for staged copies of the scripts
261         # For these, since we explicitly know if we're running a debug versus
262         # release variant, target only the relevant directory
263         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
264             srcdir = os.path.sep.join(localdir_s[:-4])
265             dirs.append(sdir(srcdir, '_debug'))
266         if dmatch('build-root/install-vpp-native/vpp/bin'):
267             srcdir = os.path.sep.join(localdir_s[:-4])
268             dirs.append(sdir(srcdir, ''))
269
270         # finally, try the location system packages typically install into
271         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
272
273         # check the directories for existance; first one wins
274         for dir in dirs:
275             if os.path.isdir(dir):
276                 return dir
277
278         return None
279
280     @classmethod
281     def find_api_files(cls, api_dir=None, patterns='*'):
282         """Find API definition files from the given directory tree with the
283         given pattern. If no directory is given then find_api_dir() is used
284         to locate one. If no pattern is given then all definition files found
285         in the directory tree are used.
286
287         :param api_dir: A directory tree in which to locate API definition
288             files; subdirectories are descended into.
289             If this is None then find_api_dir() is called to discover it.
290         :param patterns: A list of patterns to use in each visited directory
291             when looking for files.
292             This can be a list/tuple object or a comma-separated string of
293             patterns. Each value in the list will have leading/trialing
294             whitespace stripped.
295             The pattern specifies the first part of the filename, '.api.json'
296             is appended.
297             The results are de-duplicated, thus overlapping patterns are fine.
298             If this is None it defaults to '*' meaning "all API files".
299         :returns: A list of file paths for the API files found.
300         """
301         if api_dir is None:
302             api_dir = cls.find_api_dir()
303             if api_dir is None:
304                 raise RuntimeError("api_dir cannot be located")
305
306         if isinstance(patterns, list) or isinstance(patterns, tuple):
307             patterns = [p.strip() + '.api.json' for p in patterns]
308         else:
309             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
310
311         api_files = []
312         for root, dirnames, files in os.walk(api_dir):
313             # iterate all given patterns and de-dup the result
314             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
315             for filename in files:
316                 api_files.append(os.path.join(root, filename))
317
318         return api_files
319
320     def status(self):
321         """Debug function: report current VPP API status to stdout."""
322         print('Connected') if self.connected else print('Not Connected')
323         print('Read API definitions from', ', '.join(self.apifiles))
324
325     def __struct(self, t, n=None, e=-1, vl=None):
326         """Create a packing structure for a message."""
327         base_types = {'u8': 'B',
328                       'u16': 'H',
329                       'u32': 'I',
330                       'i32': 'i',
331                       'u64': 'Q',
332                       'f64': 'd', }
333         pack = None
334         if t in base_types:
335             pack = base_types[t]
336             if not vl:
337                 if e > 0 and t == 'u8':
338                     # Fixed byte array
339                     s = struct.Struct('>' + str(e) + 's')
340                     return s.size, s
341                 if e > 0:
342                     # Fixed array of base type
343                     s = struct.Struct('>' + base_types[t])
344                     return s.size, [e, s]
345                 elif e == 0:
346                     # Old style variable array
347                     s = struct.Struct('>' + base_types[t])
348                     return s.size, [-1, s]
349             else:
350                 # Variable length array
351                 if t == 'u8':
352                     s = struct.Struct('>s')
353                     return s.size, [vl, s]
354                 else:
355                     s = struct.Struct('>' + base_types[t])
356                 return s.size, [vl, s]
357
358             s = struct.Struct('>' + base_types[t])
359             return s.size, s
360
361         if t in self.messages:
362             size = self.messages[t]['sizes'][0]
363
364             # Return a list in case of array
365             if e > 0 and not vl:
366                 return size, [e, lambda self, encode, buf, offset, args: (
367                     self.__struct_type(encode, self.messages[t], buf, offset,
368                                        args))]
369             if vl:
370                 return size, [vl, lambda self, encode, buf, offset, args: (
371                     self.__struct_type(encode, self.messages[t], buf, offset,
372                                        args))]
373             elif e == 0:
374                 # Old style VLA
375                 raise NotImplementedError(1,
376                                           'No support for compound types ' + t)
377             return size, lambda self, encode, buf, offset, args: (
378                 self.__struct_type(encode, self.messages[t], buf, offset, args)
379             )
380
381         raise ValueError(1, 'Invalid message type: ' + t)
382
383     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
384         """Get a message packer or unpacker."""
385         if encode:
386             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
387         else:
388             return self.__struct_type_decode(msgdef, buf, offset)
389
390     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
391         off = offset
392         size = 0
393
394         for k in kwargs:
395             if k not in msgdef['args']:
396                 raise ValueError(1,'Non existing argument [' + k + ']' + \
397                                  ' used in call to: ' + \
398                                  self.id_names[kwargs['_vl_msg_id']] + '()' )
399
400         for k, v in vpp_iterator(msgdef['args']):
401             off += size
402             if k in kwargs:
403                 if type(v) is list:
404                     if callable(v[1]):
405                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
406                         if e != len(kwargs[k]):
407                             raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' %  (k, e, len(kwargs[k]))))
408                         size = 0
409                         for i in range(e):
410                             size += v[1](self, True, buf, off + size,
411                                          kwargs[k][i])
412                     else:
413                         if v[0] in kwargs:
414                             l = kwargs[v[0]]
415                             if l != len(kwargs[k]):
416                                 raise ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
417                         else:
418                             l = len(kwargs[k])
419                         if v[1].size == 1:
420                             buf[off:off + l] = bytearray(kwargs[k])
421                             size = l
422                         else:
423                             size = 0
424                             for i in kwargs[k]:
425                                 v[1].pack_into(buf, off + size, i)
426                                 size += v[1].size
427                 else:
428                     if callable(v):
429                         size = v(self, True, buf, off, kwargs[k])
430                     else:
431                         if type(kwargs[k]) is str and v.size < len(kwargs[k]):
432                             raise ValueError(1, 'Input list length mismatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
433                         v.pack_into(buf, off, kwargs[k])
434                         size = v.size
435             else:
436                 size = v.size if not type(v) is list else 0
437
438         return off + size - offset
439
440     def __getitem__(self, name):
441         if name in self.messages:
442             return self.messages[name]
443         return None
444
445     def get_size(self, sizes, kwargs):
446         total_size = sizes[0]
447         for e in sizes[1]:
448             if e in kwargs and type(kwargs[e]) is list:
449                 total_size += len(kwargs[e]) * sizes[1][e]
450         return total_size
451
452     def encode(self, msgdef, kwargs):
453         # Make suitably large buffer
454         size = self.get_size(msgdef['sizes'], kwargs)
455         buf = bytearray(size)
456         offset = 0
457         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
458         return buf[:offset + size]
459
460     def decode(self, msgdef, buf):
461         return self.__struct_type(False, msgdef, buf, 0, None)[1]
462
463     def __struct_type_decode(self, msgdef, buf, offset):
464         res = []
465         off = offset
466         size = 0
467         for k, v in vpp_iterator(msgdef['args']):
468             off += size
469             if type(v) is list:
470                 lst = []
471                 if callable(v[1]):  # compound type
472                     size = 0
473                     if v[0] in msgdef['args']:  # vla
474                         e = res[v[2]]
475                     else:  # fixed array
476                         e = v[0]
477                     res.append(lst)
478                     for i in range(e):
479                         (s, l) = v[1](self, False, buf, off + size, None)
480                         lst.append(l)
481                         size += s
482                     continue
483                 if v[1].size == 1:
484                     if type(v[0]) is int:
485                         size = len(buf) - off
486                     else:
487                         size = res[v[2]]
488                     res.append(buf[off:off + size])
489                 else:
490                     e = v[0] if type(v[0]) is int else res[v[2]]
491                     if e == -1:
492                         e = (len(buf) - off) / v[1].size
493                     lst = []
494                     res.append(lst)
495                     size = 0
496                     for i in range(e):
497                         lst.append(v[1].unpack_from(buf, off + size)[0])
498                         size += v[1].size
499             else:
500                 if callable(v):
501                     size = 0
502                     (s, l) = v(self, False, buf, off, None)
503                     res.append(l)
504                     size += s
505                 else:
506                     res.append(v.unpack_from(buf, off)[0])
507                     size = v.size
508
509         return off + size - offset, msgdef['return_tuple']._make(res)
510
511     def ret_tup(self, name):
512         if name in self.messages and 'return_tuple' in self.messages[name]:
513             return self.messages[name]['return_tuple']
514         return None
515
516     def duplicate_check_ok(self, name, msgdef):
517         crc = None
518         for c in msgdef:
519             if type(c) is dict and 'crc' in c:
520                 crc = c['crc']
521                 break
522         if crc:
523             # We can get duplicates because of imports
524             if crc == self.messages[name]['crc']:
525                 return True
526         return False
527
528     def add_message(self, name, msgdef, typeonly=False):
529         if name in self.messages:
530             if typeonly:
531                 if self.duplicate_check_ok(name, msgdef):
532                     return
533             raise ValueError('Duplicate message name: ' + name)
534
535         args = collections.OrderedDict()
536         argtypes = collections.OrderedDict()
537         fields = []
538         msg = {}
539         total_size = 0
540         sizes = {}
541         for i, f in enumerate(msgdef):
542             if type(f) is dict and 'crc' in f:
543                 msg['crc'] = f['crc']
544                 continue
545             field_type = f[0]
546             field_name = f[1]
547             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
548                 raise ValueError('Variable Length Array must be last: ' + name)
549             size, s = self.__struct(*f)
550             args[field_name] = s
551             if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
552                 if s[0] < 0:
553                     sizes[field_name] = size
554                 else:
555                     sizes[field_name] = size
556                     total_size += s[0] * size
557             else:
558                 sizes[field_name] = size
559                 total_size += size
560
561             argtypes[field_name] = field_type
562             if len(f) == 4:  # Find offset to # elements field
563                 idx = list(args.keys()).index(f[3]) - i
564                 args[field_name].append(idx)
565             fields.append(field_name)
566         msg['return_tuple'] = collections.namedtuple(name, fields,
567                                                      rename=True)
568         self.messages[name] = msg
569         self.messages[name]['args'] = args
570         self.messages[name]['argtypes'] = argtypes
571         self.messages[name]['typeonly'] = typeonly
572         self.messages[name]['sizes'] = [total_size, sizes]
573         return self.messages[name]
574
575     def add_type(self, name, typedef):
576         return self.add_message('vl_api_' + name + '_t', typedef,
577                                 typeonly=True)
578
579     def make_function(self, name, i, msgdef, multipart, async):
580         if (async):
581             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
582         else:
583             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
584                                                  **kwargs))
585         args = self.messages[name]['args']
586         argtypes = self.messages[name]['argtypes']
587         f.__name__ = str(name)
588         f.__doc__ = ", ".join(["%s %s" %
589                                (argtypes[k], k) for k in args.keys()])
590         return f
591
592     @property
593     def api(self):
594         if not hasattr(self, "_api"):
595             raise Exception("Not connected, api definitions not available")
596         return self._api
597
598     def _register_functions(self, async=False):
599         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
600         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
601         self._api = Empty()
602         for name, msgdef in vpp_iterator(self.messages):
603             if self.messages[name]['typeonly']:
604                 continue
605             crc = self.messages[name]['crc']
606             n = name + '_' + crc[2:]
607             i = vpp_api.vac_get_msg_index(n.encode())
608             if i > 0:
609                 self.id_msgdef[i] = msgdef
610                 self.id_names[i] = name
611                 multipart = True if name.find('_dump') > 0 else False
612                 f = self.make_function(name, i, msgdef, multipart, async)
613                 setattr(self._api, name, FuncWrapper(f))
614
615                 # old API stuff starts here - will be removed in 17.07
616                 if hasattr(self, name):
617                     raise NameError(
618                         3, "Conflicting name in JSON definition: `%s'" % name)
619                 setattr(self, name, f)
620                 # old API stuff ends here
621             else:
622                 self.logger.debug(
623                     'No such message type or failed CRC checksum: %s', n)
624
625     def _write_new_cffi(self, buf):
626         """Send a binary-packed message to VPP."""
627         if not self.connected:
628             raise IOError(1, 'Not connected')
629         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
630
631     def _write_legacy_cffi(self, buf):
632         """Send a binary-packed message to VPP."""
633         if not self.connected:
634             raise IOError(1, 'Not connected')
635         return vpp_api.vac_write(str(buf), len(buf))
636
637     def _read(self):
638         if not self.connected:
639             raise IOError(1, 'Not connected')
640         mem = ffi.new("char **")
641         size = ffi.new("int *")
642         rv = vpp_api.vac_read(mem, size, self.read_timeout)
643         if rv:
644             raise IOError(rv, 'vac_read failed')
645         msg = bytes(ffi.buffer(mem[0], size[0]))
646         vpp_api.vac_free(mem[0])
647         return msg
648
649     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
650                          async):
651         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
652         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
653         if rv != 0:
654             raise IOError(2, 'Connect failed')
655         self.connected = True
656
657         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
658         self._register_functions(async=async)
659
660         # Initialise control ping
661         crc = self.messages['control_ping']['crc']
662         self.control_ping_index = vpp_api.vac_get_msg_index(
663             ('control_ping' + '_' + crc[2:]).encode())
664         self.control_ping_msgdef = self.messages['control_ping']
665         if self.async_thread:
666             self.event_thread = threading.Thread(
667                 target=self.thread_msg_handler)
668             self.event_thread.daemon = True
669             self.event_thread.start()
670         return rv
671
672     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
673         """Attach to VPP.
674
675         name - the name of the client.
676         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
677         async - if true, messages are sent without waiting for a reply
678         rx_qlen - the length of the VPP message receive queue between
679         client and server.
680         """
681         msg_handler = vac_callback_sync if not async else vac_callback_async
682         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
683                                      async)
684
685     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
686         """Attach to VPP in synchronous mode. Application must poll for events.
687
688         name - the name of the client.
689         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
690         rx_qlen - the length of the VPP message receive queue between
691         client and server.
692         """
693
694         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
695                                      async=False)
696
697     def disconnect(self):
698         """Detach from VPP."""
699         rv = vpp_api.vac_disconnect()
700         self.connected = False
701         self.message_queue.put("terminate event thread")
702         return rv
703
704     def msg_handler_sync(self, msg):
705         """Process an incoming message from VPP in sync mode.
706
707         The message may be a reply or it may be an async notification.
708         """
709         r = self.decode_incoming_msg(msg)
710         if r is None:
711             return
712
713         # If we have a context, then use the context to find any
714         # request waiting for a reply
715         context = 0
716         if hasattr(r, 'context') and r.context > 0:
717             context = r.context
718
719         if context == 0:
720             # No context -> async notification that we feed to the callback
721             self.message_queue.put_nowait(r)
722         else:
723             raise IOError(2, 'RPC reply message received in event handler')
724
725     def decode_incoming_msg(self, msg):
726         if not msg:
727             self.logger.warning('vpp_api.read failed')
728             return
729
730         i, ci = self.header.unpack_from(msg, 0)
731         if self.id_names[i] == 'rx_thread_exit':
732             return
733
734         #
735         # Decode message and returns a tuple.
736         #
737         msgdef = self.id_msgdef[i]
738         if not msgdef:
739             raise IOError(2, 'Reply message undefined')
740
741         r = self.decode(msgdef, msg)
742
743         return r
744
745     def msg_handler_async(self, msg):
746         """Process a message from VPP in async mode.
747
748         In async mode, all messages are returned to the callback.
749         """
750         r = self.decode_incoming_msg(msg)
751         if r is None:
752             return
753
754         msgname = type(r).__name__
755
756         if self.event_callback:
757             self.event_callback(msgname, r)
758
759     def _control_ping(self, context):
760         """Send a ping command."""
761         self._call_vpp_async(self.control_ping_index,
762                              self.control_ping_msgdef,
763                              context=context)
764
765     def _call_vpp(self, i, msgdef, multipart, **kwargs):
766         """Given a message, send the message and await a reply.
767
768         msgdef - the message packing definition
769         i - the message type index
770         multipart - True if the message returns multiple
771         messages in return.
772         context - context number - chosen at random if not
773         supplied.
774         The remainder of the kwargs are the arguments to the API call.
775
776         The return value is the message or message array containing
777         the response.  It will raise an IOError exception if there was
778         no response within the timeout window.
779         """
780
781         if 'context' not in kwargs:
782             context = self.get_context()
783             kwargs['context'] = context
784         else:
785             context = kwargs['context']
786         kwargs['_vl_msg_id'] = i
787         b = self.encode(msgdef, kwargs)
788
789         vpp_api.vac_rx_suspend()
790         self._write(b)
791
792         if multipart:
793             # Send a ping after the request - we use its response
794             # to detect that we have seen all results.
795             self._control_ping(context)
796
797         # Block until we get a reply.
798         rl = []
799         while (True):
800             msg = self._read()
801             if not msg:
802                 raise IOError(2, 'VPP API client: read failed')
803
804             r = self.decode_incoming_msg(msg)
805             msgname = type(r).__name__
806             if context not in r or r.context == 0 or context != r.context:
807                 self.message_queue.put_nowait(r)
808                 continue
809
810             if not multipart:
811                 rl = r
812                 break
813             if msgname == 'control_ping_reply':
814                 break
815
816             rl.append(r)
817
818         vpp_api.vac_rx_resume()
819
820         return rl
821
822     def _call_vpp_async(self, i, msgdef, **kwargs):
823         """Given a message, send the message and await a reply.
824
825         msgdef - the message packing definition
826         i - the message type index
827         context - context number - chosen at random if not
828         supplied.
829         The remainder of the kwargs are the arguments to the API call.
830         """
831         if 'context' not in kwargs:
832             context = self.get_context()
833             kwargs['context'] = context
834         else:
835             context = kwargs['context']
836         kwargs['_vl_msg_id'] = i
837         b = self.encode(msgdef, kwargs)
838
839         self._write(b)
840
841     def register_event_callback(self, callback):
842         """Register a callback for async messages.
843
844         This will be called for async notifications in sync mode,
845         and all messages in async mode.  In sync mode, replies to
846         requests will not come here.
847
848         callback is a fn(msg_type_name, msg_type) that will be
849         called when a message comes in.  While this function is
850         executing, note that (a) you are in a background thread and
851         may wish to use threading.Lock to protect your datastructures,
852         and (b) message processing from VPP will stop (so if you take
853         a long while about it you may provoke reply timeouts or cause
854         VPP to fill the RX buffer).  Passing None will disable the
855         callback.
856         """
857         self.event_callback = callback
858
859     def thread_msg_handler(self):
860         """Python thread calling the user registerd message handler.
861
862         This is to emulate the old style event callback scheme. Modern
863         clients should provide their own thread to poll the event
864         queue.
865         """
866         while True:
867             r = self.message_queue.get()
868             if r == "terminate event thread":
869                 break
870             msgname = type(r).__name__
871             if self.event_callback:
872                 self.event_callback(msgname, r)
873
874
875 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4