Python API: Add enum and union support.
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import fnmatch
26 import weakref
27 import atexit
28 from cffi import FFI
29 import cffi
30
31 if sys.version[0] == '2':
32     import Queue as queue
33 else:
34     import queue as queue
35
36 ffi = FFI()
37 ffi.cdef("""
38 typedef void (*vac_callback_t)(unsigned char * data, int len);
39 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
40 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
41     int rx_qlen);
42 int vac_disconnect(void);
43 int vac_read(char **data, int *l, unsigned short timeout);
44 int vac_write(char *data, int len);
45 void vac_free(void * msg);
46
47 int vac_get_msg_index(unsigned char * name);
48 int vac_msg_table_size(void);
49 int vac_msg_table_max_index(void);
50
51 void vac_rx_suspend (void);
52 void vac_rx_resume (void);
53 void vac_set_error_handler(vac_error_callback_t);
54  """)
55
56 # Barfs on failure, no need to check success.
57 vpp_api = ffi.dlopen('libvppapiclient.so')
58
59
60 def vpp_atexit(vpp_weakref):
61     """Clean up VPP connection on shutdown."""
62     vpp_instance = vpp_weakref()
63     if vpp_instance.connected:
64         vpp_instance.logger.debug('Cleaning up VPP on exit')
65         vpp_instance.disconnect()
66
67
68 vpp_object = None
69
70
71 def vpp_iterator(d):
72     if sys.version[0] == '2':
73         return d.iteritems()
74     else:
75         return d.items()
76
77
78 @ffi.callback("void(unsigned char *, int)")
79 def vac_callback_sync(data, len):
80     vpp_object.msg_handler_sync(ffi.buffer(data, len))
81
82
83 @ffi.callback("void(unsigned char *, int)")
84 def vac_callback_async(data, len):
85     vpp_object.msg_handler_async(ffi.buffer(data, len))
86
87
88 @ffi.callback("void(void *, unsigned char *, int)")
89 def vac_error_handler(arg, msg, msg_len):
90     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
91
92
93 class Empty(object):
94     pass
95
96
97 class FuncWrapper(object):
98     def __init__(self, func):
99         self._func = func
100         self.__name__ = func.__name__
101
102     def __call__(self, **kwargs):
103         return self._func(**kwargs)
104
105
106 class VPP():
107     """VPP interface.
108
109     This class provides the APIs to VPP.  The APIs are loaded
110     from provided .api.json files and makes functions accordingly.
111     These functions are documented in the VPP .api files, as they
112     are dynamically created.
113
114     Additionally, VPP can send callback messages; this class
115     provides a means to register a callback function to receive
116     these messages in a background thread.
117     """
118     def __init__(self, apifiles=None, testmode=False, async_thread=True,
119                  logger=None, loglevel=None,
120                  read_timeout=0):
121         """Create a VPP API object.
122
123         apifiles is a list of files containing API
124         descriptions that will be loaded - methods will be
125         dynamically created reflecting these APIs.  If not
126         provided this will load the API files from VPP's
127         default install location.
128
129         logger, if supplied, is the logging logger object to log to.
130         loglevel, if supplied, is the log level this logger is set
131         to report at (from the loglevels in the logging module).
132         """
133         global vpp_object
134         vpp_object = self
135
136         if logger is None:
137             logger = logging.getLogger(__name__)
138             if loglevel is not None:
139                 logger.setLevel(loglevel)
140
141         self.logger = logger
142
143         self.messages = {}
144         self.id_names = []
145         self.id_msgdef = []
146         self.connected = False
147         self.header = struct.Struct('>HI')
148         self.apifiles = []
149         self.event_callback = None
150         self.message_queue = queue.Queue()
151         self.read_timeout = read_timeout
152         self.vpp_api = vpp_api
153         self.async_thread = async_thread
154
155         if not apifiles:
156             # Pick up API definitions from default directory
157             try:
158                 apifiles = self.find_api_files()
159             except RuntimeError:
160                 # In test mode we don't care that we can't find the API files
161                 if testmode:
162                     apifiles = []
163                 else:
164                     raise
165
166         for file in apifiles:
167             with open(file) as apidef_file:
168                 api = json.load(apidef_file)
169                 for t in api['types']:
170                     self.add_type(t[0], t[1:])
171
172                 for m in api['messages']:
173                     self.add_message(m[0], m[1:])
174         self.apifiles = apifiles
175
176         # Basic sanity check
177         if len(self.messages) == 0 and not testmode:
178             raise ValueError(1, 'Missing JSON message definitions')
179
180         # Make sure we allow VPP to clean up the message rings.
181         atexit.register(vpp_atexit, weakref.ref(self))
182
183         # Register error handler
184         vpp_api.vac_set_error_handler(vac_error_handler)
185
186         # Support legacy CFFI
187         # from_buffer supported from 1.8.0
188         (major, minor, patch) = [int(s) for s in
189                                  cffi.__version__.split('.', 3)]
190         if major >= 1 and minor >= 8:
191             self._write = self._write_new_cffi
192         else:
193             self._write = self._write_legacy_cffi
194
195     class ContextId(object):
196         """Thread-safe provider of unique context IDs."""
197         def __init__(self):
198             self.context = 0
199             self.lock = threading.Lock()
200
201         def __call__(self):
202             """Get a new unique (or, at least, not recently used) context."""
203             with self.lock:
204                 self.context += 1
205                 return self.context
206     get_context = ContextId()
207
208     @classmethod
209     def find_api_dir(cls):
210         """Attempt to find the best directory in which API definition
211         files may reside. If the value VPP_API_DIR exists in the environment
212         then it is first on the search list. If we're inside a recognized
213         location in a VPP source tree (src/scripts and src/vpp-api/python)
214         then entries from there to the likely locations in build-root are
215         added. Finally the location used by system packages is added.
216
217         :returns: A single directory name, or None if no such directory
218             could be found.
219         """
220         dirs = []
221
222         if 'VPP_API_DIR' in os.environ:
223             dirs.append(os.environ['VPP_API_DIR'])
224
225         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
226         # in which case, plot a course to likely places in the src tree
227         import __main__ as main
228         if hasattr(main, '__file__'):
229             # get the path of the calling script
230             localdir = os.path.dirname(os.path.realpath(main.__file__))
231         else:
232             # use cwd if there is no calling script
233             localdir = os.getcwd()
234         localdir_s = localdir.split(os.path.sep)
235
236         def dmatch(dir):
237             """Match dir against right-hand components of the script dir"""
238             d = dir.split('/')  # param 'dir' assumes a / separator
239             length = len(d)
240             return len(localdir_s) > length and localdir_s[-length:] == d
241
242         def sdir(srcdir, variant):
243             """Build a path from srcdir to the staged API files of
244             'variant'  (typically '' or '_debug')"""
245             # Since 'core' and 'plugin' files are staged
246             # in separate directories, we target the parent dir.
247             return os.path.sep.join((
248                 srcdir,
249                 'build-root',
250                 'install-vpp%s-native' % variant,
251                 'vpp',
252                 'share',
253                 'vpp',
254                 'api',
255             ))
256
257         srcdir = None
258         if dmatch('src/scripts'):
259             srcdir = os.path.sep.join(localdir_s[:-2])
260         elif dmatch('src/vpp-api/python'):
261             srcdir = os.path.sep.join(localdir_s[:-3])
262         elif dmatch('test'):
263             # we're apparently running tests
264             srcdir = os.path.sep.join(localdir_s[:-1])
265
266         if srcdir:
267             # we're in the source tree, try both the debug and release
268             # variants.
269             dirs.append(sdir(srcdir, '_debug'))
270             dirs.append(sdir(srcdir, ''))
271
272         # Test for staged copies of the scripts
273         # For these, since we explicitly know if we're running a debug versus
274         # release variant, target only the relevant directory
275         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
276             srcdir = os.path.sep.join(localdir_s[:-4])
277             dirs.append(sdir(srcdir, '_debug'))
278         if dmatch('build-root/install-vpp-native/vpp/bin'):
279             srcdir = os.path.sep.join(localdir_s[:-4])
280             dirs.append(sdir(srcdir, ''))
281
282         # finally, try the location system packages typically install into
283         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
284
285         # check the directories for existance; first one wins
286         for dir in dirs:
287             if os.path.isdir(dir):
288                 return dir
289
290         return None
291
292     @classmethod
293     def find_api_files(cls, api_dir=None, patterns='*'):
294         """Find API definition files from the given directory tree with the
295         given pattern. If no directory is given then find_api_dir() is used
296         to locate one. If no pattern is given then all definition files found
297         in the directory tree are used.
298
299         :param api_dir: A directory tree in which to locate API definition
300             files; subdirectories are descended into.
301             If this is None then find_api_dir() is called to discover it.
302         :param patterns: A list of patterns to use in each visited directory
303             when looking for files.
304             This can be a list/tuple object or a comma-separated string of
305             patterns. Each value in the list will have leading/trialing
306             whitespace stripped.
307             The pattern specifies the first part of the filename, '.api.json'
308             is appended.
309             The results are de-duplicated, thus overlapping patterns are fine.
310             If this is None it defaults to '*' meaning "all API files".
311         :returns: A list of file paths for the API files found.
312         """
313         if api_dir is None:
314             api_dir = cls.find_api_dir()
315             if api_dir is None:
316                 raise RuntimeError("api_dir cannot be located")
317
318         if isinstance(patterns, list) or isinstance(patterns, tuple):
319             patterns = [p.strip() + '.api.json' for p in patterns]
320         else:
321             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
322
323         api_files = []
324         for root, dirnames, files in os.walk(api_dir):
325             # iterate all given patterns and de-dup the result
326             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
327             for filename in files:
328                 api_files.append(os.path.join(root, filename))
329
330         return api_files
331
332     def status(self):
333         """Debug function: report current VPP API status to stdout."""
334         print('Connected') if self.connected else print('Not Connected')
335         print('Read API definitions from', ', '.join(self.apifiles))
336
337     def __struct(self, t, n=None, e=-1, vl=None):
338         """Create a packing structure for a message."""
339         base_types = {'u8': 'B',
340                       'u16': 'H',
341                       'u32': 'I',
342                       'i32': 'i',
343                       'u64': 'Q',
344                       'f64': 'd', }
345
346         if t in base_types:
347             if not vl:
348                 if e > 0 and t == 'u8':
349                     # Fixed byte array
350                     s = struct.Struct('>' + str(e) + 's')
351                     return s.size, s
352                 if e > 0:
353                     # Fixed array of base type
354                     s = struct.Struct('>' + base_types[t])
355                     return s.size, [e, s]
356                 elif e == 0:
357                     # Old style variable array
358                     s = struct.Struct('>' + base_types[t])
359                     return s.size, [-1, s]
360             else:
361                 # Variable length array
362                 if t == 'u8':
363                     s = struct.Struct('>s')
364                     return s.size, [vl, s]
365                 else:
366                     s = struct.Struct('>' + base_types[t])
367                 return s.size, [vl, s]
368
369             s = struct.Struct('>' + base_types[t])
370             return s.size, s
371
372         if t in self.messages:
373             size = self.messages[t]['sizes'][0]
374
375             # Return a list in case of array
376             if e > 0 and not vl:
377                 return size, [e, lambda self, encode, buf, offset, args: (
378                     self.__struct_type(encode, self.messages[t], buf, offset,
379                                        args))]
380             if vl:
381                 return size, [vl, lambda self, encode, buf, offset, args: (
382                     self.__struct_type(encode, self.messages[t], buf, offset,
383                                        args))]
384             elif e == 0:
385                 # Old style VLA
386                 raise NotImplementedError(1,
387                                           'No support for compound types ' + t)
388             return size, lambda self, encode, buf, offset, args: (
389                 self.__struct_type(encode, self.messages[t], buf, offset, args)
390             )
391
392         raise ValueError(1, 'Invalid message type: ' + t)
393
394     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
395         """Get a message packer or unpacker."""
396         if encode:
397             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
398         else:
399             return self.__struct_type_decode(msgdef, buf, offset)
400
401     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
402         off = offset
403         size = 0
404
405         for k in kwargs:
406             if k not in msgdef['args']:
407                 raise ValueError(1, 'Non existing argument [' + k + ']' +
408                                     ' used in call to: ' +
409                                  self.id_names[kwargs['_vl_msg_id']] + '()')
410
411         for k, v in vpp_iterator(msgdef['args']):
412             off += size
413             if k in kwargs:
414                 if type(v) is list:
415                     if callable(v[1]):
416                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
417                         if e != len(kwargs[k]):
418                             raise (ValueError(1,
419                                               'Input list length mismatch: '
420                                               '%s (%s != %s)' %
421                                               (k, e, len(kwargs[k]))))
422                         size = 0
423                         for i in range(e):
424                             size += v[1](self, True, buf, off + size,
425                                          kwargs[k][i])
426                     else:
427                         if v[0] in kwargs:
428                             kwargslen = kwargs[v[0]]
429                             if kwargslen != len(kwargs[k]):
430                                 raise ValueError(1,
431                                                  'Input list length mismatch:'
432                                                  ' %s (%s != %s)' %
433                                                  (k, kwargslen,
434                                                   len(kwargs[k])))
435                         else:
436                             kwargslen = len(kwargs[k])
437                         if v[1].size == 1:
438                             buf[off:off + kwargslen] = bytearray(kwargs[k])
439                             size = kwargslen
440                         else:
441                             size = 0
442                             for i in kwargs[k]:
443                                 v[1].pack_into(buf, off + size, i)
444                                 size += v[1].size
445                 else:
446                     if callable(v):
447                         size = v(self, True, buf, off, kwargs[k])
448                     else:
449                         if type(kwargs[k]) is str and v.size < len(kwargs[k]):
450                             raise ValueError(1,
451                                              'Input list length mismatch: '
452                                              '%s (%s < %s)' %
453                                              (k, v.size, len(kwargs[k])))
454                         v.pack_into(buf, off, kwargs[k])
455                         size = v.size
456             else:
457                 size = v.size if not type(v) is list else 0
458
459         return off + size - offset
460
461     def __getitem__(self, name):
462         if name in self.messages:
463             return self.messages[name]
464         return None
465
466     def get_size(self, sizes, kwargs):
467         total_size = sizes[0]
468         for e in sizes[1]:
469             if e in kwargs and type(kwargs[e]) is list:
470                 total_size += len(kwargs[e]) * sizes[1][e]
471         return total_size
472
473     def encode(self, msgdef, kwargs):
474         # Make suitably large buffer
475         size = self.get_size(msgdef['sizes'], kwargs)
476         buf = bytearray(size)
477         offset = 0
478         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
479         return buf[:offset + size]
480
481     def decode(self, msgdef, buf):
482         return self.__struct_type(False, msgdef, buf, 0, None)[1]
483
484     def __struct_type_decode(self, msgdef, buf, offset):
485         res = []
486         off = offset
487         size = 0
488         for k, v in vpp_iterator(msgdef['args']):
489             off += size
490             if type(v) is list:
491                 lst = []
492                 if callable(v[1]):  # compound type
493                     size = 0
494                     if v[0] in msgdef['args']:  # vla
495                         e = res[v[2]]
496                     else:  # fixed array
497                         e = v[0]
498                     res.append(lst)
499                     for i in range(e):
500                         (s, l) = v[1](self, False, buf, off + size, None)
501                         lst.append(l)
502                         size += s
503                     continue
504                 if v[1].size == 1:
505                     if type(v[0]) is int:
506                         size = len(buf) - off
507                     else:
508                         size = res[v[2]]
509                     res.append(buf[off:off + size])
510                 else:
511                     e = v[0] if type(v[0]) is int else res[v[2]]
512                     if e == -1:
513                         e = (len(buf) - off) / v[1].size
514                     lst = []
515                     res.append(lst)
516                     size = 0
517                     for i in range(e):
518                         lst.append(v[1].unpack_from(buf, off + size)[0])
519                         size += v[1].size
520             else:
521                 if callable(v):
522                     size = 0
523                     (s, l) = v(self, False, buf, off, None)
524                     res.append(l)
525                     size += s
526                 else:
527                     res.append(v.unpack_from(buf, off)[0])
528                     size = v.size
529
530         return off + size - offset, msgdef['return_tuple']._make(res)
531
532     def ret_tup(self, name):
533         if name in self.messages and 'return_tuple' in self.messages[name]:
534             return self.messages[name]['return_tuple']
535         return None
536
537     def duplicate_check_ok(self, name, msgdef):
538         crc = None
539         for c in msgdef:
540             if type(c) is dict and 'crc' in c:
541                 crc = c['crc']
542                 break
543         if crc:
544             # We can get duplicates because of imports
545             if crc == self.messages[name]['crc']:
546                 return True
547         return False
548
549     def add_message(self, name, msgdef, typeonly=False):
550         if name in self.messages:
551             if typeonly:
552                 if self.duplicate_check_ok(name, msgdef):
553                     return
554             raise ValueError('Duplicate message name: ' + name)
555
556         args = collections.OrderedDict()
557         argtypes = collections.OrderedDict()
558         fields = []
559         msg = {}
560         total_size = 0
561         sizes = {}
562         for i, f in enumerate(msgdef):
563             if type(f) is dict and 'crc' in f:
564                 msg['crc'] = f['crc']
565                 continue
566             field_type = f[0]
567             field_name = f[1]
568             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
569                 raise ValueError('Variable Length Array must be last: ' + name)
570             size, s = self.__struct(*f)
571             args[field_name] = s
572             if type(s) == list and type(s[0]) == int and \
573                type(s[1]) == struct.Struct:
574                 if s[0] < 0:
575                     sizes[field_name] = size
576                 else:
577                     sizes[field_name] = size
578                     total_size += s[0] * size
579             else:
580                 sizes[field_name] = size
581                 total_size += size
582
583             argtypes[field_name] = field_type
584             if len(f) == 4:  # Find offset to # elements field
585                 idx = list(args.keys()).index(f[3]) - i
586                 args[field_name].append(idx)
587             fields.append(field_name)
588         msg['return_tuple'] = collections.namedtuple(name, fields,
589                                                      rename=True)
590         self.messages[name] = msg
591         self.messages[name]['args'] = args
592         self.messages[name]['argtypes'] = argtypes
593         self.messages[name]['typeonly'] = typeonly
594         self.messages[name]['sizes'] = [total_size, sizes]
595         return self.messages[name]
596
597     def add_type(self, name, typedef):
598         return self.add_message('vl_api_' + name + '_t', typedef,
599                                 typeonly=True)
600
601     def make_function(self, name, i, msgdef, multipart, async):
602         if (async):
603             def f(**kwargs):
604                 return self._call_vpp_async(i, msgdef, **kwargs)
605         else:
606             def f(**kwargs):
607                 return self._call_vpp(i, msgdef, multipart, **kwargs)
608         args = self.messages[name]['args']
609         argtypes = self.messages[name]['argtypes']
610         f.__name__ = str(name)
611         f.__doc__ = ", ".join(["%s %s" %
612                                (argtypes[k], k) for k in args.keys()])
613         return f
614
615     @property
616     def api(self):
617         if not hasattr(self, "_api"):
618             raise Exception("Not connected, api definitions not available")
619         return self._api
620
621     def _register_functions(self, async=False):
622         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
623         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
624         self._api = Empty()
625         for name, msgdef in vpp_iterator(self.messages):
626             if self.messages[name]['typeonly']:
627                 continue
628             crc = self.messages[name]['crc']
629             n = name + '_' + crc[2:]
630             i = vpp_api.vac_get_msg_index(n.encode())
631             if i > 0:
632                 self.id_msgdef[i] = msgdef
633                 self.id_names[i] = name
634                 multipart = True if name.find('_dump') > 0 else False
635                 f = self.make_function(name, i, msgdef, multipart, async)
636                 setattr(self._api, name, FuncWrapper(f))
637             else:
638                 self.logger.debug(
639                     'No such message type or failed CRC checksum: %s', n)
640
641     def _write_new_cffi(self, buf):
642         """Send a binary-packed message to VPP."""
643         if not self.connected:
644             raise IOError(1, 'Not connected')
645         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
646
647     def _write_legacy_cffi(self, buf):
648         """Send a binary-packed message to VPP."""
649         if not self.connected:
650             raise IOError(1, 'Not connected')
651         return vpp_api.vac_write(bytes(buf), len(buf))
652
653     def _read(self):
654         if not self.connected:
655             raise IOError(1, 'Not connected')
656         mem = ffi.new("char **")
657         size = ffi.new("int *")
658         rv = vpp_api.vac_read(mem, size, self.read_timeout)
659         if rv:
660             raise IOError(rv, 'vac_read failed')
661         msg = bytes(ffi.buffer(mem[0], size[0]))
662         vpp_api.vac_free(mem[0])
663         return msg
664
665     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
666                          async):
667         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
668         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
669         if rv != 0:
670             raise IOError(2, 'Connect failed')
671         self.connected = True
672
673         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
674         self._register_functions(async=async)
675
676         # Initialise control ping
677         crc = self.messages['control_ping']['crc']
678         self.control_ping_index = vpp_api.vac_get_msg_index(
679             ('control_ping' + '_' + crc[2:]).encode())
680         self.control_ping_msgdef = self.messages['control_ping']
681         if self.async_thread:
682             self.event_thread = threading.Thread(
683                 target=self.thread_msg_handler)
684             self.event_thread.daemon = True
685             self.event_thread.start()
686         return rv
687
688     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
689         """Attach to VPP.
690
691         name - the name of the client.
692         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
693         async - if true, messages are sent without waiting for a reply
694         rx_qlen - the length of the VPP message receive queue between
695         client and server.
696         """
697         msg_handler = vac_callback_sync if not async else vac_callback_async
698         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
699                                      async)
700
701     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
702         """Attach to VPP in synchronous mode. Application must poll for events.
703
704         name - the name of the client.
705         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
706         rx_qlen - the length of the VPP message receive queue between
707         client and server.
708         """
709
710         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
711                                      async=False)
712
713     def disconnect(self):
714         """Detach from VPP."""
715         rv = vpp_api.vac_disconnect()
716         self.connected = False
717         self.message_queue.put("terminate event thread")
718         return rv
719
720     def msg_handler_sync(self, msg):
721         """Process an incoming message from VPP in sync mode.
722
723         The message may be a reply or it may be an async notification.
724         """
725         r = self.decode_incoming_msg(msg)
726         if r is None:
727             return
728
729         # If we have a context, then use the context to find any
730         # request waiting for a reply
731         context = 0
732         if hasattr(r, 'context') and r.context > 0:
733             context = r.context
734
735         if context == 0:
736             # No context -> async notification that we feed to the callback
737             self.message_queue.put_nowait(r)
738         else:
739             raise IOError(2, 'RPC reply message received in event handler')
740
741     def decode_incoming_msg(self, msg):
742         if not msg:
743             self.logger.warning('vpp_api.read failed')
744             return
745
746         i, ci = self.header.unpack_from(msg, 0)
747         if self.id_names[i] == 'rx_thread_exit':
748             return
749
750         #
751         # Decode message and returns a tuple.
752         #
753         msgdef = self.id_msgdef[i]
754         if not msgdef:
755             raise IOError(2, 'Reply message undefined')
756
757         r = self.decode(msgdef, msg)
758
759         return r
760
761     def msg_handler_async(self, msg):
762         """Process a message from VPP in async mode.
763
764         In async mode, all messages are returned to the callback.
765         """
766         r = self.decode_incoming_msg(msg)
767         if r is None:
768             return
769
770         msgname = type(r).__name__
771
772         if self.event_callback:
773             self.event_callback(msgname, r)
774
775     def _control_ping(self, context):
776         """Send a ping command."""
777         self._call_vpp_async(self.control_ping_index,
778                              self.control_ping_msgdef,
779                              context=context)
780
781     def _call_vpp(self, i, msgdef, multipart, **kwargs):
782         """Given a message, send the message and await a reply.
783
784         msgdef - the message packing definition
785         i - the message type index
786         multipart - True if the message returns multiple
787         messages in return.
788         context - context number - chosen at random if not
789         supplied.
790         The remainder of the kwargs are the arguments to the API call.
791
792         The return value is the message or message array containing
793         the response.  It will raise an IOError exception if there was
794         no response within the timeout window.
795         """
796
797         if 'context' not in kwargs:
798             context = self.get_context()
799             kwargs['context'] = context
800         else:
801             context = kwargs['context']
802         kwargs['_vl_msg_id'] = i
803         b = self.encode(msgdef, kwargs)
804
805         vpp_api.vac_rx_suspend()
806         self._write(b)
807
808         if multipart:
809             # Send a ping after the request - we use its response
810             # to detect that we have seen all results.
811             self._control_ping(context)
812
813         # Block until we get a reply.
814         rl = []
815         while (True):
816             msg = self._read()
817             if not msg:
818                 raise IOError(2, 'VPP API client: read failed')
819
820             r = self.decode_incoming_msg(msg)
821             msgname = type(r).__name__
822             if context not in r or r.context == 0 or context != r.context:
823                 self.message_queue.put_nowait(r)
824                 continue
825
826             if not multipart:
827                 rl = r
828                 break
829             if msgname == 'control_ping_reply':
830                 break
831
832             rl.append(r)
833
834         vpp_api.vac_rx_resume()
835
836         return rl
837
838     def _call_vpp_async(self, i, msgdef, **kwargs):
839         """Given a message, send the message and await a reply.
840
841         msgdef - the message packing definition
842         i - the message type index
843         context - context number - chosen at random if not
844         supplied.
845         The remainder of the kwargs are the arguments to the API call.
846         """
847         if 'context' not in kwargs:
848             context = self.get_context()
849             kwargs['context'] = context
850         else:
851             context = kwargs['context']
852         kwargs['_vl_msg_id'] = i
853         b = self.encode(msgdef, kwargs)
854
855         self._write(b)
856
857     def register_event_callback(self, callback):
858         """Register a callback for async messages.
859
860         This will be called for async notifications in sync mode,
861         and all messages in async mode.  In sync mode, replies to
862         requests will not come here.
863
864         callback is a fn(msg_type_name, msg_type) that will be
865         called when a message comes in.  While this function is
866         executing, note that (a) you are in a background thread and
867         may wish to use threading.Lock to protect your datastructures,
868         and (b) message processing from VPP will stop (so if you take
869         a long while about it you may provoke reply timeouts or cause
870         VPP to fill the RX buffer).  Passing None will disable the
871         callback.
872         """
873         self.event_callback = callback
874
875     def thread_msg_handler(self):
876         """Python thread calling the user registerd message handler.
877
878         This is to emulate the old style event callback scheme. Modern
879         clients should provide their own thread to poll the event
880         queue.
881         """
882         while True:
883             r = self.message_queue.get()
884             if r == "terminate event thread":
885                 break
886             msgname = type(r).__name__
887             if self.event_callback:
888                 self.event_callback(msgname, r)
889
890
891 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4