Adds support for running the API purely across Unix domain sockets.
Usage: vpp = VPP(use_socket=True)
Change-Id: Iafc1301e03dd3edc3f4d702dd6c0b98d3b50b69e
Signed-off-by: Ole Troan <ot@cisco.com>
u32 clib_file_index; /**< Socket only: file index */
i8 *unprocessed_input; /**< Socket only: pending input */
u32 unprocessed_msg_length; /**< Socket only: unprocssed length */
- u8 *output_vector; /**< Socket only: output vecto */
+ u8 *output_vector; /**< Socket only: output vector */
int *additional_fds_to_close;
/* socket client only */
always_inline vl_api_registration_t *
vl_api_client_index_to_registration (u32 index)
{
- if (PREDICT_FALSE (socket_main.current_rp != 0))
- return socket_main.current_rp;
-
- return (vl_mem_api_client_index_to_registration (index));
+ vl_api_registration_t *reg =
+ vl_socket_api_client_index_to_registration (index);
+ if (reg && reg->registration_type != REGISTRATION_TYPE_FREE)
+ return reg;
+ return vl_mem_api_client_index_to_registration (index);
}
always_inline u32
option version = "2.0.0";
/*
- * Define services not following the normal convetions here
+ * Define services not following the normal conventions here
*/
service {
rpc memclnt_rx_thread_suspend returns null;
u16 last_msg_id;
};
+typedef message_table_entry
+{
+ u16 index;
+ u8 name[64];
+};
+
/*
* Create a socket client registration.
*/
};
define sockclnt_create_reply {
+ u32 client_index;
u32 context; /* opaque value from the create request */
i32 response; /* Non-negative = success */
- u64 handle; /* handle by which vlib knows this client */
u32 index; /* index, used e.g. by API trace replay */
+ u16 count;
+ vl_api_message_table_entry_t message_table[count];
};
/*
* Delete a client registration
*/
define sockclnt_delete {
+ u32 client_index;
+ u32 context;
u32 index; /* index, used e.g. by API trace replay */
- u64 handle; /* handle by which vlib knows this client */
};
define sockclnt_delete_reply {
+ u32 context;
i32 response; /* Non-negative = success */
- u64 handle; /* in case the client wonders */
};
/*
/*
*------------------------------------------------------------------
- * socksvr_vlib.c
+ * socket_api.c
*
* Copyright (c) 2009 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
/* *INDENT-ON* */
}
+vl_api_registration_t *
+vl_socket_api_client_index_to_registration (u32 index)
+{
+ socket_main_t *sm = &socket_main;
+ if (pool_is_free_index (sm->registration_pool, ntohl (index)))
+ {
+#if DEBUG > 2
+ clib_warning ("Invalid index %d\n", ntohl (index));
+#endif
+ return 0;
+ }
+ return pool_elt_at_index (sm->registration_pool, ntohl (index));
+}
+
void
vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
{
rp = pool_elt_at_index (socket_main.registration_pool, uf->private_data);
/* Flush output vector. */
- n = write (uf->file_descriptor, rp->output_vector,
- vec_len (rp->output_vector));
- if (n < 0)
+ size_t total_bytes = vec_len (rp->output_vector);
+ size_t bytes_to_send, remaining_bytes = total_bytes;
+ void *p = rp->output_vector;
+ while (remaining_bytes > 0)
{
+ bytes_to_send = remaining_bytes > 4096 ? 4096 : remaining_bytes;
+ n = write (uf->file_descriptor, p, bytes_to_send);
+ if (n < 0)
+ {
+ if (errno == EAGAIN)
+ {
+ break;
+ }
#if DEBUG > 2
- clib_warning ("write error, close the file...\n");
+ clib_warning ("write error, close the file...\n");
#endif
- clib_file_del (fm, uf);
- vl_socket_free_registration_index (rp - socket_main.registration_pool);
- return 0;
+ clib_file_del (fm, uf);
+ vl_socket_free_registration_index (rp -
+ socket_main.registration_pool);
+ return 0;
+ }
+ remaining_bytes -= bytes_to_send;
+ p += bytes_to_send;
}
- else if (n > 0)
+
+ vec_delete (rp->output_vector, total_bytes - remaining_bytes, 0);
+ if (vec_len (rp->output_vector) <= 0
+ && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE))
{
- vec_delete (rp->output_vector, n, 0);
- if (vec_len (rp->output_vector) <= 0
- && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE))
- {
- uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
- fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY);
- }
+ uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
+ fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY);
}
return 0;
{
vl_api_registration_t *regp;
vl_api_sockclnt_create_reply_t *rp;
+ api_main_t *am = &api_main;
+ hash_pair_t *hp;
int rv = 0;
+ u32 nmsg = hash_elts (am->msg_index_by_name_and_crc);
+ u32 i = 0;
regp = socket_main.current_rp;
regp->name = format (0, "%s%c", mp->name, 0);
- rp = vl_msg_api_alloc (sizeof (*rp));
+ u32 size = sizeof (*rp) + (nmsg * sizeof (vl_api_message_table_entry_t));
+ rp = vl_msg_api_alloc (size);
rp->_vl_msg_id = htons (VL_API_SOCKCLNT_CREATE_REPLY);
- rp->handle = (uword) regp;
- rp->index = (uword) regp->vl_api_registration_pool_index;
+ rp->index = htonl (regp->vl_api_registration_pool_index);
rp->context = mp->context;
rp->response = htonl (rv);
-
+ rp->count = htons (nmsg);
+
+ /* *INDENT-OFF* */
+ hash_foreach_pair (hp, am->msg_index_by_name_and_crc,
+ ({
+ rp->message_table[i].index = htons(hp->value[0]);
+ strncpy((char *)rp->message_table[i].name, (char *)hp->key, 64-1);
+ i++;
+ }));
+ /* *INDENT-ON* */
vl_api_send_msg (regp, (u8 *) rp);
}
vl_api_registration_t *regp;
vl_api_sockclnt_delete_reply_t *rp;
- if (!pool_is_free_index (socket_main.registration_pool, mp->index))
- {
- regp = pool_elt_at_index (socket_main.registration_pool, mp->index);
+ regp = vl_api_client_index_to_registration (mp->client_index);
+ if (!regp)
+ return;
- rp = vl_msg_api_alloc (sizeof (*rp));
- rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY);
- rp->handle = mp->handle;
- rp->response = htonl (1);
+ u32 reg_index = ntohl (mp->index);
+ rp = vl_msg_api_alloc (sizeof (*rp));
+ rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY);
+ rp->context = mp->context;
+ if (!pool_is_free_index (socket_main.registration_pool, reg_index))
+ {
+ rp->response = htonl (1);
vl_api_send_msg (regp, (u8 *) rp);
vl_api_registration_del_file (regp);
- vl_socket_free_registration_index (mp->index);
+ vl_socket_free_registration_index (reg_index);
}
else
{
- clib_warning ("unknown client ID %d", mp->index);
+ clib_warning ("unknown client ID %d", reg_index);
+ rp->response = htonl (-1);
+ vl_api_send_msg (regp, (u8 *) rp);
}
}
clib_error_t *vl_sock_api_recv_fd_msg (int socket_fd, int fds[], int n_fds,
u32 wait);
+vl_api_registration_t *vl_socket_api_client_index_to_registration (u32 index);
+
#endif /* SRC_VLIBMEMORY_SOCKET_API_H_ */
/*
vl_api_send_msg (reg, (u8 *) rmp);
}
-#define foreach_vlib_api_msg \
-_(GET_FIRST_MSG_ID, get_first_msg_id) \
+#define foreach_vlib_api_msg \
+_(GET_FIRST_MSG_ID, get_first_msg_id) \
_(API_VERSIONS, api_versions)
/*
u32 client_index;
}) vl_api_header_t;
-static unsigned int
+static u32
vac_client_index (void)
{
return (api_main.my_client_index);
import fnmatch
import weakref
import atexit
-from cffi import FFI
-import cffi
from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
from . vpp_serializer import VPPMessage
else:
import queue as queue
-ffi = FFI()
-ffi.cdef("""
-typedef void (*vac_callback_t)(unsigned char * data, int len);
-typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
-int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
- int rx_qlen);
-int vac_disconnect(void);
-int vac_read(char **data, int *l, unsigned short timeout);
-int vac_write(char *data, int len);
-void vac_free(void * msg);
-
-int vac_get_msg_index(unsigned char * name);
-int vac_msg_table_size(void);
-int vac_msg_table_max_index(void);
-
-void vac_rx_suspend (void);
-void vac_rx_resume (void);
-void vac_set_error_handler(vac_error_callback_t);
- """)
-
-# Barfs on failure, no need to check success.
-vpp_api = ffi.dlopen('libvppapiclient.so')
-
def vpp_atexit(vpp_weakref):
"""Clean up VPP connection on shutdown."""
vpp_instance = vpp_weakref()
- if vpp_instance and vpp_instance.connected:
+ if vpp_instance and vpp_instance.transport.connected:
vpp_instance.logger.debug('Cleaning up VPP on exit')
vpp_instance.disconnect()
-vpp_object = None
-
-
def vpp_iterator(d):
if sys.version[0] == '2':
return d.iteritems()
return d.items()
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_sync(data, len):
- vpp_object.msg_handler_sync(ffi.buffer(data, len))
-
-
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_async(data, len):
- vpp_object.msg_handler_async(ffi.buffer(data, len))
-
-
-@ffi.callback("void(void *, unsigned char *, int)")
-def vac_error_handler(arg, msg, msg_len):
- vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
-
-
class VppApiDynamicMethodHolder(object):
pass
def __init__(self, apifiles=None, testmode=False, async_thread=True,
logger=logging.getLogger('vpp_papi'), loglevel='debug',
- read_timeout=0):
+ read_timeout=5, use_socket=False,
+ server_address='/run/vpp-api.sock'):
"""Create a VPP API object.
apifiles is a list of files containing API
loglevel, if supplied, is the log level this logger is set
to report at (from the loglevels in the logging module).
"""
- global vpp_object
- vpp_object = self
-
if logger is None:
logger = logging.getLogger(__name__)
if loglevel is not None:
self.messages = {}
self.id_names = []
self.id_msgdef = []
- self.connected = False
self.header = VPPType('header', [['u16', 'msgid'],
['u32', 'client_index']])
self.apifiles = []
self.event_callback = None
self.message_queue = queue.Queue()
self.read_timeout = read_timeout
- self.vpp_api = vpp_api
self.async_thread = async_thread
+ if use_socket:
+ from . vpp_transport_socket import VppTransport
+ else:
+ from . vpp_transport_shmem import VppTransport
+
if not apifiles:
# Pick up API definitions from default directory
try:
if len(self.messages) == 0 and not testmode:
raise ValueError(1, 'Missing JSON message definitions')
+ self.transport = VppTransport(self, read_timeout=read_timeout,
+ server_address=server_address)
# Make sure we allow VPP to clean up the message rings.
atexit.register(vpp_atexit, weakref.ref(self))
- # Register error handler
- if not testmode:
- vpp_api.vac_set_error_handler(vac_error_handler)
-
- # Support legacy CFFI
- # from_buffer supported from 1.8.0
- (major, minor, patch) = [int(s) for s in
- cffi.__version__.split('.', 3)]
- if major >= 1 and minor >= 8:
- self._write = self._write_new_cffi
- else:
- self._write = self._write_legacy_cffi
-
class ContextId(object):
"""Thread-safe provider of unique context IDs."""
def __init__(self):
return api_files
- def status(self):
- """Debug function: report current VPP API status to stdout."""
- print('Connected') if self.connected else print('Not Connected')
- print('Read API definitions from', ', '.join(self.apifiles))
-
@property
def api(self):
if not hasattr(self, "_api"):
self._api = VppApiDynamicMethodHolder()
for name, msg in vpp_iterator(self.messages):
n = name + '_' + msg.crc[2:]
- i = vpp_api.vac_get_msg_index(n.encode())
+ i = self.transport.get_msg_index(n.encode())
if i > 0:
self.id_msgdef[i] = msg
self.id_names[i] = name
self.logger.debug(
'No such message type or failed CRC checksum: %s', n)
- def _write_new_cffi(self, buf):
- """Send a binary-packed message to VPP."""
- if not self.connected:
- raise IOError(1, 'Not connected')
- return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
-
- def _write_legacy_cffi(self, buf):
- """Send a binary-packed message to VPP."""
- if not self.connected:
- raise IOError(1, 'Not connected')
- return vpp_api.vac_write(bytes(buf), len(buf))
-
- def _read(self):
- if not self.connected:
- raise IOError(1, 'Not connected')
- mem = ffi.new("char **")
- size = ffi.new("int *")
- rv = vpp_api.vac_read(mem, size, self.read_timeout)
- if rv:
- raise IOError(rv, 'vac_read failed')
- msg = bytes(ffi.buffer(mem[0], size[0]))
- vpp_api.vac_free(mem[0])
- return msg
-
def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
do_async):
- pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
- rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
+ pfx = chroot_prefix.encode() if chroot_prefix else None
+
+ rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
if rv != 0:
raise IOError(2, 'Connect failed')
- self.connected = True
- self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
+ self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
self._register_functions(do_async=do_async)
# Initialise control ping
crc = self.messages['control_ping'].crc
- self.control_ping_index = vpp_api.vac_get_msg_index(
+ self.control_ping_index = self.transport.get_msg_index(
('control_ping' + '_' + crc[2:]).encode())
self.control_ping_msgdef = self.messages['control_ping']
if self.async_thread:
rx_qlen - the length of the VPP message receive queue between
client and server.
"""
- msg_handler = vac_callback_sync if not do_async else vac_callback_async
+ msg_handler = self.transport.get_callback(do_async)
return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
do_async)
client and server.
"""
- return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
+ return self.connect_internal(name, None, chroot_prefix, rx_qlen,
do_async=False)
def disconnect(self):
"""Detach from VPP."""
- rv = vpp_api.vac_disconnect()
- self.connected = False
+ rv = self.transport.disconnect()
self.message_queue.put("terminate event thread")
return rv
context = kwargs['context']
kwargs['_vl_msg_id'] = i
+ try:
+ if self.transport.socket_index:
+ kwargs['client_index'] = self.transport.socket_index
+ except AttributeError:
+ pass
self.validate_args(msg, kwargs)
b = msg.pack(kwargs)
- vpp_api.vac_rx_suspend()
- self._write(b)
+ self.transport.suspend()
+
+ self.transport.write(b)
if multipart:
# Send a ping after the request - we use its response
# Block until we get a reply.
rl = []
while (True):
- msg = self._read()
+ msg = self.transport.read()
if not msg:
raise IOError(2, 'VPP API client: read failed')
r = self.decode_incoming_msg(msg)
msgname = type(r).__name__
if context not in r or r.context == 0 or context != r.context:
+ # Message being queued
self.message_queue.put_nowait(r)
continue
rl.append(r)
- vpp_api.vac_rx_resume()
+ self.transport.resume()
return rl
kwargs['context'] = context
else:
context = kwargs['context']
- kwargs['client_index'] = 0
+ try:
+ if self.transport.socket_index:
+ kwargs['client_index'] = self.transport.socket_index
+ except AttributeError:
+ kwargs['client_index'] = 0
kwargs['_vl_msg_id'] = i
b = msg.pack(kwargs)
- self._write(b)
+ self.transport.write(b)
def register_event_callback(self, callback):
"""Register a callback for async messages.
self.event_callback = callback
def thread_msg_handler(self):
- """Python thread calling the user registerd message handler.
+ """Python thread calling the user registered message handler.
This is to emulate the old style event callback scheme. Modern
clients should provide their own thread to poll the event
if len(data[offset:]) < self.num:
raise ValueError('Invalid array length for "{}" got {}'
' expected {}'
- .format(self.name, len(data), self.num))
+ .format(self.name, len(data[offset:]), self.num))
return self.packer.unpack(data, offset)
--- /dev/null
+#
+# A transport class. With two implementations.
+# One for socket and one for shared memory.
+#
+
+from cffi import FFI
+import cffi
+
+ffi = FFI()
+ffi.cdef("""
+typedef void (*vac_callback_t)(unsigned char * data, int len);
+typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
+int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
+ int rx_qlen);
+int vac_disconnect(void);
+int vac_read(char **data, int *l, unsigned short timeout);
+int vac_write(char *data, int len);
+void vac_free(void * msg);
+
+int vac_get_msg_index(unsigned char * name);
+int vac_msg_table_size(void);
+int vac_msg_table_max_index(void);
+
+void vac_rx_suspend (void);
+void vac_rx_resume (void);
+void vac_set_error_handler(vac_error_callback_t);
+ """)
+
+vpp_object = None
+
+# Barfs on failure, no need to check success.
+vpp_api = ffi.dlopen('libvppapiclient.so')
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_sync(data, len):
+ vpp_object.msg_handler_sync(ffi.buffer(data, len))
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_async(data, len):
+ vpp_object.msg_handler_async(ffi.buffer(data, len))
+
+
+@ffi.callback("void(void *, unsigned char *, int)")
+def vac_error_handler(arg, msg, msg_len):
+ vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
+
+
+class VppTransport:
+ def __init__(self, parent, read_timeout, server_address):
+ self.connected = False
+ self.read_timeout = read_timeout
+ self.parent = parent
+ global vpp_object
+ vpp_object = parent
+
+ # Register error handler
+ vpp_api.vac_set_error_handler(vac_error_handler)
+
+ # Support legacy CFFI
+ # from_buffer supported from 1.8.0
+ (major, minor, patch) = [int(s) for s in
+ cffi.__version__.split('.', 3)]
+ if major >= 1 and minor >= 8:
+ self.write = self._write_new_cffi
+ else:
+ self.write = self._write_legacy_cffi
+
+ def connect(self, name, pfx, msg_handler, rx_qlen):
+ self.connected = True
+ if not pfx:
+ pfx = ffi.NULL
+ return vpp_api.vac_connect(name, pfx, msg_handler, rx_qlen)
+
+ def disconnect(self):
+ self.connected = False
+ vpp_api.vac_disconnect()
+
+ def suspend(self):
+ vpp_api.vac_rx_suspend()
+
+ def resume(self):
+ vpp_api.vac_rx_resume()
+
+ def get_callback(self, async):
+ return vac_callback_sync if not async else vac_callback_async
+
+ def get_msg_index(self, name):
+ return vpp_api.vac_get_msg_index(name)
+
+ def msg_table_max_index(self):
+ return vpp_api.vac_msg_table_max_index()
+
+ def _write_new_cffi(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
+
+ def _write_legacy_cffi(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return vpp_api.vac_write(bytes(buf), len(buf))
+
+ def read(self):
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ mem = ffi.new("char **")
+ size = ffi.new("int *")
+ rv = vpp_api.vac_read(mem, size, self.read_timeout)
+ if rv:
+ raise IOError(rv, 'vac_read failed')
+ msg = bytes(ffi.buffer(mem[0], size[0]))
+ vpp_api.vac_free(mem[0])
+ return msg
--- /dev/null
+#
+# VPP Unix Domain Socket Transport.
+#
+import socket
+import struct
+import threading
+import select
+import multiprocessing
+import logging
+
+
+class VppTransport:
+ def __init__(self, parent, read_timeout, server_address):
+ self.connected = False
+ self.read_timeout = read_timeout if read_timeout > 0 else 1
+ self.parent = parent
+ self.server_address = server_address
+ self.header = struct.Struct('>QII')
+ self.message_table = {}
+ self.sque = multiprocessing.Queue()
+ self.q = multiprocessing.Queue()
+ self.message_thread = threading.Thread(target=self.msg_thread_func)
+
+ def msg_thread_func(self):
+ while True:
+ try:
+ rlist, _, _ = select.select([self.socket,
+ self.sque._reader], [], [])
+ except socket.error:
+ # Terminate thread
+ logging.error('select failed')
+ self.q.put(None)
+ return
+
+ for r in rlist:
+ if r == self.sque._reader:
+ # Terminate
+ self.q.put(None)
+ return
+
+ elif r == self.socket:
+ try:
+ msg = self._read()
+ if not msg:
+ self.q.put(None)
+ return
+ except socket.error:
+ self.q.put(None)
+ return
+ # Put either to local queue or if context == 0
+ # callback queue
+ r = self.parent.decode_incoming_msg(msg)
+ if hasattr(r, 'context') and r.context > 0:
+ self.q.put(msg)
+ else:
+ self.parent.msg_handler_async(msg)
+ else:
+ raise IOError(2, 'Unknown response from select')
+
+ def connect(self, name, pfx, msg_handler, rx_qlen):
+
+ # Create a UDS socket
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+ self.socket.settimeout(self.read_timeout)
+
+ # Connect the socket to the port where the server is listening
+ try:
+ self.socket.connect(self.server_address)
+ except socket.error as msg:
+ logging.error(msg)
+ raise
+
+ self.connected = True
+
+ # Initialise sockclnt_create
+ sockclnt_create = self.parent.messages['sockclnt_create']
+ sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
+
+ args = {'_vl_msg_id': 15,
+ 'name': name,
+ 'context': 124}
+ b = sockclnt_create.pack(args)
+ self.write(b)
+ msg = self._read()
+ hdr, length = self.parent.header.unpack(msg, 0)
+ if hdr.msgid != 16:
+ raise IOError('Invalid reply message')
+
+ r, length = sockclnt_create_reply.unpack(msg)
+ self.socket_index = r.index
+ for m in r.message_table:
+ n = m.name.rstrip(b'\x00\x13')
+ self.message_table[n] = m.index
+
+ self.message_thread.daemon = True
+ self.message_thread.start()
+
+ return 0
+
+ def disconnect(self):
+ try: # Might fail, if VPP closes socket before packet makes it out
+ rv = self.parent.api.sockclnt_delete(index=self.socket_index)
+ except IOError:
+ pass
+ self.connected = False
+ self.socket.close()
+ self.sque.put(True) # Terminate listening thread
+ self.message_thread.join()
+
+ def suspend(self):
+ pass
+
+ def resume(self):
+ pass
+
+ def callback(self):
+ raise NotImplemented
+
+ def get_callback(self, async):
+ return self.callback
+
+ def get_msg_index(self, name):
+ try:
+ return self.message_table[name]
+ except KeyError:
+ return 0
+
+ def msg_table_max_index(self):
+ return len(self.message_table)
+
+ def write(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+
+ # Send header
+ header = self.header.pack(0, len(buf), 0)
+ n = self.socket.send(header)
+ n = self.socket.send(buf)
+
+ def _read(self):
+ # Header and message
+ try:
+ msg = self.socket.recv(4096)
+ if len(msg) == 0:
+ return None
+ except socket.error as message:
+ logging.error(message)
+ raise
+
+ (_, l, _) = self.header.unpack(msg[:16])
+
+ if l > len(msg):
+ buf = bytearray(l + 16)
+ view = memoryview(buf)
+ view[:4096] = msg
+ view = view[4096:]
+ # Read rest of message
+ remaining_bytes = l - 4096 + 16
+ while remaining_bytes > 0:
+ bytes_to_read = (remaining_bytes if remaining_bytes
+ <= 4096 else 4096)
+ nbytes = self.socket.recv_into(view, bytes_to_read)
+ if nbytes == 0:
+ logging.error('recv failed')
+ break
+ view = view[nbytes:]
+ remaining_bytes -= nbytes
+ else:
+ buf = msg
+ return buf[16:]
+
+ def read(self):
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return self.q.get()