PAPI: Use UNIX domain sockets instead of shared memory
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
index 4f765ec..e1a7059 100644 (file)
@@ -26,8 +26,6 @@ import threading
 import fnmatch
 import weakref
 import atexit
 import fnmatch
 import weakref
 import atexit
-from cffi import FFI
-import cffi
 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
 from . vpp_serializer import VPPMessage
 
 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
 from . vpp_serializer import VPPMessage
 
@@ -36,41 +34,15 @@ if sys.version[0] == '2':
 else:
     import queue as queue
 
 else:
     import queue as queue
 
-ffi = FFI()
-ffi.cdef("""
-typedef void (*vac_callback_t)(unsigned char * data, int len);
-typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
-int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
-    int rx_qlen);
-int vac_disconnect(void);
-int vac_read(char **data, int *l, unsigned short timeout);
-int vac_write(char *data, int len);
-void vac_free(void * msg);
-
-int vac_get_msg_index(unsigned char * name);
-int vac_msg_table_size(void);
-int vac_msg_table_max_index(void);
-
-void vac_rx_suspend (void);
-void vac_rx_resume (void);
-void vac_set_error_handler(vac_error_callback_t);
- """)
-
-# Barfs on failure, no need to check success.
-vpp_api = ffi.dlopen('libvppapiclient.so')
-
 
 def vpp_atexit(vpp_weakref):
     """Clean up VPP connection on shutdown."""
     vpp_instance = vpp_weakref()
 
 def vpp_atexit(vpp_weakref):
     """Clean up VPP connection on shutdown."""
     vpp_instance = vpp_weakref()
-    if vpp_instance and vpp_instance.connected:
+    if vpp_instance and vpp_instance.transport.connected:
         vpp_instance.logger.debug('Cleaning up VPP on exit')
         vpp_instance.disconnect()
 
 
         vpp_instance.logger.debug('Cleaning up VPP on exit')
         vpp_instance.disconnect()
 
 
-vpp_object = None
-
-
 def vpp_iterator(d):
     if sys.version[0] == '2':
         return d.iteritems()
 def vpp_iterator(d):
     if sys.version[0] == '2':
         return d.iteritems()
@@ -78,21 +50,6 @@ def vpp_iterator(d):
         return d.items()
 
 
         return d.items()
 
 
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_sync(data, len):
-    vpp_object.msg_handler_sync(ffi.buffer(data, len))
-
-
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_async(data, len):
-    vpp_object.msg_handler_async(ffi.buffer(data, len))
-
-
-@ffi.callback("void(void *, unsigned char *, int)")
-def vac_error_handler(arg, msg, msg_len):
-    vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
-
-
 class VppApiDynamicMethodHolder(object):
     pass
 
 class VppApiDynamicMethodHolder(object):
     pass
 
@@ -168,7 +125,8 @@ class VPP():
 
     def __init__(self, apifiles=None, testmode=False, async_thread=True,
                  logger=logging.getLogger('vpp_papi'), loglevel='debug',
 
     def __init__(self, apifiles=None, testmode=False, async_thread=True,
                  logger=logging.getLogger('vpp_papi'), loglevel='debug',
-                 read_timeout=0):
+                 read_timeout=5, use_socket=False,
+                 server_address='/run/vpp-api.sock'):
         """Create a VPP API object.
 
         apifiles is a list of files containing API
         """Create a VPP API object.
 
         apifiles is a list of files containing API
@@ -181,9 +139,6 @@ class VPP():
         loglevel, if supplied, is the log level this logger is set
         to report at (from the loglevels in the logging module).
         """
         loglevel, if supplied, is the log level this logger is set
         to report at (from the loglevels in the logging module).
         """
-        global vpp_object
-        vpp_object = self
-
         if logger is None:
             logger = logging.getLogger(__name__)
             if loglevel is not None:
         if logger is None:
             logger = logging.getLogger(__name__)
             if loglevel is not None:
@@ -193,16 +148,19 @@ class VPP():
         self.messages = {}
         self.id_names = []
         self.id_msgdef = []
         self.messages = {}
         self.id_names = []
         self.id_msgdef = []
-        self.connected = False
         self.header = VPPType('header', [['u16', 'msgid'],
                                          ['u32', 'client_index']])
         self.apifiles = []
         self.event_callback = None
         self.message_queue = queue.Queue()
         self.read_timeout = read_timeout
         self.header = VPPType('header', [['u16', 'msgid'],
                                          ['u32', 'client_index']])
         self.apifiles = []
         self.event_callback = None
         self.message_queue = queue.Queue()
         self.read_timeout = read_timeout
-        self.vpp_api = vpp_api
         self.async_thread = async_thread
 
         self.async_thread = async_thread
 
+        if use_socket:
+            from . vpp_transport_socket import VppTransport
+        else:
+            from . vpp_transport_shmem import VppTransport
+
         if not apifiles:
             # Pick up API definitions from default directory
             try:
         if not apifiles:
             # Pick up API definitions from default directory
             try:
@@ -224,22 +182,11 @@ class VPP():
         if len(self.messages) == 0 and not testmode:
             raise ValueError(1, 'Missing JSON message definitions')
 
         if len(self.messages) == 0 and not testmode:
             raise ValueError(1, 'Missing JSON message definitions')
 
+        self.transport = VppTransport(self, read_timeout=read_timeout,
+                                      server_address=server_address)
         # Make sure we allow VPP to clean up the message rings.
         atexit.register(vpp_atexit, weakref.ref(self))
 
         # Make sure we allow VPP to clean up the message rings.
         atexit.register(vpp_atexit, weakref.ref(self))
 
-        # Register error handler
-        if not testmode:
-            vpp_api.vac_set_error_handler(vac_error_handler)
-
-        # Support legacy CFFI
-        # from_buffer supported from 1.8.0
-        (major, minor, patch) = [int(s) for s in
-                                 cffi.__version__.split('.', 3)]
-        if major >= 1 and minor >= 8:
-            self._write = self._write_new_cffi
-        else:
-            self._write = self._write_legacy_cffi
-
     class ContextId(object):
         """Thread-safe provider of unique context IDs."""
         def __init__(self):
     class ContextId(object):
         """Thread-safe provider of unique context IDs."""
         def __init__(self):
@@ -377,11 +324,6 @@ class VPP():
 
         return api_files
 
 
         return api_files
 
-    def status(self):
-        """Debug function: report current VPP API status to stdout."""
-        print('Connected') if self.connected else print('Not Connected')
-        print('Read API definitions from', ', '.join(self.apifiles))
-
     @property
     def api(self):
         if not hasattr(self, "_api"):
     @property
     def api(self):
         if not hasattr(self, "_api"):
@@ -408,7 +350,7 @@ class VPP():
         self._api = VppApiDynamicMethodHolder()
         for name, msg in vpp_iterator(self.messages):
             n = name + '_' + msg.crc[2:]
         self._api = VppApiDynamicMethodHolder()
         for name, msg in vpp_iterator(self.messages):
             n = name + '_' + msg.crc[2:]
-            i = vpp_api.vac_get_msg_index(n.encode())
+            i = self.transport.get_msg_index(n.encode())
             if i > 0:
                 self.id_msgdef[i] = msg
                 self.id_names[i] = name
             if i > 0:
                 self.id_msgdef[i] = msg
                 self.id_names[i] = name
@@ -420,43 +362,19 @@ class VPP():
                 self.logger.debug(
                     'No such message type or failed CRC checksum: %s', n)
 
                 self.logger.debug(
                     'No such message type or failed CRC checksum: %s', n)
 
-    def _write_new_cffi(self, buf):
-        """Send a binary-packed message to VPP."""
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
-
-    def _write_legacy_cffi(self, buf):
-        """Send a binary-packed message to VPP."""
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        return vpp_api.vac_write(bytes(buf), len(buf))
-
-    def _read(self):
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        mem = ffi.new("char **")
-        size = ffi.new("int *")
-        rv = vpp_api.vac_read(mem, size, self.read_timeout)
-        if rv:
-            raise IOError(rv, 'vac_read failed')
-        msg = bytes(ffi.buffer(mem[0], size[0]))
-        vpp_api.vac_free(mem[0])
-        return msg
-
     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
                          do_async):
     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
                          do_async):
-        pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
-        rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
+        pfx = chroot_prefix.encode() if chroot_prefix else None
+
+        rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
         if rv != 0:
             raise IOError(2, 'Connect failed')
         if rv != 0:
             raise IOError(2, 'Connect failed')
-        self.connected = True
-        self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
+        self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
         self._register_functions(do_async=do_async)
 
         # Initialise control ping
         crc = self.messages['control_ping'].crc
         self._register_functions(do_async=do_async)
 
         # Initialise control ping
         crc = self.messages['control_ping'].crc
-        self.control_ping_index = vpp_api.vac_get_msg_index(
+        self.control_ping_index = self.transport.get_msg_index(
             ('control_ping' + '_' + crc[2:]).encode())
         self.control_ping_msgdef = self.messages['control_ping']
         if self.async_thread:
             ('control_ping' + '_' + crc[2:]).encode())
         self.control_ping_msgdef = self.messages['control_ping']
         if self.async_thread:
@@ -475,7 +393,7 @@ class VPP():
         rx_qlen - the length of the VPP message receive queue between
         client and server.
         """
         rx_qlen - the length of the VPP message receive queue between
         client and server.
         """
-        msg_handler = vac_callback_sync if not do_async else vac_callback_async
+        msg_handler = self.transport.get_callback(do_async)
         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
                                      do_async)
 
         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
                                      do_async)
 
@@ -488,13 +406,12 @@ class VPP():
         client and server.
         """
 
         client and server.
         """
 
-        return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
+        return self.connect_internal(name, None, chroot_prefix, rx_qlen,
                                      do_async=False)
 
     def disconnect(self):
         """Detach from VPP."""
                                      do_async=False)
 
     def disconnect(self):
         """Detach from VPP."""
-        rv = vpp_api.vac_disconnect()
-        self.connected = False
+        rv = self.transport.disconnect()
         self.message_queue.put("terminate event thread")
         return rv
 
         self.message_queue.put("terminate event thread")
         return rv
 
@@ -586,10 +503,16 @@ class VPP():
             context = kwargs['context']
         kwargs['_vl_msg_id'] = i
 
             context = kwargs['context']
         kwargs['_vl_msg_id'] = i
 
+        try:
+            if self.transport.socket_index:
+                kwargs['client_index'] = self.transport.socket_index
+        except AttributeError:
+            pass
         self.validate_args(msg, kwargs)
         b = msg.pack(kwargs)
         self.validate_args(msg, kwargs)
         b = msg.pack(kwargs)
-        vpp_api.vac_rx_suspend()
-        self._write(b)
+        self.transport.suspend()
+
+        self.transport.write(b)
 
         if multipart:
             # Send a ping after the request - we use its response
 
         if multipart:
             # Send a ping after the request - we use its response
@@ -599,12 +522,13 @@ class VPP():
         # Block until we get a reply.
         rl = []
         while (True):
         # Block until we get a reply.
         rl = []
         while (True):
-            msg = self._read()
+            msg = self.transport.read()
             if not msg:
                 raise IOError(2, 'VPP API client: read failed')
             r = self.decode_incoming_msg(msg)
             msgname = type(r).__name__
             if context not in r or r.context == 0 or context != r.context:
             if not msg:
                 raise IOError(2, 'VPP API client: read failed')
             r = self.decode_incoming_msg(msg)
             msgname = type(r).__name__
             if context not in r or r.context == 0 or context != r.context:
+                # Message being queued
                 self.message_queue.put_nowait(r)
                 continue
 
                 self.message_queue.put_nowait(r)
                 continue
 
@@ -616,7 +540,7 @@ class VPP():
 
             rl.append(r)
 
 
             rl.append(r)
 
-        vpp_api.vac_rx_resume()
+        self.transport.resume()
 
         return rl
 
 
         return rl
 
@@ -634,11 +558,15 @@ class VPP():
             kwargs['context'] = context
         else:
             context = kwargs['context']
             kwargs['context'] = context
         else:
             context = kwargs['context']
-        kwargs['client_index'] = 0
+        try:
+            if self.transport.socket_index:
+                kwargs['client_index'] = self.transport.socket_index
+        except AttributeError:
+            kwargs['client_index'] = 0
         kwargs['_vl_msg_id'] = i
         b = msg.pack(kwargs)
 
         kwargs['_vl_msg_id'] = i
         b = msg.pack(kwargs)
 
-        self._write(b)
+        self.transport.write(b)
 
     def register_event_callback(self, callback):
         """Register a callback for async messages.
 
     def register_event_callback(self, callback):
         """Register a callback for async messages.
@@ -659,7 +587,7 @@ class VPP():
         self.event_callback = callback
 
     def thread_msg_handler(self):
         self.event_callback = callback
 
     def thread_msg_handler(self):
-        """Python thread calling the user registerd message handler.
+        """Python thread calling the user registered message handler.
 
         This is to emulate the old style event callback scheme. Modern
         clients should provide their own thread to poll the event
 
         This is to emulate the old style event callback scheme. Modern
         clients should provide their own thread to poll the event