X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvpp-api%2Fpython%2Fvpp_papi%2Fvpp_papi.py;h=9c4ede90d48d4ae056f8d6f79e397b23f88af42a;hb=8006c6a;hp=5ff8064d4255aa773011f8b35574c85da72536b2;hpb=a5ee900fb75201bbfceaf13c8bc57a13ed094988;p=vpp.git diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 5ff8064d425..9c4ede90d48 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -15,6 +15,7 @@ # from __future__ import print_function +from __future__ import absolute_import import sys import os import logging @@ -25,73 +26,61 @@ import threading import fnmatch import weakref import atexit -from cffi import FFI -import cffi -from vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes +from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes +from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias +from . macaddress import MACAddress, mac_pton, mac_ntop + +logger = logging.getLogger(__name__) if sys.version[0] == '2': import Queue as queue else: import queue as queue -ffi = FFI() -ffi.cdef(""" -typedef void (*vac_callback_t)(unsigned char * data, int len); -typedef void (*vac_error_callback_t)(void *, unsigned char *, int); -int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb, - int rx_qlen); -int vac_disconnect(void); -int vac_read(char **data, int *l, unsigned short timeout); -int vac_write(char *data, int len); -void vac_free(void * msg); -int vac_get_msg_index(unsigned char * name); -int vac_msg_table_size(void); -int vac_msg_table_max_index(void); +class VppEnumType(type): + def __getattr__(cls, name): + t = vpp_get_type(name) + return t.enum -void vac_rx_suspend (void); -void vac_rx_resume (void); -void vac_set_error_handler(vac_error_callback_t); - """) -# Barfs on failure, no need to check success. -vpp_api = ffi.dlopen('libvppapiclient.so') +# Python3 +# class VppEnum(metaclass=VppEnumType): +# pass +class VppEnum(object): + __metaclass__ = VppEnumType def vpp_atexit(vpp_weakref): """Clean up VPP connection on shutdown.""" vpp_instance = vpp_weakref() - if vpp_instance.connected: + if vpp_instance and vpp_instance.transport.connected: vpp_instance.logger.debug('Cleaning up VPP on exit') vpp_instance.disconnect() -vpp_object = None - - -def vpp_iterator(d): - if sys.version[0] == '2': +if sys.version[0] == '2': + def vpp_iterator(d): return d.iteritems() - else: +else: + def vpp_iterator(d): return d.items() -@ffi.callback("void(unsigned char *, int)") -def vac_callback_sync(data, len): - vpp_object.msg_handler_sync(ffi.buffer(data, len)) - +def call_logger(msgdef, kwargs): + s = 'Calling {}('.format(msgdef.name) + for k, v in kwargs.items(): + s += '{}:{} '.format(k, v) + s += ')' + return s -@ffi.callback("void(unsigned char *, int)") -def vac_callback_async(data, len): - vpp_object.msg_handler_async(ffi.buffer(data, len)) +def return_logger(r): + s = 'Return from {}'.format(r) + return s -@ffi.callback("void(void *, unsigned char *, int)") -def vac_error_handler(arg, msg, msg_len): - vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len)) - -class Empty(object): +class VppApiDynamicMethodHolder(object): pass @@ -104,10 +93,27 @@ class FuncWrapper(object): return self._func(**kwargs) -class VPPMessage(VPPType): +class VPPApiError(Exception): + pass + + +class VPPNotImplementedError(NotImplementedError): + pass + + +class VPPIOError(IOError): + pass + + +class VPPRuntimeError(RuntimeError): + pass + + +class VPPValueError(ValueError): pass -class VPP(): + +class VPP(object): """VPP interface. This class provides the APIs to VPP. The APIs are loaded @@ -119,6 +125,11 @@ class VPP(): provides a means to register a callback function to receive these messages in a background thread. """ + VPPApiError = VPPApiError + VPPRuntimeError = VPPRuntimeError + VPPValueError = VPPValueError + VPPNotImplementedError = VPPNotImplementedError + VPPIOError = VPPIOError def process_json_file(self, apidef_file): api = json.load(apidef_file) @@ -132,44 +143,54 @@ class VPP(): for t in api['types']: t[0] = 'vl_api_' + t[0] + '_t' types[t[0]] = {'type': 'type', 'data': t} + for t, v in api['aliases'].items(): + types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v} + self.services.update(api['services']) i = 0 while True: unresolved = {} for k, v in types.items(): t = v['data'] - if v['type'] == 'enum': - try: - VPPEnumType(t[0], t[1:]) - except ValueError: - unresolved[k] = v - elif v['type'] == 'union': - try: - VPPUnionType(t[0], t[1:]) - except ValueError: - unresolved[k] = v - elif v['type'] == 'type': - try: - VPPType(t[0], t[1:]) - except ValueError: - unresolved[k] = v + if not vpp_get_type(k): + if v['type'] == 'enum': + try: + VPPEnumType(t[0], t[1:]) + except ValueError: + unresolved[k] = v + elif v['type'] == 'union': + try: + VPPUnionType(t[0], t[1:]) + except ValueError: + unresolved[k] = v + elif v['type'] == 'type': + try: + VPPType(t[0], t[1:]) + except ValueError: + unresolved[k] = v + elif v['type'] == 'alias': + try: + VPPTypeAlias(k, t) + except ValueError: + unresolved[k] = v if len(unresolved) == 0: break if i > 3: - raise ValueError('Unresolved type definitions {}' - .format(unresolved)) + raise VPPValueError('Unresolved type definitions {}' + .format(unresolved)) types = unresolved i += 1 for m in api['messages']: try: self.messages[m[0]] = VPPMessage(m[0], m[1:]) - except NotImplementedError: + except VPPNotImplementedError: self.logger.error('Not implemented error for {}'.format(m[0])) def __init__(self, apifiles=None, testmode=False, async_thread=True, - logger=logging.getLogger('vpp_papi'), loglevel='debug', - read_timeout=0): + logger=None, loglevel=None, + read_timeout=5, use_socket=False, + server_address='/run/vpp-api.sock'): """Create a VPP API object. apifiles is a list of files containing API @@ -182,9 +203,6 @@ class VPP(): loglevel, if supplied, is the log level this logger is set to report at (from the loglevels in the logging module). """ - global vpp_object - vpp_object = self - if logger is None: logger = logging.getLogger(__name__) if loglevel is not None: @@ -192,18 +210,22 @@ class VPP(): self.logger = logger self.messages = {} + self.services = {} self.id_names = [] self.id_msgdef = [] - self.connected = False self.header = VPPType('header', [['u16', 'msgid'], ['u32', 'client_index']]) self.apifiles = [] self.event_callback = None self.message_queue = queue.Queue() self.read_timeout = read_timeout - self.vpp_api = vpp_api self.async_thread = async_thread + if use_socket: + from . vpp_transport_socket import VppTransport + else: + from . vpp_transport_shmem import VppTransport + if not apifiles: # Pick up API definitions from default directory try: @@ -213,7 +235,7 @@ class VPP(): if testmode: apifiles = [] else: - raise + raise VPPRuntimeError for file in apifiles: with open(file) as apidef_file: @@ -223,24 +245,13 @@ class VPP(): # Basic sanity check if len(self.messages) == 0 and not testmode: - raise ValueError(1, 'Missing JSON message definitions') + raise VPPValueError(1, 'Missing JSON message definitions') + self.transport = VppTransport(self, read_timeout=read_timeout, + server_address=server_address) # Make sure we allow VPP to clean up the message rings. atexit.register(vpp_atexit, weakref.ref(self)) - # Register error handler - if not testmode: - vpp_api.vac_set_error_handler(vac_error_handler) - - # Support legacy CFFI - # from_buffer supported from 1.8.0 - (major, minor, patch) = [int(s) for s in - cffi.__version__.split('.', 3)] - if major >= 1 and minor >= 8: - self._write = self._write_new_cffi - else: - self._write = self._write_legacy_cffi - class ContextId(object): """Thread-safe provider of unique context IDs.""" def __init__(self): @@ -254,6 +265,9 @@ class VPP(): return self.context get_context = ContextId() + def get_type(self, name): + return vpp_get_type(name) + @classmethod def find_api_dir(cls): """Attempt to find the best directory in which API definition @@ -362,7 +376,7 @@ class VPP(): if api_dir is None: api_dir = cls.find_api_dir() if api_dir is None: - raise RuntimeError("api_dir cannot be located") + raise VPPApiError("api_dir cannot be located") if isinstance(patterns, list) or isinstance(patterns, tuple): patterns = [p.strip() + '.api.json' for p in patterns] @@ -378,19 +392,14 @@ class VPP(): return api_files - def status(self): - """Debug function: report current VPP API status to stdout.""" - print('Connected') if self.connected else print('Not Connected') - print('Read API definitions from', ', '.join(self.apifiles)) - @property def api(self): if not hasattr(self, "_api"): - raise Exception("Not connected, api definitions not available") + raise VPPApiError("Not connected, api definitions not available") return self._api - def make_function(self, msg, i, multipart, async): - if (async): + def make_function(self, msg, i, multipart, do_async): + if (do_async): def f(**kwargs): return self._call_vpp_async(i, msg, **kwargs) else: @@ -399,64 +408,47 @@ class VPP(): f.__name__ = str(msg.name) f.__doc__ = ", ".join(["%s %s" % - (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]) + (msg.fieldtypes[j], k) + for j, k in enumerate(msg.fields)]) return f - def _register_functions(self, async=False): + def _register_functions(self, do_async=False): self.id_names = [None] * (self.vpp_dictionary_maxid + 1) self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1) - self._api = Empty() + self._api = VppApiDynamicMethodHolder() for name, msg in vpp_iterator(self.messages): n = name + '_' + msg.crc[2:] - i = vpp_api.vac_get_msg_index(n.encode()) + i = self.transport.get_msg_index(n.encode()) if i > 0: self.id_msgdef[i] = msg self.id_names[i] = name - # TODO: Fix multipart (use services) - multipart = True if name.find('_dump') > 0 else False - f = self.make_function(msg, i, multipart, async) - setattr(self._api, name, FuncWrapper(f)) + + # Create function for client side messages. + if name in self.services: + if 'stream' in self.services[name] and \ + self.services[name]['stream']: + multipart = True + else: + multipart = False + f = self.make_function(msg, i, multipart, do_async) + setattr(self._api, name, FuncWrapper(f)) else: self.logger.debug( 'No such message type or failed CRC checksum: %s', n) - def _write_new_cffi(self, buf): - """Send a binary-packed message to VPP.""" - if not self.connected: - raise IOError(1, 'Not connected') - return vpp_api.vac_write(ffi.from_buffer(buf), len(buf)) - - def _write_legacy_cffi(self, buf): - """Send a binary-packed message to VPP.""" - if not self.connected: - raise IOError(1, 'Not connected') - return vpp_api.vac_write(bytes(buf), len(buf)) - - def _read(self): - if not self.connected: - raise IOError(1, 'Not connected') - mem = ffi.new("char **") - size = ffi.new("int *") - rv = vpp_api.vac_read(mem, size, self.read_timeout) - if rv: - raise IOError(rv, 'vac_read failed') - msg = bytes(ffi.buffer(mem[0], size[0])) - vpp_api.vac_free(mem[0]) - return msg - def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, - async): - pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL - rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen) + do_async): + pfx = chroot_prefix.encode() if chroot_prefix else None + + rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen) if rv != 0: - raise IOError(2, 'Connect failed') - self.connected = True - self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index() - self._register_functions(async=async) + raise VPPIOError(2, 'Connect failed') + self.vpp_dictionary_maxid = self.transport.msg_table_max_index() + self._register_functions(do_async=do_async) # Initialise control ping crc = self.messages['control_ping'].crc - self.control_ping_index = vpp_api.vac_get_msg_index( + self.control_ping_index = self.transport.get_msg_index( ('control_ping' + '_' + crc[2:]).encode()) self.control_ping_msgdef = self.messages['control_ping'] if self.async_thread: @@ -466,18 +458,18 @@ class VPP(): self.event_thread.start() return rv - def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32): + def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32): """Attach to VPP. name - the name of the client. chroot_prefix - if VPP is chroot'ed, the prefix of the jail - async - if true, messages are sent without waiting for a reply + do_async - if true, messages are sent without waiting for a reply rx_qlen - the length of the VPP message receive queue between client and server. """ - msg_handler = vac_callback_sync if not async else vac_callback_async + msg_handler = self.transport.get_callback(do_async) return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen, - async) + do_async) def connect_sync(self, name, chroot_prefix=None, rx_qlen=32): """Attach to VPP in synchronous mode. Application must poll for events. @@ -488,13 +480,12 @@ class VPP(): client and server. """ - return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen, - async=False) + return self.connect_internal(name, None, chroot_prefix, rx_qlen, + do_async=False) def disconnect(self): """Detach from VPP.""" - rv = vpp_api.vac_disconnect() - self.connected = False + rv = self.transport.disconnect() self.message_queue.put("terminate event thread") return rv @@ -517,14 +508,34 @@ class VPP(): # No context -> async notification that we feed to the callback self.message_queue.put_nowait(r) else: - raise IOError(2, 'RPC reply message received in event handler') + raise VPPIOError(2, 'RPC reply message received in event handler') + + def has_context(self, msg): + if len(msg) < 10: + return False + + header = VPPType('header_with_context', [['u16', 'msgid'], + ['u32', 'client_index'], + ['u32', 'context']]) + + (i, ci, context), size = header.unpack(msg, 0) + if self.id_names[i] == 'rx_thread_exit': + return - def decode_incoming_msg(self, msg): + # + # Decode message and returns a tuple. + # + msgobj = self.id_msgdef[i] + if 'context' in msgobj.field_by_name and context >= 0: + return True + return False + + def decode_incoming_msg(self, msg, no_type_conversion=False): if not msg: self.logger.warning('vpp_api.read failed') return - i, ci = self.header.unpack(msg, 0) + (i, ci), size = self.header.unpack(msg, 0) if self.id_names[i] == 'rx_thread_exit': return @@ -533,10 +544,9 @@ class VPP(): # msgobj = self.id_msgdef[i] if not msgobj: - raise IOError(2, 'Reply message undefined') - - r = msgobj.unpack(msg) + raise VPPIOError(2, 'Reply message undefined') + r, size = msgobj.unpack(msg, ntc=no_type_conversion) return r def msg_handler_async(self, msg): @@ -562,9 +572,10 @@ class VPP(): def validate_args(self, msg, kwargs): d = set(kwargs.keys()) - set(msg.field_by_name.keys()) if d: - raise ValueError('Invalid argument {} to {}'.format(list(d), msg.name)) + raise VPPValueError('Invalid argument {} to {}' + .format(list(d), msg.name)) - def _call_vpp(self, i, msg, multipart, **kwargs): + def _call_vpp(self, i, msgdef, multipart, **kwargs): """Given a message, send the message and await a reply. msgdef - the message packing definition @@ -587,10 +598,21 @@ class VPP(): context = kwargs['context'] kwargs['_vl_msg_id'] = i - self.validate_args(msg, kwargs) - b = msg.pack(kwargs) - vpp_api.vac_rx_suspend() - self._write(b) + no_type_conversion = kwargs.pop('_no_type_conversion', False) + + try: + if self.transport.socket_index: + kwargs['client_index'] = self.transport.socket_index + except AttributeError: + pass + self.validate_args(msgdef, kwargs) + + logging.debug(call_logger(msgdef, kwargs)) + + b = msgdef.pack(kwargs) + self.transport.suspend() + + self.transport.write(b) if multipart: # Send a ping after the request - we use its response @@ -600,12 +622,13 @@ class VPP(): # Block until we get a reply. rl = [] while (True): - msg = self._read() + msg = self.transport.read() if not msg: - raise IOError(2, 'VPP API client: read failed') - r = self.decode_incoming_msg(msg) + raise VPPIOError(2, 'VPP API client: read failed') + r = self.decode_incoming_msg(msg, no_type_conversion) msgname = type(r).__name__ if context not in r or r.context == 0 or context != r.context: + # Message being queued self.message_queue.put_nowait(r) continue @@ -617,8 +640,9 @@ class VPP(): rl.append(r) - vpp_api.vac_rx_resume() + self.transport.resume() + logger.debug(return_logger(rl)) return rl def _call_vpp_async(self, i, msg, **kwargs): @@ -635,11 +659,15 @@ class VPP(): kwargs['context'] = context else: context = kwargs['context'] - kwargs['client_index'] = 0 + try: + if self.transport.socket_index: + kwargs['client_index'] = self.transport.socket_index + except AttributeError: + kwargs['client_index'] = 0 kwargs['_vl_msg_id'] = i b = msg.pack(kwargs) - self._write(b) + self.transport.write(b) def register_event_callback(self, callback): """Register a callback for async messages. @@ -660,7 +688,7 @@ class VPP(): self.event_callback = callback def thread_msg_handler(self): - """Python thread calling the user registerd message handler. + """Python thread calling the user registered message handler. This is to emulate the old style event callback scheme. Modern clients should provide their own thread to poll the event