5fa82a30acb186ab2a7383905521ec1512cc3c76
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import fnmatch
26 import weakref
27 import atexit
28 from cffi import FFI
29 import cffi
30
31 if sys.version[0] == '2':
32     import Queue as queue
33 else:
34     import queue as queue
35
36 ffi = FFI()
37 ffi.cdef("""
38 typedef void (*vac_callback_t)(unsigned char * data, int len);
39 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
40 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
41     int rx_qlen);
42 int vac_disconnect(void);
43 int vac_read(char **data, int *l, unsigned short timeout);
44 int vac_write(char *data, int len);
45 void vac_free(void * msg);
46
47 int vac_get_msg_index(unsigned char * name);
48 int vac_msg_table_size(void);
49 int vac_msg_table_max_index(void);
50
51 void vac_rx_suspend (void);
52 void vac_rx_resume (void);
53 void vac_set_error_handler(vac_error_callback_t);
54  """)
55
56 # Barfs on failure, no need to check success.
57 vpp_api = ffi.dlopen('libvppapiclient.so')
58
59
60 def vpp_atexit(vpp_weakref):
61     """Clean up VPP connection on shutdown."""
62     vpp_instance = vpp_weakref()
63     if vpp_instance.connected:
64         vpp_instance.logger.debug('Cleaning up VPP on exit')
65         vpp_instance.disconnect()
66
67
68 vpp_object = None
69
70
71 def vpp_iterator(d):
72     if sys.version[0] == '2':
73         return d.iteritems()
74     else:
75         return d.items()
76
77
78 @ffi.callback("void(unsigned char *, int)")
79 def vac_callback_sync(data, len):
80     vpp_object.msg_handler_sync(ffi.buffer(data, len))
81
82
83 @ffi.callback("void(unsigned char *, int)")
84 def vac_callback_async(data, len):
85     vpp_object.msg_handler_async(ffi.buffer(data, len))
86
87
88 @ffi.callback("void(void *, unsigned char *, int)")
89 def vac_error_handler(arg, msg, msg_len):
90     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
91
92
93 class Empty(object):
94     pass
95
96
97 class FuncWrapper(object):
98     def __init__(self, func):
99         self._func = func
100         self.__name__ = func.__name__
101
102     def __call__(self, **kwargs):
103         return self._func(**kwargs)
104
105
106 class VPP():
107     """VPP interface.
108
109     This class provides the APIs to VPP.  The APIs are loaded
110     from provided .api.json files and makes functions accordingly.
111     These functions are documented in the VPP .api files, as they
112     are dynamically created.
113
114     Additionally, VPP can send callback messages; this class
115     provides a means to register a callback function to receive
116     these messages in a background thread.
117     """
118     def __init__(self, apifiles=None, testmode=False, async_thread=True,
119                  logger=logging.getLogger('vpp_papi'), loglevel='debug',
120                  read_timeout=0):
121         """Create a VPP API object.
122
123         apifiles is a list of files containing API
124         descriptions that will be loaded - methods will be
125         dynamically created reflecting these APIs.  If not
126         provided this will load the API files from VPP's
127         default install location.
128         """
129         global vpp_object
130         vpp_object = self
131         self.logger = logger
132         logging.basicConfig(level=getattr(logging, loglevel.upper()))
133
134         self.messages = {}
135         self.id_names = []
136         self.id_msgdef = []
137         self.connected = False
138         self.header = struct.Struct('>HI')
139         self.apifiles = []
140         self.event_callback = None
141         self.message_queue = queue.Queue()
142         self.read_timeout = read_timeout
143         self.vpp_api = vpp_api
144         self.async_thread = async_thread
145
146         if not apifiles:
147             # Pick up API definitions from default directory
148             try:
149                 apifiles = self.find_api_files()
150             except RuntimeError:
151                 # In test mode we don't care that we can't find the API files
152                 if testmode:
153                     apifiles = []
154                 else:
155                     raise
156
157         for file in apifiles:
158             with open(file) as apidef_file:
159                 api = json.load(apidef_file)
160                 for t in api['types']:
161                     self.add_type(t[0], t[1:])
162
163                 for m in api['messages']:
164                     self.add_message(m[0], m[1:])
165         self.apifiles = apifiles
166
167         # Basic sanity check
168         if len(self.messages) == 0 and not testmode:
169             raise ValueError(1, 'Missing JSON message definitions')
170
171         # Make sure we allow VPP to clean up the message rings.
172         atexit.register(vpp_atexit, weakref.ref(self))
173
174         # Register error handler
175         vpp_api.vac_set_error_handler(vac_error_handler)
176
177         # Support legacy CFFI
178         # from_buffer supported from 1.8.0
179         (major, minor, patch) = [int(s) for s in
180                                  cffi.__version__.split('.', 3)]
181         if major >= 1 and minor >= 8:
182             self._write = self._write_new_cffi
183         else:
184             self._write = self._write_legacy_cffi
185
186     class ContextId(object):
187         """Thread-safe provider of unique context IDs."""
188         def __init__(self):
189             self.context = 0
190             self.lock = threading.Lock()
191
192         def __call__(self):
193             """Get a new unique (or, at least, not recently used) context."""
194             with self.lock:
195                 self.context += 1
196                 return self.context
197     get_context = ContextId()
198
199     @classmethod
200     def find_api_dir(cls):
201         """Attempt to find the best directory in which API definition
202         files may reside. If the value VPP_API_DIR exists in the environment
203         then it is first on the search list. If we're inside a recognized
204         location in a VPP source tree (src/scripts and src/vpp-api/python)
205         then entries from there to the likely locations in build-root are
206         added. Finally the location used by system packages is added.
207
208         :returns: A single directory name, or None if no such directory
209             could be found.
210         """
211         dirs = []
212
213         if 'VPP_API_DIR' in os.environ:
214             dirs.append(os.environ['VPP_API_DIR'])
215
216         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
217         # in which case, plot a course to likely places in the src tree
218         import __main__ as main
219         if hasattr(main, '__file__'):
220             # get the path of the calling script
221             localdir = os.path.dirname(os.path.realpath(main.__file__))
222         else:
223             # use cwd if there is no calling script
224             localdir = os.cwd()
225         localdir_s = localdir.split(os.path.sep)
226
227         def dmatch(dir):
228             """Match dir against right-hand components of the script dir"""
229             d = dir.split('/')  # param 'dir' assumes a / separator
230             length = len(d)
231             return len(localdir_s) > length and localdir_s[-length:] == d
232
233         def sdir(srcdir, variant):
234             """Build a path from srcdir to the staged API files of
235             'variant'  (typically '' or '_debug')"""
236             # Since 'core' and 'plugin' files are staged
237             # in separate directories, we target the parent dir.
238             return os.path.sep.join((
239                 srcdir,
240                 'build-root',
241                 'install-vpp%s-native' % variant,
242                 'vpp',
243                 'share',
244                 'vpp',
245                 'api',
246             ))
247
248         srcdir = None
249         if dmatch('src/scripts'):
250             srcdir = os.path.sep.join(localdir_s[:-2])
251         elif dmatch('src/vpp-api/python'):
252             srcdir = os.path.sep.join(localdir_s[:-3])
253         elif dmatch('test'):
254             # we're apparently running tests
255             srcdir = os.path.sep.join(localdir_s[:-1])
256
257         if srcdir:
258             # we're in the source tree, try both the debug and release
259             # variants.
260             dirs.append(sdir(srcdir, '_debug'))
261             dirs.append(sdir(srcdir, ''))
262
263         # Test for staged copies of the scripts
264         # For these, since we explicitly know if we're running a debug versus
265         # release variant, target only the relevant directory
266         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
267             srcdir = os.path.sep.join(localdir_s[:-4])
268             dirs.append(sdir(srcdir, '_debug'))
269         if dmatch('build-root/install-vpp-native/vpp/bin'):
270             srcdir = os.path.sep.join(localdir_s[:-4])
271             dirs.append(sdir(srcdir, ''))
272
273         # finally, try the location system packages typically install into
274         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
275
276         # check the directories for existance; first one wins
277         for dir in dirs:
278             if os.path.isdir(dir):
279                 return dir
280
281         return None
282
283     @classmethod
284     def find_api_files(cls, api_dir=None, patterns='*'):
285         """Find API definition files from the given directory tree with the
286         given pattern. If no directory is given then find_api_dir() is used
287         to locate one. If no pattern is given then all definition files found
288         in the directory tree are used.
289
290         :param api_dir: A directory tree in which to locate API definition
291             files; subdirectories are descended into.
292             If this is None then find_api_dir() is called to discover it.
293         :param patterns: A list of patterns to use in each visited directory
294             when looking for files.
295             This can be a list/tuple object or a comma-separated string of
296             patterns. Each value in the list will have leading/trialing
297             whitespace stripped.
298             The pattern specifies the first part of the filename, '.api.json'
299             is appended.
300             The results are de-duplicated, thus overlapping patterns are fine.
301             If this is None it defaults to '*' meaning "all API files".
302         :returns: A list of file paths for the API files found.
303         """
304         if api_dir is None:
305             api_dir = cls.find_api_dir()
306             if api_dir is None:
307                 raise RuntimeError("api_dir cannot be located")
308
309         if isinstance(patterns, list) or isinstance(patterns, tuple):
310             patterns = [p.strip() + '.api.json' for p in patterns]
311         else:
312             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
313
314         api_files = []
315         for root, dirnames, files in os.walk(api_dir):
316             # iterate all given patterns and de-dup the result
317             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
318             for filename in files:
319                 api_files.append(os.path.join(root, filename))
320
321         return api_files
322
323     def status(self):
324         """Debug function: report current VPP API status to stdout."""
325         print('Connected') if self.connected else print('Not Connected')
326         print('Read API definitions from', ', '.join(self.apifiles))
327
328     def __struct(self, t, n=None, e=-1, vl=None):
329         """Create a packing structure for a message."""
330         base_types = {'u8': 'B',
331                       'u16': 'H',
332                       'u32': 'I',
333                       'i32': 'i',
334                       'u64': 'Q',
335                       'f64': 'd', }
336
337         if t in base_types:
338             if not vl:
339                 if e > 0 and t == 'u8':
340                     # Fixed byte array
341                     s = struct.Struct('>' + str(e) + 's')
342                     return s.size, s
343                 if e > 0:
344                     # Fixed array of base type
345                     s = struct.Struct('>' + base_types[t])
346                     return s.size, [e, s]
347                 elif e == 0:
348                     # Old style variable array
349                     s = struct.Struct('>' + base_types[t])
350                     return s.size, [-1, s]
351             else:
352                 # Variable length array
353                 if t == 'u8':
354                     s = struct.Struct('>s')
355                     return s.size, [vl, s]
356                 else:
357                     s = struct.Struct('>' + base_types[t])
358                 return s.size, [vl, s]
359
360             s = struct.Struct('>' + base_types[t])
361             return s.size, s
362
363         if t in self.messages:
364             size = self.messages[t]['sizes'][0]
365
366             # Return a list in case of array
367             if e > 0 and not vl:
368                 return size, [e, lambda self, encode, buf, offset, args: (
369                     self.__struct_type(encode, self.messages[t], buf, offset,
370                                        args))]
371             if vl:
372                 return size, [vl, lambda self, encode, buf, offset, args: (
373                     self.__struct_type(encode, self.messages[t], buf, offset,
374                                        args))]
375             elif e == 0:
376                 # Old style VLA
377                 raise NotImplementedError(1,
378                                           'No support for compound types ' + t)
379             return size, lambda self, encode, buf, offset, args: (
380                 self.__struct_type(encode, self.messages[t], buf, offset, args)
381             )
382
383         raise ValueError(1, 'Invalid message type: ' + t)
384
385     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
386         """Get a message packer or unpacker."""
387         if encode:
388             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
389         else:
390             return self.__struct_type_decode(msgdef, buf, offset)
391
392     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
393         off = offset
394         size = 0
395
396         for k in kwargs:
397             if k not in msgdef['args']:
398                 raise ValueError(1, 'Non existing argument [' + k + ']' +
399                                     ' used in call to: ' +
400                                  self.id_names[kwargs['_vl_msg_id']] + '()')
401
402         for k, v in vpp_iterator(msgdef['args']):
403             off += size
404             if k in kwargs:
405                 if type(v) is list:
406                     if callable(v[1]):
407                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
408                         if e != len(kwargs[k]):
409                             raise (ValueError(1,
410                                               'Input list length mismatch: '
411                                               '%s (%s != %s)' %
412                                               (k, e, len(kwargs[k]))))
413                         size = 0
414                         for i in range(e):
415                             size += v[1](self, True, buf, off + size,
416                                          kwargs[k][i])
417                     else:
418                         if v[0] in kwargs:
419                             kwargslen = kwargs[v[0]]
420                             if kwargslen != len(kwargs[k]):
421                                 raise ValueError(1,
422                                                  'Input list length mismatch:'
423                                                  ' %s (%s != %s)' %
424                                                  (k, kwargslen,
425                                                   len(kwargs[k])))
426                         else:
427                             kwargslen = len(kwargs[k])
428                         if v[1].size == 1:
429                             buf[off:off + kwargslen] = bytearray(kwargs[k])
430                             size = kwargslen
431                         else:
432                             size = 0
433                             for i in kwargs[k]:
434                                 v[1].pack_into(buf, off + size, i)
435                                 size += v[1].size
436                 else:
437                     if callable(v):
438                         size = v(self, True, buf, off, kwargs[k])
439                     else:
440                         if type(kwargs[k]) is str and v.size < len(kwargs[k]):
441                             raise ValueError(1,
442                                              'Input list length mismatch: '
443                                              '%s (%s < %s)' %
444                                              (k, v.size, len(kwargs[k])))
445                         v.pack_into(buf, off, kwargs[k])
446                         size = v.size
447             else:
448                 size = v.size if not type(v) is list else 0
449
450         return off + size - offset
451
452     def __getitem__(self, name):
453         if name in self.messages:
454             return self.messages[name]
455         return None
456
457     def get_size(self, sizes, kwargs):
458         total_size = sizes[0]
459         for e in sizes[1]:
460             if e in kwargs and type(kwargs[e]) is list:
461                 total_size += len(kwargs[e]) * sizes[1][e]
462         return total_size
463
464     def encode(self, msgdef, kwargs):
465         # Make suitably large buffer
466         size = self.get_size(msgdef['sizes'], kwargs)
467         buf = bytearray(size)
468         offset = 0
469         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
470         return buf[:offset + size]
471
472     def decode(self, msgdef, buf):
473         return self.__struct_type(False, msgdef, buf, 0, None)[1]
474
475     def __struct_type_decode(self, msgdef, buf, offset):
476         res = []
477         off = offset
478         size = 0
479         for k, v in vpp_iterator(msgdef['args']):
480             off += size
481             if type(v) is list:
482                 lst = []
483                 if callable(v[1]):  # compound type
484                     size = 0
485                     if v[0] in msgdef['args']:  # vla
486                         e = res[v[2]]
487                     else:  # fixed array
488                         e = v[0]
489                     res.append(lst)
490                     for i in range(e):
491                         (s, l) = v[1](self, False, buf, off + size, None)
492                         lst.append(l)
493                         size += s
494                     continue
495                 if v[1].size == 1:
496                     if type(v[0]) is int:
497                         size = len(buf) - off
498                     else:
499                         size = res[v[2]]
500                     res.append(buf[off:off + size])
501                 else:
502                     e = v[0] if type(v[0]) is int else res[v[2]]
503                     if e == -1:
504                         e = (len(buf) - off) / v[1].size
505                     lst = []
506                     res.append(lst)
507                     size = 0
508                     for i in range(e):
509                         lst.append(v[1].unpack_from(buf, off + size)[0])
510                         size += v[1].size
511             else:
512                 if callable(v):
513                     size = 0
514                     (s, l) = v(self, False, buf, off, None)
515                     res.append(l)
516                     size += s
517                 else:
518                     res.append(v.unpack_from(buf, off)[0])
519                     size = v.size
520
521         return off + size - offset, msgdef['return_tuple']._make(res)
522
523     def ret_tup(self, name):
524         if name in self.messages and 'return_tuple' in self.messages[name]:
525             return self.messages[name]['return_tuple']
526         return None
527
528     def duplicate_check_ok(self, name, msgdef):
529         crc = None
530         for c in msgdef:
531             if type(c) is dict and 'crc' in c:
532                 crc = c['crc']
533                 break
534         if crc:
535             # We can get duplicates because of imports
536             if crc == self.messages[name]['crc']:
537                 return True
538         return False
539
540     def add_message(self, name, msgdef, typeonly=False):
541         if name in self.messages:
542             if typeonly:
543                 if self.duplicate_check_ok(name, msgdef):
544                     return
545             raise ValueError('Duplicate message name: ' + name)
546
547         args = collections.OrderedDict()
548         argtypes = collections.OrderedDict()
549         fields = []
550         msg = {}
551         total_size = 0
552         sizes = {}
553         for i, f in enumerate(msgdef):
554             if type(f) is dict and 'crc' in f:
555                 msg['crc'] = f['crc']
556                 continue
557             field_type = f[0]
558             field_name = f[1]
559             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
560                 raise ValueError('Variable Length Array must be last: ' + name)
561             size, s = self.__struct(*f)
562             args[field_name] = s
563             if type(s) == list and type(s[0]) == int and \
564                type(s[1]) == struct.Struct:
565                 if s[0] < 0:
566                     sizes[field_name] = size
567                 else:
568                     sizes[field_name] = size
569                     total_size += s[0] * size
570             else:
571                 sizes[field_name] = size
572                 total_size += size
573
574             argtypes[field_name] = field_type
575             if len(f) == 4:  # Find offset to # elements field
576                 idx = list(args.keys()).index(f[3]) - i
577                 args[field_name].append(idx)
578             fields.append(field_name)
579         msg['return_tuple'] = collections.namedtuple(name, fields,
580                                                      rename=True)
581         self.messages[name] = msg
582         self.messages[name]['args'] = args
583         self.messages[name]['argtypes'] = argtypes
584         self.messages[name]['typeonly'] = typeonly
585         self.messages[name]['sizes'] = [total_size, sizes]
586         return self.messages[name]
587
588     def add_type(self, name, typedef):
589         return self.add_message('vl_api_' + name + '_t', typedef,
590                                 typeonly=True)
591
592     def make_function(self, name, i, msgdef, multipart, async):
593         if (async):
594             def f(**kwargs):
595                 return self._call_vpp_async(i, msgdef, **kwargs)
596         else:
597             def f(**kwargs):
598                 return self._call_vpp(i, msgdef, multipart, **kwargs)
599         args = self.messages[name]['args']
600         argtypes = self.messages[name]['argtypes']
601         f.__name__ = str(name)
602         f.__doc__ = ", ".join(["%s %s" %
603                                (argtypes[k], k) for k in args.keys()])
604         return f
605
606     @property
607     def api(self):
608         if not hasattr(self, "_api"):
609             raise Exception("Not connected, api definitions not available")
610         return self._api
611
612     def _register_functions(self, async=False):
613         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
614         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
615         self._api = Empty()
616         for name, msgdef in vpp_iterator(self.messages):
617             if self.messages[name]['typeonly']:
618                 continue
619             crc = self.messages[name]['crc']
620             n = name + '_' + crc[2:]
621             i = vpp_api.vac_get_msg_index(n.encode())
622             if i > 0:
623                 self.id_msgdef[i] = msgdef
624                 self.id_names[i] = name
625                 multipart = True if name.find('_dump') > 0 else False
626                 f = self.make_function(name, i, msgdef, multipart, async)
627                 setattr(self._api, name, FuncWrapper(f))
628             else:
629                 self.logger.debug(
630                     'No such message type or failed CRC checksum: %s', n)
631
632     def _write_new_cffi(self, buf):
633         """Send a binary-packed message to VPP."""
634         if not self.connected:
635             raise IOError(1, 'Not connected')
636         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
637
638     def _write_legacy_cffi(self, buf):
639         """Send a binary-packed message to VPP."""
640         if not self.connected:
641             raise IOError(1, 'Not connected')
642         return vpp_api.vac_write(bytes(buf), len(buf))
643
644     def _read(self):
645         if not self.connected:
646             raise IOError(1, 'Not connected')
647         mem = ffi.new("char **")
648         size = ffi.new("int *")
649         rv = vpp_api.vac_read(mem, size, self.read_timeout)
650         if rv:
651             raise IOError(rv, 'vac_read failed')
652         msg = bytes(ffi.buffer(mem[0], size[0]))
653         vpp_api.vac_free(mem[0])
654         return msg
655
656     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
657                          async):
658         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
659         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
660         if rv != 0:
661             raise IOError(2, 'Connect failed')
662         self.connected = True
663
664         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
665         self._register_functions(async=async)
666
667         # Initialise control ping
668         crc = self.messages['control_ping']['crc']
669         self.control_ping_index = vpp_api.vac_get_msg_index(
670             ('control_ping' + '_' + crc[2:]).encode())
671         self.control_ping_msgdef = self.messages['control_ping']
672         if self.async_thread:
673             self.event_thread = threading.Thread(
674                 target=self.thread_msg_handler)
675             self.event_thread.daemon = True
676             self.event_thread.start()
677         return rv
678
679     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
680         """Attach to VPP.
681
682         name - the name of the client.
683         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
684         async - if true, messages are sent without waiting for a reply
685         rx_qlen - the length of the VPP message receive queue between
686         client and server.
687         """
688         msg_handler = vac_callback_sync if not async else vac_callback_async
689         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
690                                      async)
691
692     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
693         """Attach to VPP in synchronous mode. Application must poll for events.
694
695         name - the name of the client.
696         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
697         rx_qlen - the length of the VPP message receive queue between
698         client and server.
699         """
700
701         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
702                                      async=False)
703
704     def disconnect(self):
705         """Detach from VPP."""
706         rv = vpp_api.vac_disconnect()
707         self.connected = False
708         self.message_queue.put("terminate event thread")
709         return rv
710
711     def msg_handler_sync(self, msg):
712         """Process an incoming message from VPP in sync mode.
713
714         The message may be a reply or it may be an async notification.
715         """
716         r = self.decode_incoming_msg(msg)
717         if r is None:
718             return
719
720         # If we have a context, then use the context to find any
721         # request waiting for a reply
722         context = 0
723         if hasattr(r, 'context') and r.context > 0:
724             context = r.context
725
726         if context == 0:
727             # No context -> async notification that we feed to the callback
728             self.message_queue.put_nowait(r)
729         else:
730             raise IOError(2, 'RPC reply message received in event handler')
731
732     def decode_incoming_msg(self, msg):
733         if not msg:
734             self.logger.warning('vpp_api.read failed')
735             return
736
737         i, ci = self.header.unpack_from(msg, 0)
738         if self.id_names[i] == 'rx_thread_exit':
739             return
740
741         #
742         # Decode message and returns a tuple.
743         #
744         msgdef = self.id_msgdef[i]
745         if not msgdef:
746             raise IOError(2, 'Reply message undefined')
747
748         r = self.decode(msgdef, msg)
749
750         return r
751
752     def msg_handler_async(self, msg):
753         """Process a message from VPP in async mode.
754
755         In async mode, all messages are returned to the callback.
756         """
757         r = self.decode_incoming_msg(msg)
758         if r is None:
759             return
760
761         msgname = type(r).__name__
762
763         if self.event_callback:
764             self.event_callback(msgname, r)
765
766     def _control_ping(self, context):
767         """Send a ping command."""
768         self._call_vpp_async(self.control_ping_index,
769                              self.control_ping_msgdef,
770                              context=context)
771
772     def _call_vpp(self, i, msgdef, multipart, **kwargs):
773         """Given a message, send the message and await a reply.
774
775         msgdef - the message packing definition
776         i - the message type index
777         multipart - True if the message returns multiple
778         messages in return.
779         context - context number - chosen at random if not
780         supplied.
781         The remainder of the kwargs are the arguments to the API call.
782
783         The return value is the message or message array containing
784         the response.  It will raise an IOError exception if there was
785         no response within the timeout window.
786         """
787
788         if 'context' not in kwargs:
789             context = self.get_context()
790             kwargs['context'] = context
791         else:
792             context = kwargs['context']
793         kwargs['_vl_msg_id'] = i
794         b = self.encode(msgdef, kwargs)
795
796         vpp_api.vac_rx_suspend()
797         self._write(b)
798
799         if multipart:
800             # Send a ping after the request - we use its response
801             # to detect that we have seen all results.
802             self._control_ping(context)
803
804         # Block until we get a reply.
805         rl = []
806         while (True):
807             msg = self._read()
808             if not msg:
809                 raise IOError(2, 'VPP API client: read failed')
810
811             r = self.decode_incoming_msg(msg)
812             msgname = type(r).__name__
813             if context not in r or r.context == 0 or context != r.context:
814                 self.message_queue.put_nowait(r)
815                 continue
816
817             if not multipart:
818                 rl = r
819                 break
820             if msgname == 'control_ping_reply':
821                 break
822
823             rl.append(r)
824
825         vpp_api.vac_rx_resume()
826
827         return rl
828
829     def _call_vpp_async(self, i, msgdef, **kwargs):
830         """Given a message, send the message and await a reply.
831
832         msgdef - the message packing definition
833         i - the message type index
834         context - context number - chosen at random if not
835         supplied.
836         The remainder of the kwargs are the arguments to the API call.
837         """
838         if 'context' not in kwargs:
839             context = self.get_context()
840             kwargs['context'] = context
841         else:
842             context = kwargs['context']
843         kwargs['_vl_msg_id'] = i
844         b = self.encode(msgdef, kwargs)
845
846         self._write(b)
847
848     def register_event_callback(self, callback):
849         """Register a callback for async messages.
850
851         This will be called for async notifications in sync mode,
852         and all messages in async mode.  In sync mode, replies to
853         requests will not come here.
854
855         callback is a fn(msg_type_name, msg_type) that will be
856         called when a message comes in.  While this function is
857         executing, note that (a) you are in a background thread and
858         may wish to use threading.Lock to protect your datastructures,
859         and (b) message processing from VPP will stop (so if you take
860         a long while about it you may provoke reply timeouts or cause
861         VPP to fill the RX buffer).  Passing None will disable the
862         callback.
863         """
864         self.event_callback = callback
865
866     def thread_msg_handler(self):
867         """Python thread calling the user registerd message handler.
868
869         This is to emulate the old style event callback scheme. Modern
870         clients should provide their own thread to poll the event
871         queue.
872         """
873         while True:
874             r = self.message_queue.get()
875             if r == "terminate event thread":
876                 break
877             msgname = type(r).__name__
878             if self.event_callback:
879                 self.event_callback(msgname, r)
880
881
882 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4