PAPI: Use UNIX domain sockets instead of shared memory 10/13910/9
authorOle Troan <ot@cisco.com>
Thu, 2 Aug 2018 09:58:12 +0000 (11:58 +0200)
committerFlorin Coras <florin.coras@gmail.com>
Tue, 2 Oct 2018 21:10:20 +0000 (21:10 +0000)
Adds support for running the API purely across Unix domain sockets.
Usage: vpp = VPP(use_socket=True)

Change-Id: Iafc1301e03dd3edc3f4d702dd6c0b98d3b50b69e
Signed-off-by: Ole Troan <ot@cisco.com>
src/vlibapi/api_common.h
src/vlibmemory/api.h
src/vlibmemory/memclnt.api
src/vlibmemory/socket_api.c
src/vlibmemory/socket_api.h
src/vlibmemory/vlib_api.c
src/vpp-api/client/client.c
src/vpp-api/python/vpp_papi/vpp_papi.py
src/vpp-api/python/vpp_papi/vpp_serializer.py
src/vpp-api/python/vpp_papi/vpp_transport_shmem.py [new file with mode: 0644]
src/vpp-api/python/vpp_papi/vpp_transport_socket.py [new file with mode: 0644]

index 9432082..497a1e8 100644 (file)
@@ -64,7 +64,7 @@ typedef struct vl_api_registration_
   u32 clib_file_index;         /**< Socket only: file index */
   i8 *unprocessed_input;       /**< Socket only: pending input */
   u32 unprocessed_msg_length;  /**< Socket only: unprocssed length */
-  u8 *output_vector;           /**< Socket only: output vecto  */
+  u8 *output_vector;           /**< Socket only: output vector */
   int *additional_fds_to_close;
 
   /* socket client only */
index 2146b16..d66c439 100644 (file)
@@ -55,10 +55,11 @@ vl_api_can_send_msg (vl_api_registration_t * rp)
 always_inline vl_api_registration_t *
 vl_api_client_index_to_registration (u32 index)
 {
-  if (PREDICT_FALSE (socket_main.current_rp != 0))
-    return socket_main.current_rp;
-
-  return (vl_mem_api_client_index_to_registration (index));
+  vl_api_registration_t *reg =
+    vl_socket_api_client_index_to_registration (index);
+  if (reg && reg->registration_type != REGISTRATION_TYPE_FREE)
+    return reg;
+  return vl_mem_api_client_index_to_registration (index);
 }
 
 always_inline u32
index f88e5bd..451bc0e 100644 (file)
@@ -17,7 +17,7 @@
 option version = "2.0.0";
 
 /*
- * Define services not following the normal convetions here
+ * Define services not following the normal conventions here
  */
 service {
   rpc memclnt_rx_thread_suspend returns null;
@@ -145,6 +145,12 @@ manual_print define trace_plugin_msg_ids
     u16 last_msg_id;
 };
 
+typedef message_table_entry
+{
+  u16 index;
+  u8 name[64];
+};
+
 /*
  * Create a socket client registration. 
  */
@@ -154,23 +160,26 @@ define sockclnt_create {
 };
 
 define sockclnt_create_reply {
+    u32 client_index;
     u32 context;                /* opaque value from the create request */
     i32 response;               /* Non-negative = success */
-    u64 handle;                /* handle by which vlib knows this client */
     u32 index;                  /* index, used e.g. by API trace replay */
+    u16 count;
+    vl_api_message_table_entry_t message_table[count];
 };
 
 /*
  * Delete a client registration 
  */
 define sockclnt_delete {
+    u32 client_index;
+    u32 context;
     u32 index;                  /* index, used e.g. by API trace replay */
-    u64 handle;                /* handle by which vlib knows this client */
 };
 
 define sockclnt_delete_reply {
+    u32 context;
     i32 response;               /* Non-negative = success */
-    u64 handle;                /* in case the client wonders */
 };
 
 /*
index afe02d2..4787069 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *------------------------------------------------------------------
- * socksvr_vlib.c
+ * socket_api.c
  *
  * Copyright (c) 2009 Cisco and/or its affiliates.
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -74,6 +74,20 @@ vl_sock_api_dump_clients (vlib_main_t * vm, api_main_t * am)
 /* *INDENT-ON* */
 }
 
+vl_api_registration_t *
+vl_socket_api_client_index_to_registration (u32 index)
+{
+  socket_main_t *sm = &socket_main;
+  if (pool_is_free_index (sm->registration_pool, ntohl (index)))
+    {
+#if DEBUG > 2
+      clib_warning ("Invalid index %d\n", ntohl (index));
+#endif
+      return 0;
+    }
+  return pool_elt_at_index (sm->registration_pool, ntohl (index));
+}
+
 void
 vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
 {
@@ -289,26 +303,37 @@ vl_socket_write_ready (clib_file_t * uf)
   rp = pool_elt_at_index (socket_main.registration_pool, uf->private_data);
 
   /* Flush output vector. */
-  n = write (uf->file_descriptor, rp->output_vector,
-            vec_len (rp->output_vector));
-  if (n < 0)
+  size_t total_bytes = vec_len (rp->output_vector);
+  size_t bytes_to_send, remaining_bytes = total_bytes;
+  void *p = rp->output_vector;
+  while (remaining_bytes > 0)
     {
+      bytes_to_send = remaining_bytes > 4096 ? 4096 : remaining_bytes;
+      n = write (uf->file_descriptor, p, bytes_to_send);
+      if (n < 0)
+       {
+         if (errno == EAGAIN)
+           {
+             break;
+           }
 #if DEBUG > 2
-      clib_warning ("write error, close the file...\n");
+         clib_warning ("write error, close the file...\n");
 #endif
-      clib_file_del (fm, uf);
-      vl_socket_free_registration_index (rp - socket_main.registration_pool);
-      return 0;
+         clib_file_del (fm, uf);
+         vl_socket_free_registration_index (rp -
+                                            socket_main.registration_pool);
+         return 0;
+       }
+      remaining_bytes -= bytes_to_send;
+      p += bytes_to_send;
     }
-  else if (n > 0)
+
+  vec_delete (rp->output_vector, total_bytes - remaining_bytes, 0);
+  if (vec_len (rp->output_vector) <= 0
+      && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE))
     {
-      vec_delete (rp->output_vector, n, 0);
-      if (vec_len (rp->output_vector) <= 0
-         && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE))
-       {
-         uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
-         fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY);
-       }
+      uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
+      fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY);
     }
 
   return 0;
@@ -379,7 +404,11 @@ vl_api_sockclnt_create_t_handler (vl_api_sockclnt_create_t * mp)
 {
   vl_api_registration_t *regp;
   vl_api_sockclnt_create_reply_t *rp;
+  api_main_t *am = &api_main;
+  hash_pair_t *hp;
   int rv = 0;
+  u32 nmsg = hash_elts (am->msg_index_by_name_and_crc);
+  u32 i = 0;
 
   regp = socket_main.current_rp;
 
@@ -387,13 +416,22 @@ vl_api_sockclnt_create_t_handler (vl_api_sockclnt_create_t * mp)
 
   regp->name = format (0, "%s%c", mp->name, 0);
 
-  rp = vl_msg_api_alloc (sizeof (*rp));
+  u32 size = sizeof (*rp) + (nmsg * sizeof (vl_api_message_table_entry_t));
+  rp = vl_msg_api_alloc (size);
   rp->_vl_msg_id = htons (VL_API_SOCKCLNT_CREATE_REPLY);
-  rp->handle = (uword) regp;
-  rp->index = (uword) regp->vl_api_registration_pool_index;
+  rp->index = htonl (regp->vl_api_registration_pool_index);
   rp->context = mp->context;
   rp->response = htonl (rv);
-
+  rp->count = htons (nmsg);
+
+  /* *INDENT-OFF* */
+  hash_foreach_pair (hp, am->msg_index_by_name_and_crc,
+  ({
+    rp->message_table[i].index = htons(hp->value[0]);
+    strncpy((char *)rp->message_table[i].name, (char *)hp->key, 64-1);
+    i++;
+  }));
+  /* *INDENT-ON* */
   vl_api_send_msg (regp, (u8 *) rp);
 }
 
@@ -406,23 +444,28 @@ vl_api_sockclnt_delete_t_handler (vl_api_sockclnt_delete_t * mp)
   vl_api_registration_t *regp;
   vl_api_sockclnt_delete_reply_t *rp;
 
-  if (!pool_is_free_index (socket_main.registration_pool, mp->index))
-    {
-      regp = pool_elt_at_index (socket_main.registration_pool, mp->index);
+  regp = vl_api_client_index_to_registration (mp->client_index);
+  if (!regp)
+    return;
 
-      rp = vl_msg_api_alloc (sizeof (*rp));
-      rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY);
-      rp->handle = mp->handle;
-      rp->response = htonl (1);
+  u32 reg_index = ntohl (mp->index);
+  rp = vl_msg_api_alloc (sizeof (*rp));
+  rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY);
+  rp->context = mp->context;
 
+  if (!pool_is_free_index (socket_main.registration_pool, reg_index))
+    {
+      rp->response = htonl (1);
       vl_api_send_msg (regp, (u8 *) rp);
 
       vl_api_registration_del_file (regp);
-      vl_socket_free_registration_index (mp->index);
+      vl_socket_free_registration_index (reg_index);
     }
   else
     {
-      clib_warning ("unknown client ID %d", mp->index);
+      clib_warning ("unknown client ID %d", reg_index);
+      rp->response = htonl (-1);
+      vl_api_send_msg (regp, (u8 *) rp);
     }
 }
 
index 1e99550..00985fe 100644 (file)
@@ -75,6 +75,8 @@ clib_error_t *vl_sock_api_send_fd_msg (int socket_fd, int fds[], int n_fds);
 clib_error_t *vl_sock_api_recv_fd_msg (int socket_fd, int fds[], int n_fds,
                                       u32 wait);
 
+vl_api_registration_t *vl_socket_api_client_index_to_registration (u32 index);
+
 #endif /* SRC_VLIBMEMORY_SOCKET_API_H_ */
 
 /*
index 65fa34b..15a0ba8 100644 (file)
@@ -165,8 +165,8 @@ vl_api_api_versions_t_handler (vl_api_api_versions_t * mp)
   vl_api_send_msg (reg, (u8 *) rmp);
 }
 
-#define foreach_vlib_api_msg                            \
-_(GET_FIRST_MSG_ID, get_first_msg_id)                   \
+#define foreach_vlib_api_msg                           \
+_(GET_FIRST_MSG_ID, get_first_msg_id)                  \
 _(API_VERSIONS, api_versions)
 
 /*
index 6c922db..68269bb 100644 (file)
@@ -507,7 +507,7 @@ typedef VL_API_PACKED(struct _vl_api_header {
   u32 client_index;
 }) vl_api_header_t;
 
-static unsigned int
+static u32
 vac_client_index (void)
 {
   return (api_main.my_client_index);
index 4f765ec..e1a7059 100644 (file)
@@ -26,8 +26,6 @@ import threading
 import fnmatch
 import weakref
 import atexit
-from cffi import FFI
-import cffi
 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
 from . vpp_serializer import VPPMessage
 
@@ -36,41 +34,15 @@ if sys.version[0] == '2':
 else:
     import queue as queue
 
-ffi = FFI()
-ffi.cdef("""
-typedef void (*vac_callback_t)(unsigned char * data, int len);
-typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
-int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
-    int rx_qlen);
-int vac_disconnect(void);
-int vac_read(char **data, int *l, unsigned short timeout);
-int vac_write(char *data, int len);
-void vac_free(void * msg);
-
-int vac_get_msg_index(unsigned char * name);
-int vac_msg_table_size(void);
-int vac_msg_table_max_index(void);
-
-void vac_rx_suspend (void);
-void vac_rx_resume (void);
-void vac_set_error_handler(vac_error_callback_t);
- """)
-
-# Barfs on failure, no need to check success.
-vpp_api = ffi.dlopen('libvppapiclient.so')
-
 
 def vpp_atexit(vpp_weakref):
     """Clean up VPP connection on shutdown."""
     vpp_instance = vpp_weakref()
-    if vpp_instance and vpp_instance.connected:
+    if vpp_instance and vpp_instance.transport.connected:
         vpp_instance.logger.debug('Cleaning up VPP on exit')
         vpp_instance.disconnect()
 
 
-vpp_object = None
-
-
 def vpp_iterator(d):
     if sys.version[0] == '2':
         return d.iteritems()
@@ -78,21 +50,6 @@ def vpp_iterator(d):
         return d.items()
 
 
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_sync(data, len):
-    vpp_object.msg_handler_sync(ffi.buffer(data, len))
-
-
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_async(data, len):
-    vpp_object.msg_handler_async(ffi.buffer(data, len))
-
-
-@ffi.callback("void(void *, unsigned char *, int)")
-def vac_error_handler(arg, msg, msg_len):
-    vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
-
-
 class VppApiDynamicMethodHolder(object):
     pass
 
@@ -168,7 +125,8 @@ class VPP():
 
     def __init__(self, apifiles=None, testmode=False, async_thread=True,
                  logger=logging.getLogger('vpp_papi'), loglevel='debug',
-                 read_timeout=0):
+                 read_timeout=5, use_socket=False,
+                 server_address='/run/vpp-api.sock'):
         """Create a VPP API object.
 
         apifiles is a list of files containing API
@@ -181,9 +139,6 @@ class VPP():
         loglevel, if supplied, is the log level this logger is set
         to report at (from the loglevels in the logging module).
         """
-        global vpp_object
-        vpp_object = self
-
         if logger is None:
             logger = logging.getLogger(__name__)
             if loglevel is not None:
@@ -193,16 +148,19 @@ class VPP():
         self.messages = {}
         self.id_names = []
         self.id_msgdef = []
-        self.connected = False
         self.header = VPPType('header', [['u16', 'msgid'],
                                          ['u32', 'client_index']])
         self.apifiles = []
         self.event_callback = None
         self.message_queue = queue.Queue()
         self.read_timeout = read_timeout
-        self.vpp_api = vpp_api
         self.async_thread = async_thread
 
+        if use_socket:
+            from . vpp_transport_socket import VppTransport
+        else:
+            from . vpp_transport_shmem import VppTransport
+
         if not apifiles:
             # Pick up API definitions from default directory
             try:
@@ -224,22 +182,11 @@ class VPP():
         if len(self.messages) == 0 and not testmode:
             raise ValueError(1, 'Missing JSON message definitions')
 
+        self.transport = VppTransport(self, read_timeout=read_timeout,
+                                      server_address=server_address)
         # Make sure we allow VPP to clean up the message rings.
         atexit.register(vpp_atexit, weakref.ref(self))
 
-        # Register error handler
-        if not testmode:
-            vpp_api.vac_set_error_handler(vac_error_handler)
-
-        # Support legacy CFFI
-        # from_buffer supported from 1.8.0
-        (major, minor, patch) = [int(s) for s in
-                                 cffi.__version__.split('.', 3)]
-        if major >= 1 and minor >= 8:
-            self._write = self._write_new_cffi
-        else:
-            self._write = self._write_legacy_cffi
-
     class ContextId(object):
         """Thread-safe provider of unique context IDs."""
         def __init__(self):
@@ -377,11 +324,6 @@ class VPP():
 
         return api_files
 
-    def status(self):
-        """Debug function: report current VPP API status to stdout."""
-        print('Connected') if self.connected else print('Not Connected')
-        print('Read API definitions from', ', '.join(self.apifiles))
-
     @property
     def api(self):
         if not hasattr(self, "_api"):
@@ -408,7 +350,7 @@ class VPP():
         self._api = VppApiDynamicMethodHolder()
         for name, msg in vpp_iterator(self.messages):
             n = name + '_' + msg.crc[2:]
-            i = vpp_api.vac_get_msg_index(n.encode())
+            i = self.transport.get_msg_index(n.encode())
             if i > 0:
                 self.id_msgdef[i] = msg
                 self.id_names[i] = name
@@ -420,43 +362,19 @@ class VPP():
                 self.logger.debug(
                     'No such message type or failed CRC checksum: %s', n)
 
-    def _write_new_cffi(self, buf):
-        """Send a binary-packed message to VPP."""
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
-
-    def _write_legacy_cffi(self, buf):
-        """Send a binary-packed message to VPP."""
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        return vpp_api.vac_write(bytes(buf), len(buf))
-
-    def _read(self):
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        mem = ffi.new("char **")
-        size = ffi.new("int *")
-        rv = vpp_api.vac_read(mem, size, self.read_timeout)
-        if rv:
-            raise IOError(rv, 'vac_read failed')
-        msg = bytes(ffi.buffer(mem[0], size[0]))
-        vpp_api.vac_free(mem[0])
-        return msg
-
     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
                          do_async):
-        pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
-        rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
+        pfx = chroot_prefix.encode() if chroot_prefix else None
+
+        rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
         if rv != 0:
             raise IOError(2, 'Connect failed')
-        self.connected = True
-        self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
+        self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
         self._register_functions(do_async=do_async)
 
         # Initialise control ping
         crc = self.messages['control_ping'].crc
-        self.control_ping_index = vpp_api.vac_get_msg_index(
+        self.control_ping_index = self.transport.get_msg_index(
             ('control_ping' + '_' + crc[2:]).encode())
         self.control_ping_msgdef = self.messages['control_ping']
         if self.async_thread:
@@ -475,7 +393,7 @@ class VPP():
         rx_qlen - the length of the VPP message receive queue between
         client and server.
         """
-        msg_handler = vac_callback_sync if not do_async else vac_callback_async
+        msg_handler = self.transport.get_callback(do_async)
         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
                                      do_async)
 
@@ -488,13 +406,12 @@ class VPP():
         client and server.
         """
 
-        return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
+        return self.connect_internal(name, None, chroot_prefix, rx_qlen,
                                      do_async=False)
 
     def disconnect(self):
         """Detach from VPP."""
-        rv = vpp_api.vac_disconnect()
-        self.connected = False
+        rv = self.transport.disconnect()
         self.message_queue.put("terminate event thread")
         return rv
 
@@ -586,10 +503,16 @@ class VPP():
             context = kwargs['context']
         kwargs['_vl_msg_id'] = i
 
+        try:
+            if self.transport.socket_index:
+                kwargs['client_index'] = self.transport.socket_index
+        except AttributeError:
+            pass
         self.validate_args(msg, kwargs)
         b = msg.pack(kwargs)
-        vpp_api.vac_rx_suspend()
-        self._write(b)
+        self.transport.suspend()
+
+        self.transport.write(b)
 
         if multipart:
             # Send a ping after the request - we use its response
@@ -599,12 +522,13 @@ class VPP():
         # Block until we get a reply.
         rl = []
         while (True):
-            msg = self._read()
+            msg = self.transport.read()
             if not msg:
                 raise IOError(2, 'VPP API client: read failed')
             r = self.decode_incoming_msg(msg)
             msgname = type(r).__name__
             if context not in r or r.context == 0 or context != r.context:
+                # Message being queued
                 self.message_queue.put_nowait(r)
                 continue
 
@@ -616,7 +540,7 @@ class VPP():
 
             rl.append(r)
 
-        vpp_api.vac_rx_resume()
+        self.transport.resume()
 
         return rl
 
@@ -634,11 +558,15 @@ class VPP():
             kwargs['context'] = context
         else:
             context = kwargs['context']
-        kwargs['client_index'] = 0
+        try:
+            if self.transport.socket_index:
+                kwargs['client_index'] = self.transport.socket_index
+        except AttributeError:
+            kwargs['client_index'] = 0
         kwargs['_vl_msg_id'] = i
         b = msg.pack(kwargs)
 
-        self._write(b)
+        self.transport.write(b)
 
     def register_event_callback(self, callback):
         """Register a callback for async messages.
@@ -659,7 +587,7 @@ class VPP():
         self.event_callback = callback
 
     def thread_msg_handler(self):
-        """Python thread calling the user registerd message handler.
+        """Python thread calling the user registered message handler.
 
         This is to emulate the old style event callback scheme. Modern
         clients should provide their own thread to poll the event
index 2177cdb..103a078 100644 (file)
@@ -80,7 +80,7 @@ class FixedList_u8():
         if len(data[offset:]) < self.num:
             raise ValueError('Invalid array length for "{}" got {}'
                              ' expected {}'
-                             .format(self.name, len(data), self.num))
+                             .format(self.name, len(data[offset:]), self.num))
         return self.packer.unpack(data, offset)
 
 
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py
new file mode 100644 (file)
index 0000000..a20295b
--- /dev/null
@@ -0,0 +1,117 @@
+#
+# A transport class. With two implementations.
+# One for socket and one for shared memory.
+#
+
+from cffi import FFI
+import cffi
+
+ffi = FFI()
+ffi.cdef("""
+typedef void (*vac_callback_t)(unsigned char * data, int len);
+typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
+int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
+    int rx_qlen);
+int vac_disconnect(void);
+int vac_read(char **data, int *l, unsigned short timeout);
+int vac_write(char *data, int len);
+void vac_free(void * msg);
+
+int vac_get_msg_index(unsigned char * name);
+int vac_msg_table_size(void);
+int vac_msg_table_max_index(void);
+
+void vac_rx_suspend (void);
+void vac_rx_resume (void);
+void vac_set_error_handler(vac_error_callback_t);
+ """)
+
+vpp_object = None
+
+# Barfs on failure, no need to check success.
+vpp_api = ffi.dlopen('libvppapiclient.so')
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_sync(data, len):
+    vpp_object.msg_handler_sync(ffi.buffer(data, len))
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_async(data, len):
+    vpp_object.msg_handler_async(ffi.buffer(data, len))
+
+
+@ffi.callback("void(void *, unsigned char *, int)")
+def vac_error_handler(arg, msg, msg_len):
+    vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
+
+
+class VppTransport:
+    def __init__(self, parent, read_timeout, server_address):
+        self.connected = False
+        self.read_timeout = read_timeout
+        self.parent = parent
+        global vpp_object
+        vpp_object = parent
+
+        # Register error handler
+        vpp_api.vac_set_error_handler(vac_error_handler)
+
+        # Support legacy CFFI
+        # from_buffer supported from 1.8.0
+        (major, minor, patch) = [int(s) for s in
+                                 cffi.__version__.split('.', 3)]
+        if major >= 1 and minor >= 8:
+            self.write = self._write_new_cffi
+        else:
+            self.write = self._write_legacy_cffi
+
+    def connect(self, name, pfx, msg_handler, rx_qlen):
+        self.connected = True
+        if not pfx:
+            pfx = ffi.NULL
+        return vpp_api.vac_connect(name, pfx, msg_handler, rx_qlen)
+
+    def disconnect(self):
+        self.connected = False
+        vpp_api.vac_disconnect()
+
+    def suspend(self):
+        vpp_api.vac_rx_suspend()
+
+    def resume(self):
+        vpp_api.vac_rx_resume()
+
+    def get_callback(self, async):
+        return vac_callback_sync if not async else vac_callback_async
+
+    def get_msg_index(self, name):
+        return vpp_api.vac_get_msg_index(name)
+
+    def msg_table_max_index(self):
+        return vpp_api.vac_msg_table_max_index()
+
+    def _write_new_cffi(self, buf):
+        """Send a binary-packed message to VPP."""
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
+
+    def _write_legacy_cffi(self, buf):
+        """Send a binary-packed message to VPP."""
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+        return vpp_api.vac_write(bytes(buf), len(buf))
+
+    def read(self):
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+        mem = ffi.new("char **")
+        size = ffi.new("int *")
+        rv = vpp_api.vac_read(mem, size, self.read_timeout)
+        if rv:
+            raise IOError(rv, 'vac_read failed')
+        msg = bytes(ffi.buffer(mem[0], size[0]))
+        vpp_api.vac_free(mem[0])
+        return msg
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
new file mode 100644 (file)
index 0000000..1822deb
--- /dev/null
@@ -0,0 +1,176 @@
+#
+# VPP Unix Domain Socket Transport.
+#
+import socket
+import struct
+import threading
+import select
+import multiprocessing
+import logging
+
+
+class VppTransport:
+    def __init__(self, parent, read_timeout, server_address):
+        self.connected = False
+        self.read_timeout = read_timeout if read_timeout > 0 else 1
+        self.parent = parent
+        self.server_address = server_address
+        self.header = struct.Struct('>QII')
+        self.message_table = {}
+        self.sque = multiprocessing.Queue()
+        self.q = multiprocessing.Queue()
+        self.message_thread = threading.Thread(target=self.msg_thread_func)
+
+    def msg_thread_func(self):
+        while True:
+            try:
+                rlist, _, _ = select.select([self.socket,
+                                             self.sque._reader], [], [])
+            except socket.error:
+                # Terminate thread
+                logging.error('select failed')
+                self.q.put(None)
+                return
+
+            for r in rlist:
+                if r == self.sque._reader:
+                    # Terminate
+                    self.q.put(None)
+                    return
+
+                elif r == self.socket:
+                    try:
+                        msg = self._read()
+                        if not msg:
+                            self.q.put(None)
+                            return
+                    except socket.error:
+                        self.q.put(None)
+                        return
+                    # Put either to local queue or if context == 0
+                    # callback queue
+                    r = self.parent.decode_incoming_msg(msg)
+                    if hasattr(r, 'context') and r.context > 0:
+                        self.q.put(msg)
+                    else:
+                        self.parent.msg_handler_async(msg)
+                else:
+                    raise IOError(2, 'Unknown response from select')
+
+    def connect(self, name, pfx, msg_handler, rx_qlen):
+
+        # Create a UDS socket
+        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        self.socket.settimeout(self.read_timeout)
+
+        # Connect the socket to the port where the server is listening
+        try:
+            self.socket.connect(self.server_address)
+        except socket.error as msg:
+            logging.error(msg)
+            raise
+
+        self.connected = True
+
+        # Initialise sockclnt_create
+        sockclnt_create = self.parent.messages['sockclnt_create']
+        sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
+
+        args = {'_vl_msg_id': 15,
+                'name': name,
+                'context': 124}
+        b = sockclnt_create.pack(args)
+        self.write(b)
+        msg = self._read()
+        hdr, length = self.parent.header.unpack(msg, 0)
+        if hdr.msgid != 16:
+            raise IOError('Invalid reply message')
+
+        r, length = sockclnt_create_reply.unpack(msg)
+        self.socket_index = r.index
+        for m in r.message_table:
+            n = m.name.rstrip(b'\x00\x13')
+            self.message_table[n] = m.index
+
+        self.message_thread.daemon = True
+        self.message_thread.start()
+
+        return 0
+
+    def disconnect(self):
+        try:  # Might fail, if VPP closes socket before packet makes it out
+            rv = self.parent.api.sockclnt_delete(index=self.socket_index)
+        except IOError:
+            pass
+        self.connected = False
+        self.socket.close()
+        self.sque.put(True)  # Terminate listening thread
+        self.message_thread.join()
+
+    def suspend(self):
+        pass
+
+    def resume(self):
+        pass
+
+    def callback(self):
+        raise NotImplemented
+
+    def get_callback(self, async):
+        return self.callback
+
+    def get_msg_index(self, name):
+        try:
+            return self.message_table[name]
+        except KeyError:
+            return 0
+
+    def msg_table_max_index(self):
+        return len(self.message_table)
+
+    def write(self, buf):
+        """Send a binary-packed message to VPP."""
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+
+        # Send header
+        header = self.header.pack(0, len(buf), 0)
+        n = self.socket.send(header)
+        n = self.socket.send(buf)
+
+    def _read(self):
+        # Header and message
+        try:
+            msg = self.socket.recv(4096)
+            if len(msg) == 0:
+                return None
+        except socket.error as message:
+            logging.error(message)
+            raise
+
+        (_, l, _) = self.header.unpack(msg[:16])
+
+        if l > len(msg):
+            buf = bytearray(l + 16)
+            view = memoryview(buf)
+            view[:4096] = msg
+            view = view[4096:]
+            # Read rest of message
+            remaining_bytes = l - 4096 + 16
+            while remaining_bytes > 0:
+                bytes_to_read = (remaining_bytes if remaining_bytes
+                                 <= 4096 else 4096)
+                nbytes = self.socket.recv_into(view, bytes_to_read)
+                if nbytes == 0:
+                    logging.error('recv failed')
+                    break
+                view = view[nbytes:]
+                remaining_bytes -= nbytes
+        else:
+            buf = msg
+        return buf[16:]
+
+    def read(self):
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+        return self.q.get()