From: Ole Troan Date: Thu, 2 Aug 2018 09:58:12 +0000 (+0200) Subject: PAPI: Use UNIX domain sockets instead of shared memory X-Git-Tag: v18.10-rc1~51 X-Git-Url: https://gerrit.fd.io/r/gitweb?p=vpp.git;a=commitdiff_plain;h=94495f2a6a68ac2202b7715ce09620f1ba6fe673 PAPI: Use UNIX domain sockets instead of shared memory Adds support for running the API purely across Unix domain sockets. Usage: vpp = VPP(use_socket=True) Change-Id: Iafc1301e03dd3edc3f4d702dd6c0b98d3b50b69e Signed-off-by: Ole Troan --- diff --git a/src/vlibapi/api_common.h b/src/vlibapi/api_common.h index 94320826f3d..497a1e8bd16 100644 --- a/src/vlibapi/api_common.h +++ b/src/vlibapi/api_common.h @@ -64,7 +64,7 @@ typedef struct vl_api_registration_ u32 clib_file_index; /**< Socket only: file index */ i8 *unprocessed_input; /**< Socket only: pending input */ u32 unprocessed_msg_length; /**< Socket only: unprocssed length */ - u8 *output_vector; /**< Socket only: output vecto */ + u8 *output_vector; /**< Socket only: output vector */ int *additional_fds_to_close; /* socket client only */ diff --git a/src/vlibmemory/api.h b/src/vlibmemory/api.h index 2146b16c5b0..d66c4398764 100644 --- a/src/vlibmemory/api.h +++ b/src/vlibmemory/api.h @@ -55,10 +55,11 @@ vl_api_can_send_msg (vl_api_registration_t * rp) always_inline vl_api_registration_t * vl_api_client_index_to_registration (u32 index) { - if (PREDICT_FALSE (socket_main.current_rp != 0)) - return socket_main.current_rp; - - return (vl_mem_api_client_index_to_registration (index)); + vl_api_registration_t *reg = + vl_socket_api_client_index_to_registration (index); + if (reg && reg->registration_type != REGISTRATION_TYPE_FREE) + return reg; + return vl_mem_api_client_index_to_registration (index); } always_inline u32 diff --git a/src/vlibmemory/memclnt.api b/src/vlibmemory/memclnt.api index f88e5bdb7ca..451bc0e5fae 100644 --- a/src/vlibmemory/memclnt.api +++ b/src/vlibmemory/memclnt.api @@ -17,7 +17,7 @@ option version = "2.0.0"; /* - * Define services not following the normal convetions here + * Define services not following the normal conventions here */ service { rpc memclnt_rx_thread_suspend returns null; @@ -145,6 +145,12 @@ manual_print define trace_plugin_msg_ids u16 last_msg_id; }; +typedef message_table_entry +{ + u16 index; + u8 name[64]; +}; + /* * Create a socket client registration. */ @@ -154,23 +160,26 @@ define sockclnt_create { }; define sockclnt_create_reply { + u32 client_index; u32 context; /* opaque value from the create request */ i32 response; /* Non-negative = success */ - u64 handle; /* handle by which vlib knows this client */ u32 index; /* index, used e.g. by API trace replay */ + u16 count; + vl_api_message_table_entry_t message_table[count]; }; /* * Delete a client registration */ define sockclnt_delete { + u32 client_index; + u32 context; u32 index; /* index, used e.g. by API trace replay */ - u64 handle; /* handle by which vlib knows this client */ }; define sockclnt_delete_reply { + u32 context; i32 response; /* Non-negative = success */ - u64 handle; /* in case the client wonders */ }; /* diff --git a/src/vlibmemory/socket_api.c b/src/vlibmemory/socket_api.c index afe02d20536..4787069daa3 100644 --- a/src/vlibmemory/socket_api.c +++ b/src/vlibmemory/socket_api.c @@ -1,6 +1,6 @@ /* *------------------------------------------------------------------ - * socksvr_vlib.c + * socket_api.c * * Copyright (c) 2009 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); @@ -74,6 +74,20 @@ vl_sock_api_dump_clients (vlib_main_t * vm, api_main_t * am) /* *INDENT-ON* */ } +vl_api_registration_t * +vl_socket_api_client_index_to_registration (u32 index) +{ + socket_main_t *sm = &socket_main; + if (pool_is_free_index (sm->registration_pool, ntohl (index))) + { +#if DEBUG > 2 + clib_warning ("Invalid index %d\n", ntohl (index)); +#endif + return 0; + } + return pool_elt_at_index (sm->registration_pool, ntohl (index)); +} + void vl_socket_api_send (vl_api_registration_t * rp, u8 * elem) { @@ -289,26 +303,37 @@ vl_socket_write_ready (clib_file_t * uf) rp = pool_elt_at_index (socket_main.registration_pool, uf->private_data); /* Flush output vector. */ - n = write (uf->file_descriptor, rp->output_vector, - vec_len (rp->output_vector)); - if (n < 0) + size_t total_bytes = vec_len (rp->output_vector); + size_t bytes_to_send, remaining_bytes = total_bytes; + void *p = rp->output_vector; + while (remaining_bytes > 0) { + bytes_to_send = remaining_bytes > 4096 ? 4096 : remaining_bytes; + n = write (uf->file_descriptor, p, bytes_to_send); + if (n < 0) + { + if (errno == EAGAIN) + { + break; + } #if DEBUG > 2 - clib_warning ("write error, close the file...\n"); + clib_warning ("write error, close the file...\n"); #endif - clib_file_del (fm, uf); - vl_socket_free_registration_index (rp - socket_main.registration_pool); - return 0; + clib_file_del (fm, uf); + vl_socket_free_registration_index (rp - + socket_main.registration_pool); + return 0; + } + remaining_bytes -= bytes_to_send; + p += bytes_to_send; } - else if (n > 0) + + vec_delete (rp->output_vector, total_bytes - remaining_bytes, 0); + if (vec_len (rp->output_vector) <= 0 + && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE)) { - vec_delete (rp->output_vector, n, 0); - if (vec_len (rp->output_vector) <= 0 - && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE)) - { - uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE; - fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY); - } + uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE; + fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY); } return 0; @@ -379,7 +404,11 @@ vl_api_sockclnt_create_t_handler (vl_api_sockclnt_create_t * mp) { vl_api_registration_t *regp; vl_api_sockclnt_create_reply_t *rp; + api_main_t *am = &api_main; + hash_pair_t *hp; int rv = 0; + u32 nmsg = hash_elts (am->msg_index_by_name_and_crc); + u32 i = 0; regp = socket_main.current_rp; @@ -387,13 +416,22 @@ vl_api_sockclnt_create_t_handler (vl_api_sockclnt_create_t * mp) regp->name = format (0, "%s%c", mp->name, 0); - rp = vl_msg_api_alloc (sizeof (*rp)); + u32 size = sizeof (*rp) + (nmsg * sizeof (vl_api_message_table_entry_t)); + rp = vl_msg_api_alloc (size); rp->_vl_msg_id = htons (VL_API_SOCKCLNT_CREATE_REPLY); - rp->handle = (uword) regp; - rp->index = (uword) regp->vl_api_registration_pool_index; + rp->index = htonl (regp->vl_api_registration_pool_index); rp->context = mp->context; rp->response = htonl (rv); - + rp->count = htons (nmsg); + + /* *INDENT-OFF* */ + hash_foreach_pair (hp, am->msg_index_by_name_and_crc, + ({ + rp->message_table[i].index = htons(hp->value[0]); + strncpy((char *)rp->message_table[i].name, (char *)hp->key, 64-1); + i++; + })); + /* *INDENT-ON* */ vl_api_send_msg (regp, (u8 *) rp); } @@ -406,23 +444,28 @@ vl_api_sockclnt_delete_t_handler (vl_api_sockclnt_delete_t * mp) vl_api_registration_t *regp; vl_api_sockclnt_delete_reply_t *rp; - if (!pool_is_free_index (socket_main.registration_pool, mp->index)) - { - regp = pool_elt_at_index (socket_main.registration_pool, mp->index); + regp = vl_api_client_index_to_registration (mp->client_index); + if (!regp) + return; - rp = vl_msg_api_alloc (sizeof (*rp)); - rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY); - rp->handle = mp->handle; - rp->response = htonl (1); + u32 reg_index = ntohl (mp->index); + rp = vl_msg_api_alloc (sizeof (*rp)); + rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY); + rp->context = mp->context; + if (!pool_is_free_index (socket_main.registration_pool, reg_index)) + { + rp->response = htonl (1); vl_api_send_msg (regp, (u8 *) rp); vl_api_registration_del_file (regp); - vl_socket_free_registration_index (mp->index); + vl_socket_free_registration_index (reg_index); } else { - clib_warning ("unknown client ID %d", mp->index); + clib_warning ("unknown client ID %d", reg_index); + rp->response = htonl (-1); + vl_api_send_msg (regp, (u8 *) rp); } } diff --git a/src/vlibmemory/socket_api.h b/src/vlibmemory/socket_api.h index 1e99550e64b..00985fe7591 100644 --- a/src/vlibmemory/socket_api.h +++ b/src/vlibmemory/socket_api.h @@ -75,6 +75,8 @@ clib_error_t *vl_sock_api_send_fd_msg (int socket_fd, int fds[], int n_fds); clib_error_t *vl_sock_api_recv_fd_msg (int socket_fd, int fds[], int n_fds, u32 wait); +vl_api_registration_t *vl_socket_api_client_index_to_registration (u32 index); + #endif /* SRC_VLIBMEMORY_SOCKET_API_H_ */ /* diff --git a/src/vlibmemory/vlib_api.c b/src/vlibmemory/vlib_api.c index 65fa34bcc63..15a0ba82f12 100644 --- a/src/vlibmemory/vlib_api.c +++ b/src/vlibmemory/vlib_api.c @@ -165,8 +165,8 @@ vl_api_api_versions_t_handler (vl_api_api_versions_t * mp) vl_api_send_msg (reg, (u8 *) rmp); } -#define foreach_vlib_api_msg \ -_(GET_FIRST_MSG_ID, get_first_msg_id) \ +#define foreach_vlib_api_msg \ +_(GET_FIRST_MSG_ID, get_first_msg_id) \ _(API_VERSIONS, api_versions) /* diff --git a/src/vpp-api/client/client.c b/src/vpp-api/client/client.c index 6c922dbe2e8..68269bb9b55 100644 --- a/src/vpp-api/client/client.c +++ b/src/vpp-api/client/client.c @@ -507,7 +507,7 @@ typedef VL_API_PACKED(struct _vl_api_header { u32 client_index; }) vl_api_header_t; -static unsigned int +static u32 vac_client_index (void) { return (api_main.my_client_index); diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 4f765ecbd18..e1a7059f317 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -26,8 +26,6 @@ import threading import fnmatch import weakref import atexit -from cffi import FFI -import cffi from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes from . vpp_serializer import VPPMessage @@ -36,41 +34,15 @@ if sys.version[0] == '2': else: import queue as queue -ffi = FFI() -ffi.cdef(""" -typedef void (*vac_callback_t)(unsigned char * data, int len); -typedef void (*vac_error_callback_t)(void *, unsigned char *, int); -int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb, - int rx_qlen); -int vac_disconnect(void); -int vac_read(char **data, int *l, unsigned short timeout); -int vac_write(char *data, int len); -void vac_free(void * msg); - -int vac_get_msg_index(unsigned char * name); -int vac_msg_table_size(void); -int vac_msg_table_max_index(void); - -void vac_rx_suspend (void); -void vac_rx_resume (void); -void vac_set_error_handler(vac_error_callback_t); - """) - -# Barfs on failure, no need to check success. -vpp_api = ffi.dlopen('libvppapiclient.so') - def vpp_atexit(vpp_weakref): """Clean up VPP connection on shutdown.""" vpp_instance = vpp_weakref() - if vpp_instance and vpp_instance.connected: + if vpp_instance and vpp_instance.transport.connected: vpp_instance.logger.debug('Cleaning up VPP on exit') vpp_instance.disconnect() -vpp_object = None - - def vpp_iterator(d): if sys.version[0] == '2': return d.iteritems() @@ -78,21 +50,6 @@ def vpp_iterator(d): return d.items() -@ffi.callback("void(unsigned char *, int)") -def vac_callback_sync(data, len): - vpp_object.msg_handler_sync(ffi.buffer(data, len)) - - -@ffi.callback("void(unsigned char *, int)") -def vac_callback_async(data, len): - vpp_object.msg_handler_async(ffi.buffer(data, len)) - - -@ffi.callback("void(void *, unsigned char *, int)") -def vac_error_handler(arg, msg, msg_len): - vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len)) - - class VppApiDynamicMethodHolder(object): pass @@ -168,7 +125,8 @@ class VPP(): def __init__(self, apifiles=None, testmode=False, async_thread=True, logger=logging.getLogger('vpp_papi'), loglevel='debug', - read_timeout=0): + read_timeout=5, use_socket=False, + server_address='/run/vpp-api.sock'): """Create a VPP API object. apifiles is a list of files containing API @@ -181,9 +139,6 @@ class VPP(): loglevel, if supplied, is the log level this logger is set to report at (from the loglevels in the logging module). """ - global vpp_object - vpp_object = self - if logger is None: logger = logging.getLogger(__name__) if loglevel is not None: @@ -193,16 +148,19 @@ class VPP(): self.messages = {} self.id_names = [] self.id_msgdef = [] - self.connected = False self.header = VPPType('header', [['u16', 'msgid'], ['u32', 'client_index']]) self.apifiles = [] self.event_callback = None self.message_queue = queue.Queue() self.read_timeout = read_timeout - self.vpp_api = vpp_api self.async_thread = async_thread + if use_socket: + from . vpp_transport_socket import VppTransport + else: + from . vpp_transport_shmem import VppTransport + if not apifiles: # Pick up API definitions from default directory try: @@ -224,22 +182,11 @@ class VPP(): if len(self.messages) == 0 and not testmode: raise ValueError(1, 'Missing JSON message definitions') + self.transport = VppTransport(self, read_timeout=read_timeout, + server_address=server_address) # Make sure we allow VPP to clean up the message rings. atexit.register(vpp_atexit, weakref.ref(self)) - # Register error handler - if not testmode: - vpp_api.vac_set_error_handler(vac_error_handler) - - # Support legacy CFFI - # from_buffer supported from 1.8.0 - (major, minor, patch) = [int(s) for s in - cffi.__version__.split('.', 3)] - if major >= 1 and minor >= 8: - self._write = self._write_new_cffi - else: - self._write = self._write_legacy_cffi - class ContextId(object): """Thread-safe provider of unique context IDs.""" def __init__(self): @@ -377,11 +324,6 @@ class VPP(): return api_files - def status(self): - """Debug function: report current VPP API status to stdout.""" - print('Connected') if self.connected else print('Not Connected') - print('Read API definitions from', ', '.join(self.apifiles)) - @property def api(self): if not hasattr(self, "_api"): @@ -408,7 +350,7 @@ class VPP(): self._api = VppApiDynamicMethodHolder() for name, msg in vpp_iterator(self.messages): n = name + '_' + msg.crc[2:] - i = vpp_api.vac_get_msg_index(n.encode()) + i = self.transport.get_msg_index(n.encode()) if i > 0: self.id_msgdef[i] = msg self.id_names[i] = name @@ -420,43 +362,19 @@ class VPP(): self.logger.debug( 'No such message type or failed CRC checksum: %s', n) - def _write_new_cffi(self, buf): - """Send a binary-packed message to VPP.""" - if not self.connected: - raise IOError(1, 'Not connected') - return vpp_api.vac_write(ffi.from_buffer(buf), len(buf)) - - def _write_legacy_cffi(self, buf): - """Send a binary-packed message to VPP.""" - if not self.connected: - raise IOError(1, 'Not connected') - return vpp_api.vac_write(bytes(buf), len(buf)) - - def _read(self): - if not self.connected: - raise IOError(1, 'Not connected') - mem = ffi.new("char **") - size = ffi.new("int *") - rv = vpp_api.vac_read(mem, size, self.read_timeout) - if rv: - raise IOError(rv, 'vac_read failed') - msg = bytes(ffi.buffer(mem[0], size[0])) - vpp_api.vac_free(mem[0]) - return msg - def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async): - pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL - rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen) + pfx = chroot_prefix.encode() if chroot_prefix else None + + rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen) if rv != 0: raise IOError(2, 'Connect failed') - self.connected = True - self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index() + self.vpp_dictionary_maxid = self.transport.msg_table_max_index() self._register_functions(do_async=do_async) # Initialise control ping crc = self.messages['control_ping'].crc - self.control_ping_index = vpp_api.vac_get_msg_index( + self.control_ping_index = self.transport.get_msg_index( ('control_ping' + '_' + crc[2:]).encode()) self.control_ping_msgdef = self.messages['control_ping'] if self.async_thread: @@ -475,7 +393,7 @@ class VPP(): rx_qlen - the length of the VPP message receive queue between client and server. """ - msg_handler = vac_callback_sync if not do_async else vac_callback_async + msg_handler = self.transport.get_callback(do_async) return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen, do_async) @@ -488,13 +406,12 @@ class VPP(): client and server. """ - return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen, + return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False) def disconnect(self): """Detach from VPP.""" - rv = vpp_api.vac_disconnect() - self.connected = False + rv = self.transport.disconnect() self.message_queue.put("terminate event thread") return rv @@ -586,10 +503,16 @@ class VPP(): context = kwargs['context'] kwargs['_vl_msg_id'] = i + try: + if self.transport.socket_index: + kwargs['client_index'] = self.transport.socket_index + except AttributeError: + pass self.validate_args(msg, kwargs) b = msg.pack(kwargs) - vpp_api.vac_rx_suspend() - self._write(b) + self.transport.suspend() + + self.transport.write(b) if multipart: # Send a ping after the request - we use its response @@ -599,12 +522,13 @@ class VPP(): # Block until we get a reply. rl = [] while (True): - msg = self._read() + msg = self.transport.read() if not msg: raise IOError(2, 'VPP API client: read failed') r = self.decode_incoming_msg(msg) msgname = type(r).__name__ if context not in r or r.context == 0 or context != r.context: + # Message being queued self.message_queue.put_nowait(r) continue @@ -616,7 +540,7 @@ class VPP(): rl.append(r) - vpp_api.vac_rx_resume() + self.transport.resume() return rl @@ -634,11 +558,15 @@ class VPP(): kwargs['context'] = context else: context = kwargs['context'] - kwargs['client_index'] = 0 + try: + if self.transport.socket_index: + kwargs['client_index'] = self.transport.socket_index + except AttributeError: + kwargs['client_index'] = 0 kwargs['_vl_msg_id'] = i b = msg.pack(kwargs) - self._write(b) + self.transport.write(b) def register_event_callback(self, callback): """Register a callback for async messages. @@ -659,7 +587,7 @@ class VPP(): self.event_callback = callback def thread_msg_handler(self): - """Python thread calling the user registerd message handler. + """Python thread calling the user registered message handler. This is to emulate the old style event callback scheme. Modern clients should provide their own thread to poll the event diff --git a/src/vpp-api/python/vpp_papi/vpp_serializer.py b/src/vpp-api/python/vpp_papi/vpp_serializer.py index 2177cdbb2e4..103a078cd5b 100644 --- a/src/vpp-api/python/vpp_papi/vpp_serializer.py +++ b/src/vpp-api/python/vpp_papi/vpp_serializer.py @@ -80,7 +80,7 @@ class FixedList_u8(): if len(data[offset:]) < self.num: raise ValueError('Invalid array length for "{}" got {}' ' expected {}' - .format(self.name, len(data), self.num)) + .format(self.name, len(data[offset:]), self.num)) return self.packer.unpack(data, offset) diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py new file mode 100644 index 00000000000..a20295b0f09 --- /dev/null +++ b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py @@ -0,0 +1,117 @@ +# +# A transport class. With two implementations. +# One for socket and one for shared memory. +# + +from cffi import FFI +import cffi + +ffi = FFI() +ffi.cdef(""" +typedef void (*vac_callback_t)(unsigned char * data, int len); +typedef void (*vac_error_callback_t)(void *, unsigned char *, int); +int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb, + int rx_qlen); +int vac_disconnect(void); +int vac_read(char **data, int *l, unsigned short timeout); +int vac_write(char *data, int len); +void vac_free(void * msg); + +int vac_get_msg_index(unsigned char * name); +int vac_msg_table_size(void); +int vac_msg_table_max_index(void); + +void vac_rx_suspend (void); +void vac_rx_resume (void); +void vac_set_error_handler(vac_error_callback_t); + """) + +vpp_object = None + +# Barfs on failure, no need to check success. +vpp_api = ffi.dlopen('libvppapiclient.so') + + +@ffi.callback("void(unsigned char *, int)") +def vac_callback_sync(data, len): + vpp_object.msg_handler_sync(ffi.buffer(data, len)) + + +@ffi.callback("void(unsigned char *, int)") +def vac_callback_async(data, len): + vpp_object.msg_handler_async(ffi.buffer(data, len)) + + +@ffi.callback("void(void *, unsigned char *, int)") +def vac_error_handler(arg, msg, msg_len): + vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len)) + + +class VppTransport: + def __init__(self, parent, read_timeout, server_address): + self.connected = False + self.read_timeout = read_timeout + self.parent = parent + global vpp_object + vpp_object = parent + + # Register error handler + vpp_api.vac_set_error_handler(vac_error_handler) + + # Support legacy CFFI + # from_buffer supported from 1.8.0 + (major, minor, patch) = [int(s) for s in + cffi.__version__.split('.', 3)] + if major >= 1 and minor >= 8: + self.write = self._write_new_cffi + else: + self.write = self._write_legacy_cffi + + def connect(self, name, pfx, msg_handler, rx_qlen): + self.connected = True + if not pfx: + pfx = ffi.NULL + return vpp_api.vac_connect(name, pfx, msg_handler, rx_qlen) + + def disconnect(self): + self.connected = False + vpp_api.vac_disconnect() + + def suspend(self): + vpp_api.vac_rx_suspend() + + def resume(self): + vpp_api.vac_rx_resume() + + def get_callback(self, async): + return vac_callback_sync if not async else vac_callback_async + + def get_msg_index(self, name): + return vpp_api.vac_get_msg_index(name) + + def msg_table_max_index(self): + return vpp_api.vac_msg_table_max_index() + + def _write_new_cffi(self, buf): + """Send a binary-packed message to VPP.""" + if not self.connected: + raise IOError(1, 'Not connected') + return vpp_api.vac_write(ffi.from_buffer(buf), len(buf)) + + def _write_legacy_cffi(self, buf): + """Send a binary-packed message to VPP.""" + if not self.connected: + raise IOError(1, 'Not connected') + return vpp_api.vac_write(bytes(buf), len(buf)) + + def read(self): + if not self.connected: + raise IOError(1, 'Not connected') + mem = ffi.new("char **") + size = ffi.new("int *") + rv = vpp_api.vac_read(mem, size, self.read_timeout) + if rv: + raise IOError(rv, 'vac_read failed') + msg = bytes(ffi.buffer(mem[0], size[0])) + vpp_api.vac_free(mem[0]) + return msg diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py new file mode 100644 index 00000000000..1822deb6d07 --- /dev/null +++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py @@ -0,0 +1,176 @@ +# +# VPP Unix Domain Socket Transport. +# +import socket +import struct +import threading +import select +import multiprocessing +import logging + + +class VppTransport: + def __init__(self, parent, read_timeout, server_address): + self.connected = False + self.read_timeout = read_timeout if read_timeout > 0 else 1 + self.parent = parent + self.server_address = server_address + self.header = struct.Struct('>QII') + self.message_table = {} + self.sque = multiprocessing.Queue() + self.q = multiprocessing.Queue() + self.message_thread = threading.Thread(target=self.msg_thread_func) + + def msg_thread_func(self): + while True: + try: + rlist, _, _ = select.select([self.socket, + self.sque._reader], [], []) + except socket.error: + # Terminate thread + logging.error('select failed') + self.q.put(None) + return + + for r in rlist: + if r == self.sque._reader: + # Terminate + self.q.put(None) + return + + elif r == self.socket: + try: + msg = self._read() + if not msg: + self.q.put(None) + return + except socket.error: + self.q.put(None) + return + # Put either to local queue or if context == 0 + # callback queue + r = self.parent.decode_incoming_msg(msg) + if hasattr(r, 'context') and r.context > 0: + self.q.put(msg) + else: + self.parent.msg_handler_async(msg) + else: + raise IOError(2, 'Unknown response from select') + + def connect(self, name, pfx, msg_handler, rx_qlen): + + # Create a UDS socket + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + self.socket.settimeout(self.read_timeout) + + # Connect the socket to the port where the server is listening + try: + self.socket.connect(self.server_address) + except socket.error as msg: + logging.error(msg) + raise + + self.connected = True + + # Initialise sockclnt_create + sockclnt_create = self.parent.messages['sockclnt_create'] + sockclnt_create_reply = self.parent.messages['sockclnt_create_reply'] + + args = {'_vl_msg_id': 15, + 'name': name, + 'context': 124} + b = sockclnt_create.pack(args) + self.write(b) + msg = self._read() + hdr, length = self.parent.header.unpack(msg, 0) + if hdr.msgid != 16: + raise IOError('Invalid reply message') + + r, length = sockclnt_create_reply.unpack(msg) + self.socket_index = r.index + for m in r.message_table: + n = m.name.rstrip(b'\x00\x13') + self.message_table[n] = m.index + + self.message_thread.daemon = True + self.message_thread.start() + + return 0 + + def disconnect(self): + try: # Might fail, if VPP closes socket before packet makes it out + rv = self.parent.api.sockclnt_delete(index=self.socket_index) + except IOError: + pass + self.connected = False + self.socket.close() + self.sque.put(True) # Terminate listening thread + self.message_thread.join() + + def suspend(self): + pass + + def resume(self): + pass + + def callback(self): + raise NotImplemented + + def get_callback(self, async): + return self.callback + + def get_msg_index(self, name): + try: + return self.message_table[name] + except KeyError: + return 0 + + def msg_table_max_index(self): + return len(self.message_table) + + def write(self, buf): + """Send a binary-packed message to VPP.""" + if not self.connected: + raise IOError(1, 'Not connected') + + # Send header + header = self.header.pack(0, len(buf), 0) + n = self.socket.send(header) + n = self.socket.send(buf) + + def _read(self): + # Header and message + try: + msg = self.socket.recv(4096) + if len(msg) == 0: + return None + except socket.error as message: + logging.error(message) + raise + + (_, l, _) = self.header.unpack(msg[:16]) + + if l > len(msg): + buf = bytearray(l + 16) + view = memoryview(buf) + view[:4096] = msg + view = view[4096:] + # Read rest of message + remaining_bytes = l - 4096 + 16 + while remaining_bytes > 0: + bytes_to_read = (remaining_bytes if remaining_bytes + <= 4096 else 4096) + nbytes = self.socket.recv_into(view, bytes_to_read) + if nbytes == 0: + logging.error('recv failed') + break + view = view[nbytes:] + remaining_bytes -= nbytes + else: + buf = msg + return buf[16:] + + def read(self): + if not self.connected: + raise IOError(1, 'Not connected') + return self.q.get()