srv6-as: fixing version
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import fnmatch
26 import atexit
27 from cffi import FFI
28 import cffi
29
30 if sys.version[0] == '2':
31     import Queue as queue
32 else:
33     import queue as queue
34
35 ffi = FFI()
36 ffi.cdef("""
37 typedef void (*vac_callback_t)(unsigned char * data, int len);
38 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
39 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
40     int rx_qlen);
41 int vac_disconnect(void);
42 int vac_read(char **data, int *l, unsigned short timeout);
43 int vac_write(char *data, int len);
44 void vac_free(void * msg);
45
46 int vac_get_msg_index(unsigned char * name);
47 int vac_msg_table_size(void);
48 int vac_msg_table_max_index(void);
49
50 void vac_rx_suspend (void);
51 void vac_rx_resume (void);
52 void vac_set_error_handler(vac_error_callback_t);
53  """)
54
55 # Barfs on failure, no need to check success.
56 vpp_api = ffi.dlopen('libvppapiclient.so')
57
58 def vpp_atexit(self):
59     """Clean up VPP connection on shutdown."""
60     if self.connected:
61         self.logger.debug('Cleaning up VPP on exit')
62         self.disconnect()
63
64 vpp_object = None
65
66
67 def vpp_iterator(d):
68     if sys.version[0] == '2':
69         return d.iteritems()
70     else:
71         return d.items()
72
73
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76     vpp_object.msg_handler_sync(ffi.buffer(data, len))
77
78
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81     vpp_object.msg_handler_async(ffi.buffer(data, len))
82
83
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
87
88
89 class Empty(object):
90     pass
91
92
93 class FuncWrapper(object):
94     def __init__(self, func):
95         self._func = func
96         self.__name__ = func.__name__
97
98     def __call__(self, **kwargs):
99         return self._func(**kwargs)
100
101
102 class VPP():
103     """VPP interface.
104
105     This class provides the APIs to VPP.  The APIs are loaded
106     from provided .api.json files and makes functions accordingly.
107     These functions are documented in the VPP .api files, as they
108     are dynamically created.
109
110     Additionally, VPP can send callback messages; this class
111     provides a means to register a callback function to receive
112     these messages in a background thread.
113     """
114     def __init__(self, apifiles=None, testmode=False, async_thread=True,
115                  logger=logging.getLogger('vpp_papi'), loglevel='debug', read_timeout=0):
116         """Create a VPP API object.
117
118         apifiles is a list of files containing API
119         descriptions that will be loaded - methods will be
120         dynamically created reflecting these APIs.  If not
121         provided this will load the API files from VPP's
122         default install location.
123         """
124         global vpp_object
125         vpp_object = self
126         self.logger = logger
127         logging.basicConfig(level=getattr(logging, loglevel.upper()))
128
129         self.messages = {}
130         self.id_names = []
131         self.id_msgdef = []
132         self.connected = False
133         self.header = struct.Struct('>HI')
134         self.apifiles = []
135         self.event_callback = None
136         self.message_queue = queue.Queue()
137         self.read_timeout = read_timeout
138         self.vpp_api = vpp_api
139         if async_thread:
140             self.event_thread = threading.Thread(
141                 target=self.thread_msg_handler)
142             self.event_thread.daemon = True
143             self.event_thread.start()
144
145         if not apifiles:
146             # Pick up API definitions from default directory
147             try:
148                 apifiles = self.find_api_files()
149             except RuntimeError:
150                 # In test mode we don't care that we can't find the API files
151                 if testmode:
152                     apifiles = []
153                 else:
154                     raise
155
156         for file in apifiles:
157             with open(file) as apidef_file:
158                 api = json.load(apidef_file)
159                 for t in api['types']:
160                     self.add_type(t[0], t[1:])
161
162                 for m in api['messages']:
163                     self.add_message(m[0], m[1:])
164         self.apifiles = apifiles
165
166         # Basic sanity check
167         if len(self.messages) == 0 and not testmode:
168             raise ValueError(1, 'Missing JSON message definitions')
169
170         # Make sure we allow VPP to clean up the message rings.
171         atexit.register(vpp_atexit, self)
172
173         # Register error handler
174         vpp_api.vac_set_error_handler(vac_error_handler)
175
176         # Support legacy CFFI
177         # from_buffer supported from 1.8.0
178         (major, minor, patch) = [int(s) for s in cffi.__version__.split('.', 3)]
179         if major >= 1 and minor >= 8:
180             self._write = self._write_new_cffi
181         else:
182             self._write = self._write_legacy_cffi
183
184     class ContextId(object):
185         """Thread-safe provider of unique context IDs."""
186         def __init__(self):
187             self.context = 0
188             self.lock = threading.Lock()
189
190         def __call__(self):
191             """Get a new unique (or, at least, not recently used) context."""
192             with self.lock:
193                 self.context += 1
194                 return self.context
195     get_context = ContextId()
196
197     @classmethod
198     def find_api_dir(cls):
199         """Attempt to find the best directory in which API definition
200         files may reside. If the value VPP_API_DIR exists in the environment
201         then it is first on the search list. If we're inside a recognized
202         location in a VPP source tree (src/scripts and src/vpp-api/python)
203         then entries from there to the likely locations in build-root are
204         added. Finally the location used by system packages is added.
205
206         :returns: A single directory name, or None if no such directory
207             could be found.
208         """
209         dirs = []
210
211         if 'VPP_API_DIR' in os.environ:
212             dirs.append(os.environ['VPP_API_DIR'])
213
214         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
215         # in which case, plot a course to likely places in the src tree
216         import __main__ as main
217         if hasattr(main, '__file__'):
218             # get the path of the calling script
219             localdir = os.path.dirname(os.path.realpath(main.__file__))
220         else:
221             # use cwd if there is no calling script
222             localdir = os.cwd()
223         localdir_s = localdir.split(os.path.sep)
224
225         def dmatch(dir):
226             """Match dir against right-hand components of the script dir"""
227             d = dir.split('/')  # param 'dir' assumes a / separator
228             l = len(d)
229             return len(localdir_s) > l and localdir_s[-l:] == d
230
231         def sdir(srcdir, variant):
232             """Build a path from srcdir to the staged API files of
233             'variant'  (typically '' or '_debug')"""
234             # Since 'core' and 'plugin' files are staged
235             # in separate directories, we target the parent dir.
236             return os.path.sep.join((
237                 srcdir,
238                 'build-root',
239                 'install-vpp%s-native' % variant,
240                 'vpp',
241                 'share',
242                 'vpp',
243                 'api',
244             ))
245
246         srcdir = None
247         if dmatch('src/scripts'):
248             srcdir = os.path.sep.join(localdir_s[:-2])
249         elif dmatch('src/vpp-api/python'):
250             srcdir = os.path.sep.join(localdir_s[:-3])
251         elif dmatch('test'):
252             # we're apparently running tests
253             srcdir = os.path.sep.join(localdir_s[:-1])
254
255         if srcdir:
256             # we're in the source tree, try both the debug and release
257             # variants.
258             x = 'vpp/share/vpp/api'
259             dirs.append(sdir(srcdir, '_debug'))
260             dirs.append(sdir(srcdir, ''))
261
262         # Test for staged copies of the scripts
263         # For these, since we explicitly know if we're running a debug versus
264         # release variant, target only the relevant directory
265         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
266             srcdir = os.path.sep.join(localdir_s[:-4])
267             dirs.append(sdir(srcdir, '_debug'))
268         if dmatch('build-root/install-vpp-native/vpp/bin'):
269             srcdir = os.path.sep.join(localdir_s[:-4])
270             dirs.append(sdir(srcdir, ''))
271
272         # finally, try the location system packages typically install into
273         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
274
275         # check the directories for existance; first one wins
276         for dir in dirs:
277             if os.path.isdir(dir):
278                 return dir
279
280         return None
281
282     @classmethod
283     def find_api_files(cls, api_dir=None, patterns='*'):
284         """Find API definition files from the given directory tree with the
285         given pattern. If no directory is given then find_api_dir() is used
286         to locate one. If no pattern is given then all definition files found
287         in the directory tree are used.
288
289         :param api_dir: A directory tree in which to locate API definition
290             files; subdirectories are descended into.
291             If this is None then find_api_dir() is called to discover it.
292         :param patterns: A list of patterns to use in each visited directory
293             when looking for files.
294             This can be a list/tuple object or a comma-separated string of
295             patterns. Each value in the list will have leading/trialing
296             whitespace stripped.
297             The pattern specifies the first part of the filename, '.api.json'
298             is appended.
299             The results are de-duplicated, thus overlapping patterns are fine.
300             If this is None it defaults to '*' meaning "all API files".
301         :returns: A list of file paths for the API files found.
302         """
303         if api_dir is None:
304             api_dir = cls.find_api_dir()
305             if api_dir is None:
306                 raise RuntimeError("api_dir cannot be located")
307
308         if isinstance(patterns, list) or isinstance(patterns, tuple):
309             patterns = [p.strip() + '.api.json' for p in patterns]
310         else:
311             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
312
313         api_files = []
314         for root, dirnames, files in os.walk(api_dir):
315             # iterate all given patterns and de-dup the result
316             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
317             for filename in files:
318                 api_files.append(os.path.join(root, filename))
319
320         return api_files
321
322     def status(self):
323         """Debug function: report current VPP API status to stdout."""
324         print('Connected') if self.connected else print('Not Connected')
325         print('Read API definitions from', ', '.join(self.apifiles))
326
327     def __struct(self, t, n=None, e=-1, vl=None):
328         """Create a packing structure for a message."""
329         base_types = {'u8': 'B',
330                       'u16': 'H',
331                       'u32': 'I',
332                       'i32': 'i',
333                       'u64': 'Q',
334                       'f64': 'd', }
335         pack = None
336         if t in base_types:
337             pack = base_types[t]
338             if not vl:
339                 if e > 0 and t == 'u8':
340                     # Fixed byte array
341                     s = struct.Struct('>' + str(e) + 's')
342                     return s.size, s
343                 if e > 0:
344                     # Fixed array of base type
345                     s = struct.Struct('>' + base_types[t])
346                     return s.size, [e, s]
347                 elif e == 0:
348                     # Old style variable array
349                     s = struct.Struct('>' + base_types[t])
350                     return s.size, [-1, s]
351             else:
352                 # Variable length array
353                 if t == 'u8':
354                     s = struct.Struct('>s')
355                     return s.size, [vl, s]
356                 else:
357                     s = struct.Struct('>' + base_types[t])
358                 return s.size, [vl, s]
359
360             s = struct.Struct('>' + base_types[t])
361             return s.size, s
362
363         if t in self.messages:
364             size = self.messages[t]['sizes'][0]
365
366             # Return a list in case of array
367             if e > 0 and not vl:
368                 return size, [e, lambda self, encode, buf, offset, args: (
369                     self.__struct_type(encode, self.messages[t], buf, offset,
370                                        args))]
371             if vl:
372                 return size, [vl, lambda self, encode, buf, offset, args: (
373                     self.__struct_type(encode, self.messages[t], buf, offset,
374                                        args))]
375             elif e == 0:
376                 # Old style VLA
377                 raise NotImplementedError(1,
378                                           'No support for compound types ' + t)
379             return size, lambda self, encode, buf, offset, args: (
380                 self.__struct_type(encode, self.messages[t], buf, offset, args)
381             )
382
383         raise ValueError(1, 'Invalid message type: ' + t)
384
385     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
386         """Get a message packer or unpacker."""
387         if encode:
388             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
389         else:
390             return self.__struct_type_decode(msgdef, buf, offset)
391
392     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
393         off = offset
394         size = 0
395
396         for k in kwargs:
397             if k not in msgdef['args']:
398                 raise ValueError(1,'Non existing argument [' + k + ']' + \
399                                  ' used in call to: ' + \
400                                  self.id_names[kwargs['_vl_msg_id']] + '()' )
401
402         for k, v in vpp_iterator(msgdef['args']):
403             off += size
404             if k in kwargs:
405                 if type(v) is list:
406                     if callable(v[1]):
407                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
408                         if e != len(kwargs[k]):
409                             raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' %  (k, e, len(kwargs[k]))))
410                         size = 0
411                         for i in range(e):
412                             size += v[1](self, True, buf, off + size,
413                                          kwargs[k][i])
414                     else:
415                         if v[0] in kwargs:
416                             l = kwargs[v[0]]
417                             if l != len(kwargs[k]):
418                                 raise ValueError(1, 'Input list length mismatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
419                         else:
420                             l = len(kwargs[k])
421                         if v[1].size == 1:
422                             buf[off:off + l] = bytearray(kwargs[k])
423                             size = l
424                         else:
425                             size = 0
426                             for i in kwargs[k]:
427                                 v[1].pack_into(buf, off + size, i)
428                                 size += v[1].size
429                 else:
430                     if callable(v):
431                         size = v(self, True, buf, off, kwargs[k])
432                     else:
433                         if type(kwargs[k]) is str and v.size < len(kwargs[k]):
434                             raise ValueError(1, 'Input list length mismatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
435                         v.pack_into(buf, off, kwargs[k])
436                         size = v.size
437             else:
438                 size = v.size if not type(v) is list else 0
439
440         return off + size - offset
441
442     def __getitem__(self, name):
443         if name in self.messages:
444             return self.messages[name]
445         return None
446
447     def get_size(self, sizes, kwargs):
448         total_size = sizes[0]
449         for e in sizes[1]:
450             if e in kwargs and type(kwargs[e]) is list:
451                 total_size += len(kwargs[e]) * sizes[1][e]
452         return total_size
453
454     def encode(self, msgdef, kwargs):
455         # Make suitably large buffer
456         size = self.get_size(msgdef['sizes'], kwargs)
457         buf = bytearray(size)
458         offset = 0
459         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
460         return buf[:offset + size]
461
462     def decode(self, msgdef, buf):
463         return self.__struct_type(False, msgdef, buf, 0, None)[1]
464
465     def __struct_type_decode(self, msgdef, buf, offset):
466         res = []
467         off = offset
468         size = 0
469         for k, v in vpp_iterator(msgdef['args']):
470             off += size
471             if type(v) is list:
472                 lst = []
473                 if callable(v[1]):  # compound type
474                     size = 0
475                     if v[0] in msgdef['args']:  # vla
476                         e = res[v[2]]
477                     else:  # fixed array
478                         e = v[0]
479                     res.append(lst)
480                     for i in range(e):
481                         (s, l) = v[1](self, False, buf, off + size, None)
482                         lst.append(l)
483                         size += s
484                     continue
485                 if v[1].size == 1:
486                     if type(v[0]) is int:
487                         size = len(buf) - off
488                     else:
489                         size = res[v[2]]
490                     res.append(buf[off:off + size])
491                 else:
492                     e = v[0] if type(v[0]) is int else res[v[2]]
493                     if e == -1:
494                         e = (len(buf) - off) / v[1].size
495                     lst = []
496                     res.append(lst)
497                     size = 0
498                     for i in range(e):
499                         lst.append(v[1].unpack_from(buf, off + size)[0])
500                         size += v[1].size
501             else:
502                 if callable(v):
503                     size = 0
504                     (s, l) = v(self, False, buf, off, None)
505                     res.append(l)
506                     size += s
507                 else:
508                     res.append(v.unpack_from(buf, off)[0])
509                     size = v.size
510
511         return off + size - offset, msgdef['return_tuple']._make(res)
512
513     def ret_tup(self, name):
514         if name in self.messages and 'return_tuple' in self.messages[name]:
515             return self.messages[name]['return_tuple']
516         return None
517
518     def add_message(self, name, msgdef, typeonly=False):
519         if name in self.messages:
520             raise ValueError('Duplicate message name: ' + name)
521
522         args = collections.OrderedDict()
523         argtypes = collections.OrderedDict()
524         fields = []
525         msg = {}
526         total_size = 0
527         sizes = {}
528         for i, f in enumerate(msgdef):
529             if type(f) is dict and 'crc' in f:
530                 msg['crc'] = f['crc']
531                 continue
532             field_type = f[0]
533             field_name = f[1]
534             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
535                 raise ValueError('Variable Length Array must be last: ' + name)
536             size, s = self.__struct(*f)
537             args[field_name] = s
538             if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
539                 if s[0] < 0:
540                     sizes[field_name] = size
541                 else:
542                     sizes[field_name] = size
543                     total_size += s[0] * size
544             else:
545                 sizes[field_name] = size
546                 total_size += size
547
548             argtypes[field_name] = field_type
549             if len(f) == 4:  # Find offset to # elements field
550                 idx = list(args.keys()).index(f[3]) - i
551                 args[field_name].append(idx)
552             fields.append(field_name)
553         msg['return_tuple'] = collections.namedtuple(name, fields,
554                                                      rename=True)
555         self.messages[name] = msg
556         self.messages[name]['args'] = args
557         self.messages[name]['argtypes'] = argtypes
558         self.messages[name]['typeonly'] = typeonly
559         self.messages[name]['sizes'] = [total_size, sizes]
560         return self.messages[name]
561
562     def add_type(self, name, typedef):
563         return self.add_message('vl_api_' + name + '_t', typedef,
564                                 typeonly=True)
565
566     def make_function(self, name, i, msgdef, multipart, async):
567         if (async):
568             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
569         else:
570             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
571                                                  **kwargs))
572         args = self.messages[name]['args']
573         argtypes = self.messages[name]['argtypes']
574         f.__name__ = str(name)
575         f.__doc__ = ", ".join(["%s %s" %
576                                (argtypes[k], k) for k in args.keys()])
577         return f
578
579     @property
580     def api(self):
581         if not hasattr(self, "_api"):
582             raise Exception("Not connected, api definitions not available")
583         return self._api
584
585     def _register_functions(self, async=False):
586         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
587         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
588         self._api = Empty()
589         for name, msgdef in vpp_iterator(self.messages):
590             if self.messages[name]['typeonly']:
591                 continue
592             crc = self.messages[name]['crc']
593             n = name + '_' + crc[2:]
594             i = vpp_api.vac_get_msg_index(n.encode())
595             if i > 0:
596                 self.id_msgdef[i] = msgdef
597                 self.id_names[i] = name
598                 multipart = True if name.find('_dump') > 0 else False
599                 f = self.make_function(name, i, msgdef, multipart, async)
600                 setattr(self._api, name, FuncWrapper(f))
601
602                 # old API stuff starts here - will be removed in 17.07
603                 if hasattr(self, name):
604                     raise NameError(
605                         3, "Conflicting name in JSON definition: `%s'" % name)
606                 setattr(self, name, f)
607                 # old API stuff ends here
608             else:
609                 self.logger.debug(
610                     'No such message type or failed CRC checksum: %s', n)
611
612     def _write_new_cffi(self, buf):
613         """Send a binary-packed message to VPP."""
614         if not self.connected:
615             raise IOError(1, 'Not connected')
616         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
617
618     def _write_legacy_cffi(self, buf):
619         """Send a binary-packed message to VPP."""
620         if not self.connected:
621             raise IOError(1, 'Not connected')
622         return vpp_api.vac_write(str(buf), len(buf))
623
624     def _read(self):
625         if not self.connected:
626             raise IOError(1, 'Not connected')
627         mem = ffi.new("char **")
628         size = ffi.new("int *")
629         rv = vpp_api.vac_read(mem, size, self.read_timeout)
630         if rv:
631             raise IOError(rv, 'vac_read failed')
632         msg = bytes(ffi.buffer(mem[0], size[0]))
633         vpp_api.vac_free(mem[0])
634         return msg
635
636     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
637                          async):
638         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
639         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
640         if rv != 0:
641             raise IOError(2, 'Connect failed')
642         self.connected = True
643
644         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
645         self._register_functions(async=async)
646
647         # Initialise control ping
648         crc = self.messages['control_ping']['crc']
649         self.control_ping_index = vpp_api.vac_get_msg_index(
650             ('control_ping' + '_' + crc[2:]).encode())
651         self.control_ping_msgdef = self.messages['control_ping']
652         return rv
653
654     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
655         """Attach to VPP.
656
657         name - the name of the client.
658         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
659         async - if true, messages are sent without waiting for a reply
660         rx_qlen - the length of the VPP message receive queue between
661         client and server.
662         """
663         msg_handler = vac_callback_sync if not async else vac_callback_async
664         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
665                                      async)
666
667     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
668         """Attach to VPP in synchronous mode. Application must poll for events.
669
670         name - the name of the client.
671         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
672         rx_qlen - the length of the VPP message receive queue between
673         client and server.
674         """
675
676         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
677                                      async=False)
678
679     def disconnect(self):
680         """Detach from VPP."""
681         rv = vpp_api.vac_disconnect()
682         self.connected = False
683         return rv
684
685     def msg_handler_sync(self, msg):
686         """Process an incoming message from VPP in sync mode.
687
688         The message may be a reply or it may be an async notification.
689         """
690         r = self.decode_incoming_msg(msg)
691         if r is None:
692             return
693
694         # If we have a context, then use the context to find any
695         # request waiting for a reply
696         context = 0
697         if hasattr(r, 'context') and r.context > 0:
698             context = r.context
699
700         msgname = type(r).__name__
701
702         if context == 0:
703             # No context -> async notification that we feed to the callback
704             self.message_queue.put_nowait(r)
705         else:
706             raise IOError(2, 'RPC reply message received in event handler')
707
708     def decode_incoming_msg(self, msg):
709         if not msg:
710             self.logger.warning('vpp_api.read failed')
711             return
712
713         i, ci = self.header.unpack_from(msg, 0)
714         if self.id_names[i] == 'rx_thread_exit':
715             return
716
717         #
718         # Decode message and returns a tuple.
719         #
720         msgdef = self.id_msgdef[i]
721         if not msgdef:
722             raise IOError(2, 'Reply message undefined')
723
724         r = self.decode(msgdef, msg)
725
726         return r
727
728     def msg_handler_async(self, msg):
729         """Process a message from VPP in async mode.
730
731         In async mode, all messages are returned to the callback.
732         """
733         r = self.decode_incoming_msg(msg)
734         if r is None:
735             return
736
737         msgname = type(r).__name__
738
739         if self.event_callback:
740             self.event_callback(msgname, r)
741
742     def _control_ping(self, context):
743         """Send a ping command."""
744         self._call_vpp_async(self.control_ping_index,
745                              self.control_ping_msgdef,
746                              context=context)
747
748     def _call_vpp(self, i, msgdef, multipart, **kwargs):
749         """Given a message, send the message and await a reply.
750
751         msgdef - the message packing definition
752         i - the message type index
753         multipart - True if the message returns multiple
754         messages in return.
755         context - context number - chosen at random if not
756         supplied.
757         The remainder of the kwargs are the arguments to the API call.
758
759         The return value is the message or message array containing
760         the response.  It will raise an IOError exception if there was
761         no response within the timeout window.
762         """
763
764         if 'context' not in kwargs:
765             context = self.get_context()
766             kwargs['context'] = context
767         else:
768             context = kwargs['context']
769         kwargs['_vl_msg_id'] = i
770         b = self.encode(msgdef, kwargs)
771
772         vpp_api.vac_rx_suspend()
773         self._write(b)
774
775         if multipart:
776             # Send a ping after the request - we use its response
777             # to detect that we have seen all results.
778             self._control_ping(context)
779
780         # Block until we get a reply.
781         rl = []
782         while (True):
783             msg = self._read()
784             if not msg:
785                 raise IOError(2, 'VPP API client: read failed')
786
787             r = self.decode_incoming_msg(msg)
788             msgname = type(r).__name__
789             if context not in r or r.context == 0 or context != r.context:
790                 self.message_queue.put_nowait(r)
791                 continue
792
793             if not multipart:
794                 rl = r
795                 break
796             if msgname == 'control_ping_reply':
797                 break
798
799             rl.append(r)
800
801         vpp_api.vac_rx_resume()
802
803         return rl
804
805     def _call_vpp_async(self, i, msgdef, **kwargs):
806         """Given a message, send the message and await a reply.
807
808         msgdef - the message packing definition
809         i - the message type index
810         context - context number - chosen at random if not
811         supplied.
812         The remainder of the kwargs are the arguments to the API call.
813         """
814         if 'context' not in kwargs:
815             context = self.get_context()
816             kwargs['context'] = context
817         else:
818             context = kwargs['context']
819         kwargs['_vl_msg_id'] = i
820         b = self.encode(msgdef, kwargs)
821
822         self._write(b)
823
824     def register_event_callback(self, callback):
825         """Register a callback for async messages.
826
827         This will be called for async notifications in sync mode,
828         and all messages in async mode.  In sync mode, replies to
829         requests will not come here.
830
831         callback is a fn(msg_type_name, msg_type) that will be
832         called when a message comes in.  While this function is
833         executing, note that (a) you are in a background thread and
834         may wish to use threading.Lock to protect your datastructures,
835         and (b) message processing from VPP will stop (so if you take
836         a long while about it you may provoke reply timeouts or cause
837         VPP to fill the RX buffer).  Passing None will disable the
838         callback.
839         """
840         self.event_callback = callback
841
842     def thread_msg_handler(self):
843         """Python thread calling the user registerd message handler.
844
845         This is to emulate the old style event callback scheme. Modern
846         clients should provide their own thread to poll the event
847         queue.
848         """
849         while True:
850             r = self.message_queue.get()
851             msgname = type(r).__name__
852             if self.event_callback:
853                 self.event_callback(msgname, r)
854
855
856 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4