import sys, os, logging, collections, struct, json, threading, glob
import atexit, Queue
-logging.basicConfig(level=logging.DEBUG)
-import vpp_api
-
-def eprint(*args, **kwargs):
- """Print critical diagnostics to stderr."""
- print(*args, file=sys.stderr, **kwargs)
+from cffi import FFI
+ffi = FFI()
+ffi.cdef("""
+typedef void (*pneum_callback_t)(unsigned char * data, int len);
+typedef void (*pneum_error_callback_t)(void *, unsigned char *, int);
+int pneum_connect(char * name, char * chroot_prefix, pneum_callback_t cb,
+ int rx_qlen);
+int pneum_disconnect(void);
+int pneum_read(char **data, int *l, unsigned short timeout);
+int pneum_write(char *data, int len);
+void pneum_free(void * msg);
+
+int pneum_get_msg_index(unsigned char * name);
+int pneum_msg_table_size(void);
+int pneum_msg_table_max_index(void);
+
+void pneum_rx_suspend (void);
+void pneum_rx_resume (void);
+void pneum_set_error_handler(pneum_error_callback_t);
+ """)
+
+# Barfs on failure, no need to check success.
+vpp_api = ffi.dlopen('libpneum.so')
def vpp_atexit(self):
"""Clean up VPP connection on shutdown."""
if self.connected:
- eprint ('Cleaning up VPP on exit')
+ self.logger.debug('Cleaning up VPP on exit')
self.disconnect()
+vpp_object = None
+
+@ffi.callback("void(unsigned char *, int)")
+def pneum_callback_sync(data, len):
+ vpp_object.msg_handler_sync(ffi.buffer(data, len))
+@ffi.callback("void(unsigned char *, int)")
+def pneum_callback_async(data, len):
+ vpp_object.msg_handler_async(ffi.buffer(data, len))
+@ffi.callback("void(void *, unsigned char *, int)")
+def pneum_error_handler(arg, msg, msg_len):
+ vpp_object.logger.warning("PNEUM: %s", ffi.string(msg, msg_len))
class Empty(object):
pass
provides a means to register a callback function to receive
these messages in a background thread.
"""
- def __init__(self, apifiles = None, testmode = False, async_thread = True):
+ def __init__(self, apifiles = None, testmode = False, async_thread = True,
+ logger = logging.getLogger('vpp_papi'), loglevel = 'debug'):
"""Create a VPP API object.
apifiles is a list of files containing API
provided this will load the API files from VPP's
default install location.
"""
+ global vpp_object
+ vpp_object = self
+ self.logger = logger
+ logging.basicConfig(level=getattr(logging, loglevel.upper()))
+
self.messages = {}
self.id_names = []
self.id_msgdef = []
# Make sure we allow VPP to clean up the message rings.
atexit.register(vpp_atexit, self)
+ # Register error handler
+ vpp_api.pneum_set_error_handler(pneum_error_handler)
+
class ContextId(object):
"""Thread-safe provider of unique context IDs."""
def __init__(self):
return self.messages[name]['return_tuple']
return None
- def add_message(self, name, msgdef):
+ def add_message(self, name, msgdef, typeonly = False):
if name in self.messages:
raise ValueError('Duplicate message name: ' + name)
self.messages[name] = msg
self.messages[name]['args'] = args
self.messages[name]['argtypes'] = argtypes
+ self.messages[name]['typeonly'] = typeonly
return self.messages[name]
def add_type(self, name, typedef):
- return self.add_message('vl_api_' + name + '_t', typedef)
+ return self.add_message('vl_api_' + name + '_t', typedef, typeonly=True)
def make_function(self, name, i, msgdef, multipart, async):
if (async):
self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
self._api = Empty()
for name, msgdef in self.messages.iteritems():
- if name in self.vpp_dictionary:
- if self.messages[name]['crc'] != self.vpp_dictionary[name]['crc']:
- raise ValueError(3, 'Failed CRC checksum ' + name +
- ' ' + self.messages[name]['crc'] +
- ' ' + self.vpp_dictionary[name]['crc'])
- i = self.vpp_dictionary[name]['id']
+ if self.messages[name]['typeonly']: continue
+ crc = self.messages[name]['crc']
+ n = name + '_' + crc[2:]
+ i = vpp_api.pneum_get_msg_index(bytes(n))
+ if i > 0:
self.id_msgdef[i] = msgdef
self.id_names[i] = name
multipart = True if name.find('_dump') > 0 else False
3, "Conflicting name in JSON definition: `%s'" % name)
setattr(self, name, f)
# old API stuff ends here
+ else:
+ self.logger.debug('No such message type or failed CRC checksum: %s', n)
def _write (self, buf):
"""Send a binary-packed message to VPP."""
if not self.connected:
raise IOError(1, 'Not connected')
- return vpp_api.write(str(buf))
+ return vpp_api.pneum_write(str(buf), len(buf))
def _read (self):
if not self.connected:
raise IOError(1, 'Not connected')
-
- return vpp_api.read(self.read_timeout)
-
- def _load_dictionary(self):
- self.vpp_dictionary = {}
- self.vpp_dictionary_maxid = 0
- d = vpp_api.msg_table()
-
- if not d:
- raise IOError(3, 'Cannot get VPP API dictionary')
- for i,n in d:
- name, crc = n.rsplit('_', 1)
- crc = '0x' + crc
- self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
- self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
+ mem = ffi.new("char **")
+ size = ffi.new("int *")
+ rv = vpp_api.pneum_read(mem, size, self.read_timeout)
+ if rv:
+ raise IOError(rv, 'pneum_read filed')
+ msg = bytes(ffi.buffer(mem[0], size[0]))
+ vpp_api.pneum_free(mem[0])
+ return msg
def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async):
- rv = vpp_api.connect(name, msg_handler, chroot_prefix, rx_qlen)
+ rv = vpp_api.pneum_connect(name, chroot_prefix, msg_handler, rx_qlen)
if rv != 0:
raise IOError(2, 'Connect failed')
self.connected = True
- self._load_dictionary()
+ self.vpp_dictionary_maxid = vpp_api.pneum_msg_table_max_index()
self._register_functions(async=async)
# Initialise control ping
- self.control_ping_index = self.vpp_dictionary['control_ping']['id']
+ crc = self.messages['control_ping']['crc']
+ self.control_ping_index = \
+ vpp_api.pneum_get_msg_index(
+ bytes('control_ping' + '_' + crc[2:]))
self.control_ping_msgdef = self.messages['control_ping']
- def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
+ def connect(self, name, chroot_prefix = ffi.NULL,
+ async = False, rx_qlen = 32):
"""Attach to VPP.
name - the name of the client.
rx_qlen - the length of the VPP message receive queue between
client and server.
"""
- msg_handler = self.msg_handler_sync if not async \
- else self.msg_handler_async
+ msg_handler = pneum_callback_sync if not async \
+ else pneum_callback_async
return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
async)
- def connect_sync (self, name, chroot_prefix = None, rx_qlen = 32):
+ def connect_sync (self, name, chroot_prefix = ffi.NULL, rx_qlen = 32):
"""Attach to VPP in synchronous mode. Application must poll for events.
name - the name of the client.
client and server.
"""
- return self.connect_internal(name, None, chroot_prefix, rx_qlen,
+ return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
async=False)
def disconnect(self):
"""Detach from VPP."""
- rv = vpp_api.disconnect()
+ rv = vpp_api.pneum_disconnect()
self.connected = False
return rv
def decode_incoming_msg(self, msg):
if not msg:
- eprint('vpp_api.read failed')
+ self.logger.warning('vpp_api.read failed')
return
i, ci = self.header.unpack_from(msg, 0)
kwargs['_vl_msg_id'] = i
b = self.encode(msgdef, kwargs)
- vpp_api.suspend()
+ vpp_api.pneum_rx_suspend()
self._write(b)
if multipart:
rl.append(r)
- vpp_api.resume()
+ vpp_api.pneum_rx_resume()
return rl