PAPI: Add MACAddress object wrapper for vl_api_mac_address_t
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
index 5ff8064..9c4ede9 100644 (file)
@@ -15,6 +15,7 @@
 #
 
 from __future__ import print_function
 #
 
 from __future__ import print_function
+from __future__ import absolute_import
 import sys
 import os
 import logging
 import sys
 import os
 import logging
@@ -25,73 +26,61 @@ import threading
 import fnmatch
 import weakref
 import atexit
 import fnmatch
 import weakref
 import atexit
-from cffi import FFI
-import cffi
-from vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
+from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
+from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
+from . macaddress import MACAddress, mac_pton, mac_ntop
+
+logger = logging.getLogger(__name__)
 
 if sys.version[0] == '2':
     import Queue as queue
 else:
     import queue as queue
 
 
 if sys.version[0] == '2':
     import Queue as queue
 else:
     import queue as queue
 
-ffi = FFI()
-ffi.cdef("""
-typedef void (*vac_callback_t)(unsigned char * data, int len);
-typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
-int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
-    int rx_qlen);
-int vac_disconnect(void);
-int vac_read(char **data, int *l, unsigned short timeout);
-int vac_write(char *data, int len);
-void vac_free(void * msg);
 
 
-int vac_get_msg_index(unsigned char * name);
-int vac_msg_table_size(void);
-int vac_msg_table_max_index(void);
+class VppEnumType(type):
+    def __getattr__(cls, name):
+        t = vpp_get_type(name)
+        return t.enum
 
 
-void vac_rx_suspend (void);
-void vac_rx_resume (void);
-void vac_set_error_handler(vac_error_callback_t);
- """)
 
 
-# Barfs on failure, no need to check success.
-vpp_api = ffi.dlopen('libvppapiclient.so')
+# Python3
+# class VppEnum(metaclass=VppEnumType):
+#    pass
+class VppEnum(object):
+    __metaclass__ = VppEnumType
 
 
 def vpp_atexit(vpp_weakref):
     """Clean up VPP connection on shutdown."""
     vpp_instance = vpp_weakref()
 
 
 def vpp_atexit(vpp_weakref):
     """Clean up VPP connection on shutdown."""
     vpp_instance = vpp_weakref()
-    if vpp_instance.connected:
+    if vpp_instance and vpp_instance.transport.connected:
         vpp_instance.logger.debug('Cleaning up VPP on exit')
         vpp_instance.disconnect()
 
 
         vpp_instance.logger.debug('Cleaning up VPP on exit')
         vpp_instance.disconnect()
 
 
-vpp_object = None
-
-
-def vpp_iterator(d):
-    if sys.version[0] == '2':
+if sys.version[0] == '2':
+    def vpp_iterator(d):
         return d.iteritems()
         return d.iteritems()
-    else:
+else:
+    def vpp_iterator(d):
         return d.items()
 
 
         return d.items()
 
 
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_sync(data, len):
-    vpp_object.msg_handler_sync(ffi.buffer(data, len))
-
+def call_logger(msgdef, kwargs):
+    s = 'Calling {}('.format(msgdef.name)
+    for k, v in kwargs.items():
+        s += '{}:{} '.format(k, v)
+    s += ')'
+    return s
 
 
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_async(data, len):
-    vpp_object.msg_handler_async(ffi.buffer(data, len))
 
 
+def return_logger(r):
+    s = 'Return from {}'.format(r)
+    return s
 
 
-@ffi.callback("void(void *, unsigned char *, int)")
-def vac_error_handler(arg, msg, msg_len):
-    vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
 
 
-
-class Empty(object):
+class VppApiDynamicMethodHolder(object):
     pass
 
 
     pass
 
 
@@ -104,10 +93,27 @@ class FuncWrapper(object):
         return self._func(**kwargs)
 
 
         return self._func(**kwargs)
 
 
-class VPPMessage(VPPType):
+class VPPApiError(Exception):
+    pass
+
+
+class VPPNotImplementedError(NotImplementedError):
+    pass
+
+
+class VPPIOError(IOError):
+    pass
+
+
+class VPPRuntimeError(RuntimeError):
+    pass
+
+
+class VPPValueError(ValueError):
     pass
 
     pass
 
-class VPP():
+
+class VPP(object):
     """VPP interface.
 
     This class provides the APIs to VPP.  The APIs are loaded
     """VPP interface.
 
     This class provides the APIs to VPP.  The APIs are loaded
@@ -119,6 +125,11 @@ class VPP():
     provides a means to register a callback function to receive
     these messages in a background thread.
     """
     provides a means to register a callback function to receive
     these messages in a background thread.
     """
+    VPPApiError = VPPApiError
+    VPPRuntimeError = VPPRuntimeError
+    VPPValueError = VPPValueError
+    VPPNotImplementedError = VPPNotImplementedError
+    VPPIOError = VPPIOError
 
     def process_json_file(self, apidef_file):
         api = json.load(apidef_file)
 
     def process_json_file(self, apidef_file):
         api = json.load(apidef_file)
@@ -132,44 +143,54 @@ class VPP():
         for t in api['types']:
             t[0] = 'vl_api_' + t[0] + '_t'
             types[t[0]] = {'type': 'type', 'data': t}
         for t in api['types']:
             t[0] = 'vl_api_' + t[0] + '_t'
             types[t[0]] = {'type': 'type', 'data': t}
+        for t, v in api['aliases'].items():
+            types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
+        self.services.update(api['services'])
 
         i = 0
         while True:
             unresolved = {}
             for k, v in types.items():
                 t = v['data']
 
         i = 0
         while True:
             unresolved = {}
             for k, v in types.items():
                 t = v['data']
-                if v['type'] == 'enum':
-                    try:
-                        VPPEnumType(t[0], t[1:])
-                    except ValueError:
-                        unresolved[k] = v
-                elif v['type'] == 'union':
-                    try:
-                        VPPUnionType(t[0], t[1:])
-                    except ValueError:
-                        unresolved[k] = v
-                elif v['type'] == 'type':
-                    try:
-                        VPPType(t[0], t[1:])
-                    except ValueError:
-                        unresolved[k] = v
+                if not vpp_get_type(k):
+                    if v['type'] == 'enum':
+                        try:
+                            VPPEnumType(t[0], t[1:])
+                        except ValueError:
+                            unresolved[k] = v
+                    elif v['type'] == 'union':
+                        try:
+                            VPPUnionType(t[0], t[1:])
+                        except ValueError:
+                            unresolved[k] = v
+                    elif v['type'] == 'type':
+                        try:
+                            VPPType(t[0], t[1:])
+                        except ValueError:
+                            unresolved[k] = v
+                    elif v['type'] == 'alias':
+                        try:
+                            VPPTypeAlias(k, t)
+                        except ValueError:
+                            unresolved[k] = v
             if len(unresolved) == 0:
                 break
             if i > 3:
             if len(unresolved) == 0:
                 break
             if i > 3:
-                raise ValueError('Unresolved type definitions {}'
-                                 .format(unresolved))
+                raise VPPValueError('Unresolved type definitions {}'
+                                    .format(unresolved))
             types = unresolved
             i += 1
 
         for m in api['messages']:
             try:
                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
             types = unresolved
             i += 1
 
         for m in api['messages']:
             try:
                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
-            except NotImplementedError:
+            except VPPNotImplementedError:
                 self.logger.error('Not implemented error for {}'.format(m[0]))
 
     def __init__(self, apifiles=None, testmode=False, async_thread=True,
                 self.logger.error('Not implemented error for {}'.format(m[0]))
 
     def __init__(self, apifiles=None, testmode=False, async_thread=True,
-                 logger=logging.getLogger('vpp_papi'), loglevel='debug',
-                 read_timeout=0):
+                 logger=None, loglevel=None,
+                 read_timeout=5, use_socket=False,
+                 server_address='/run/vpp-api.sock'):
         """Create a VPP API object.
 
         apifiles is a list of files containing API
         """Create a VPP API object.
 
         apifiles is a list of files containing API
@@ -182,9 +203,6 @@ class VPP():
         loglevel, if supplied, is the log level this logger is set
         to report at (from the loglevels in the logging module).
         """
         loglevel, if supplied, is the log level this logger is set
         to report at (from the loglevels in the logging module).
         """
-        global vpp_object
-        vpp_object = self
-
         if logger is None:
             logger = logging.getLogger(__name__)
             if loglevel is not None:
         if logger is None:
             logger = logging.getLogger(__name__)
             if loglevel is not None:
@@ -192,18 +210,22 @@ class VPP():
         self.logger = logger
 
         self.messages = {}
         self.logger = logger
 
         self.messages = {}
+        self.services = {}
         self.id_names = []
         self.id_msgdef = []
         self.id_names = []
         self.id_msgdef = []
-        self.connected = False
         self.header = VPPType('header', [['u16', 'msgid'],
                                          ['u32', 'client_index']])
         self.apifiles = []
         self.event_callback = None
         self.message_queue = queue.Queue()
         self.read_timeout = read_timeout
         self.header = VPPType('header', [['u16', 'msgid'],
                                          ['u32', 'client_index']])
         self.apifiles = []
         self.event_callback = None
         self.message_queue = queue.Queue()
         self.read_timeout = read_timeout
-        self.vpp_api = vpp_api
         self.async_thread = async_thread
 
         self.async_thread = async_thread
 
+        if use_socket:
+            from . vpp_transport_socket import VppTransport
+        else:
+            from . vpp_transport_shmem import VppTransport
+
         if not apifiles:
             # Pick up API definitions from default directory
             try:
         if not apifiles:
             # Pick up API definitions from default directory
             try:
@@ -213,7 +235,7 @@ class VPP():
                 if testmode:
                     apifiles = []
                 else:
                 if testmode:
                     apifiles = []
                 else:
-                    raise
+                    raise VPPRuntimeError
 
         for file in apifiles:
             with open(file) as apidef_file:
 
         for file in apifiles:
             with open(file) as apidef_file:
@@ -223,24 +245,13 @@ class VPP():
 
         # Basic sanity check
         if len(self.messages) == 0 and not testmode:
 
         # Basic sanity check
         if len(self.messages) == 0 and not testmode:
-            raise ValueError(1, 'Missing JSON message definitions')
+            raise VPPValueError(1, 'Missing JSON message definitions')
 
 
+        self.transport = VppTransport(self, read_timeout=read_timeout,
+                                      server_address=server_address)
         # Make sure we allow VPP to clean up the message rings.
         atexit.register(vpp_atexit, weakref.ref(self))
 
         # Make sure we allow VPP to clean up the message rings.
         atexit.register(vpp_atexit, weakref.ref(self))
 
-        # Register error handler
-        if not testmode:
-            vpp_api.vac_set_error_handler(vac_error_handler)
-
-        # Support legacy CFFI
-        # from_buffer supported from 1.8.0
-        (major, minor, patch) = [int(s) for s in
-                                 cffi.__version__.split('.', 3)]
-        if major >= 1 and minor >= 8:
-            self._write = self._write_new_cffi
-        else:
-            self._write = self._write_legacy_cffi
-
     class ContextId(object):
         """Thread-safe provider of unique context IDs."""
         def __init__(self):
     class ContextId(object):
         """Thread-safe provider of unique context IDs."""
         def __init__(self):
@@ -254,6 +265,9 @@ class VPP():
                 return self.context
     get_context = ContextId()
 
                 return self.context
     get_context = ContextId()
 
+    def get_type(self, name):
+        return vpp_get_type(name)
+
     @classmethod
     def find_api_dir(cls):
         """Attempt to find the best directory in which API definition
     @classmethod
     def find_api_dir(cls):
         """Attempt to find the best directory in which API definition
@@ -362,7 +376,7 @@ class VPP():
         if api_dir is None:
             api_dir = cls.find_api_dir()
             if api_dir is None:
         if api_dir is None:
             api_dir = cls.find_api_dir()
             if api_dir is None:
-                raise RuntimeError("api_dir cannot be located")
+                raise VPPApiError("api_dir cannot be located")
 
         if isinstance(patterns, list) or isinstance(patterns, tuple):
             patterns = [p.strip() + '.api.json' for p in patterns]
 
         if isinstance(patterns, list) or isinstance(patterns, tuple):
             patterns = [p.strip() + '.api.json' for p in patterns]
@@ -378,19 +392,14 @@ class VPP():
 
         return api_files
 
 
         return api_files
 
-    def status(self):
-        """Debug function: report current VPP API status to stdout."""
-        print('Connected') if self.connected else print('Not Connected')
-        print('Read API definitions from', ', '.join(self.apifiles))
-
     @property
     def api(self):
         if not hasattr(self, "_api"):
     @property
     def api(self):
         if not hasattr(self, "_api"):
-            raise Exception("Not connected, api definitions not available")
+            raise VPPApiError("Not connected, api definitions not available")
         return self._api
 
         return self._api
 
-    def make_function(self, msg, i, multipart, async):
-        if (async):
+    def make_function(self, msg, i, multipart, do_async):
+        if (do_async):
             def f(**kwargs):
                 return self._call_vpp_async(i, msg, **kwargs)
         else:
             def f(**kwargs):
                 return self._call_vpp_async(i, msg, **kwargs)
         else:
@@ -399,64 +408,47 @@ class VPP():
 
         f.__name__ = str(msg.name)
         f.__doc__ = ", ".join(["%s %s" %
 
         f.__name__ = str(msg.name)
         f.__doc__ = ", ".join(["%s %s" %
-                               (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)])
+                               (msg.fieldtypes[j], k)
+                               for j, k in enumerate(msg.fields)])
         return f
 
         return f
 
-    def _register_functions(self, async=False):
+    def _register_functions(self, do_async=False):
         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
-        self._api = Empty()
+        self._api = VppApiDynamicMethodHolder()
         for name, msg in vpp_iterator(self.messages):
             n = name + '_' + msg.crc[2:]
         for name, msg in vpp_iterator(self.messages):
             n = name + '_' + msg.crc[2:]
-            i = vpp_api.vac_get_msg_index(n.encode())
+            i = self.transport.get_msg_index(n.encode())
             if i > 0:
                 self.id_msgdef[i] = msg
                 self.id_names[i] = name
             if i > 0:
                 self.id_msgdef[i] = msg
                 self.id_names[i] = name
-                # TODO: Fix multipart (use services)
-                multipart = True if name.find('_dump') > 0 else False
-                f = self.make_function(msg, i, multipart, async)
-                setattr(self._api, name, FuncWrapper(f))
+
+                # Create function for client side messages.
+                if name in self.services:
+                    if 'stream' in self.services[name] and \
+                       self.services[name]['stream']:
+                        multipart = True
+                    else:
+                        multipart = False
+                    f = self.make_function(msg, i, multipart, do_async)
+                    setattr(self._api, name, FuncWrapper(f))
             else:
                 self.logger.debug(
                     'No such message type or failed CRC checksum: %s', n)
 
             else:
                 self.logger.debug(
                     'No such message type or failed CRC checksum: %s', n)
 
-    def _write_new_cffi(self, buf):
-        """Send a binary-packed message to VPP."""
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
-
-    def _write_legacy_cffi(self, buf):
-        """Send a binary-packed message to VPP."""
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        return vpp_api.vac_write(bytes(buf), len(buf))
-
-    def _read(self):
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        mem = ffi.new("char **")
-        size = ffi.new("int *")
-        rv = vpp_api.vac_read(mem, size, self.read_timeout)
-        if rv:
-            raise IOError(rv, 'vac_read failed')
-        msg = bytes(ffi.buffer(mem[0], size[0]))
-        vpp_api.vac_free(mem[0])
-        return msg
-
     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
-                         async):
-        pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
-        rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
+                         do_async):
+        pfx = chroot_prefix.encode() if chroot_prefix else None
+
+        rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
         if rv != 0:
         if rv != 0:
-            raise IOError(2, 'Connect failed')
-        self.connected = True
-        self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
-        self._register_functions(async=async)
+            raise VPPIOError(2, 'Connect failed')
+        self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
+        self._register_functions(do_async=do_async)
 
         # Initialise control ping
         crc = self.messages['control_ping'].crc
 
         # Initialise control ping
         crc = self.messages['control_ping'].crc
-        self.control_ping_index = vpp_api.vac_get_msg_index(
+        self.control_ping_index = self.transport.get_msg_index(
             ('control_ping' + '_' + crc[2:]).encode())
         self.control_ping_msgdef = self.messages['control_ping']
         if self.async_thread:
             ('control_ping' + '_' + crc[2:]).encode())
         self.control_ping_msgdef = self.messages['control_ping']
         if self.async_thread:
@@ -466,18 +458,18 @@ class VPP():
             self.event_thread.start()
         return rv
 
             self.event_thread.start()
         return rv
 
-    def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
+    def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
         """Attach to VPP.
 
         name - the name of the client.
         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
         """Attach to VPP.
 
         name - the name of the client.
         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
-        async - if true, messages are sent without waiting for a reply
+        do_async - if true, messages are sent without waiting for a reply
         rx_qlen - the length of the VPP message receive queue between
         client and server.
         """
         rx_qlen - the length of the VPP message receive queue between
         client and server.
         """
-        msg_handler = vac_callback_sync if not async else vac_callback_async
+        msg_handler = self.transport.get_callback(do_async)
         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
-                                     async)
+                                     do_async)
 
     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
         """Attach to VPP in synchronous mode. Application must poll for events.
 
     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
         """Attach to VPP in synchronous mode. Application must poll for events.
@@ -488,13 +480,12 @@ class VPP():
         client and server.
         """
 
         client and server.
         """
 
-        return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
-                                     async=False)
+        return self.connect_internal(name, None, chroot_prefix, rx_qlen,
+                                     do_async=False)
 
     def disconnect(self):
         """Detach from VPP."""
 
     def disconnect(self):
         """Detach from VPP."""
-        rv = vpp_api.vac_disconnect()
-        self.connected = False
+        rv = self.transport.disconnect()
         self.message_queue.put("terminate event thread")
         return rv
 
         self.message_queue.put("terminate event thread")
         return rv
 
@@ -517,14 +508,34 @@ class VPP():
             # No context -> async notification that we feed to the callback
             self.message_queue.put_nowait(r)
         else:
             # No context -> async notification that we feed to the callback
             self.message_queue.put_nowait(r)
         else:
-            raise IOError(2, 'RPC reply message received in event handler')
+            raise VPPIOError(2, 'RPC reply message received in event handler')
+
+    def has_context(self, msg):
+        if len(msg) < 10:
+            return False
+
+        header = VPPType('header_with_context', [['u16', 'msgid'],
+                                                 ['u32', 'client_index'],
+                                                 ['u32', 'context']])
+
+        (i, ci, context), size = header.unpack(msg, 0)
+        if self.id_names[i] == 'rx_thread_exit':
+            return
 
 
-    def decode_incoming_msg(self, msg):
+        #
+        # Decode message and returns a tuple.
+        #
+        msgobj = self.id_msgdef[i]
+        if 'context' in msgobj.field_by_name and context >= 0:
+            return True
+        return False
+
+    def decode_incoming_msg(self, msg, no_type_conversion=False):
         if not msg:
             self.logger.warning('vpp_api.read failed')
             return
 
         if not msg:
             self.logger.warning('vpp_api.read failed')
             return
 
-        i, ci = self.header.unpack(msg, 0)
+        (i, ci), size = self.header.unpack(msg, 0)
         if self.id_names[i] == 'rx_thread_exit':
             return
 
         if self.id_names[i] == 'rx_thread_exit':
             return
 
@@ -533,10 +544,9 @@ class VPP():
         #
         msgobj = self.id_msgdef[i]
         if not msgobj:
         #
         msgobj = self.id_msgdef[i]
         if not msgobj:
-            raise IOError(2, 'Reply message undefined')
-
-        r = msgobj.unpack(msg)
+            raise VPPIOError(2, 'Reply message undefined')
 
 
+        r, size = msgobj.unpack(msg, ntc=no_type_conversion)
         return r
 
     def msg_handler_async(self, msg):
         return r
 
     def msg_handler_async(self, msg):
@@ -562,9 +572,10 @@ class VPP():
     def validate_args(self, msg, kwargs):
         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
         if d:
     def validate_args(self, msg, kwargs):
         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
         if d:
-            raise ValueError('Invalid argument {} to {}'.format(list(d), msg.name))
+            raise VPPValueError('Invalid argument {} to {}'
+                                .format(list(d), msg.name))
 
 
-    def _call_vpp(self, i, msg, multipart, **kwargs):
+    def _call_vpp(self, i, msgdef, multipart, **kwargs):
         """Given a message, send the message and await a reply.
 
         msgdef - the message packing definition
         """Given a message, send the message and await a reply.
 
         msgdef - the message packing definition
@@ -587,10 +598,21 @@ class VPP():
             context = kwargs['context']
         kwargs['_vl_msg_id'] = i
 
             context = kwargs['context']
         kwargs['_vl_msg_id'] = i
 
-        self.validate_args(msg, kwargs)
-        b = msg.pack(kwargs)
-        vpp_api.vac_rx_suspend()
-        self._write(b)
+        no_type_conversion = kwargs.pop('_no_type_conversion', False)
+
+        try:
+            if self.transport.socket_index:
+                kwargs['client_index'] = self.transport.socket_index
+        except AttributeError:
+            pass
+        self.validate_args(msgdef, kwargs)
+
+        logging.debug(call_logger(msgdef, kwargs))
+
+        b = msgdef.pack(kwargs)
+        self.transport.suspend()
+
+        self.transport.write(b)
 
         if multipart:
             # Send a ping after the request - we use its response
 
         if multipart:
             # Send a ping after the request - we use its response
@@ -600,12 +622,13 @@ class VPP():
         # Block until we get a reply.
         rl = []
         while (True):
         # Block until we get a reply.
         rl = []
         while (True):
-            msg = self._read()
+            msg = self.transport.read()
             if not msg:
             if not msg:
-                raise IOError(2, 'VPP API client: read failed')
-            r = self.decode_incoming_msg(msg)
+                raise VPPIOError(2, 'VPP API client: read failed')
+            r = self.decode_incoming_msg(msg, no_type_conversion)
             msgname = type(r).__name__
             if context not in r or r.context == 0 or context != r.context:
             msgname = type(r).__name__
             if context not in r or r.context == 0 or context != r.context:
+                # Message being queued
                 self.message_queue.put_nowait(r)
                 continue
 
                 self.message_queue.put_nowait(r)
                 continue
 
@@ -617,8 +640,9 @@ class VPP():
 
             rl.append(r)
 
 
             rl.append(r)
 
-        vpp_api.vac_rx_resume()
+        self.transport.resume()
 
 
+        logger.debug(return_logger(rl))
         return rl
 
     def _call_vpp_async(self, i, msg, **kwargs):
         return rl
 
     def _call_vpp_async(self, i, msg, **kwargs):
@@ -635,11 +659,15 @@ class VPP():
             kwargs['context'] = context
         else:
             context = kwargs['context']
             kwargs['context'] = context
         else:
             context = kwargs['context']
-        kwargs['client_index'] = 0
+        try:
+            if self.transport.socket_index:
+                kwargs['client_index'] = self.transport.socket_index
+        except AttributeError:
+            kwargs['client_index'] = 0
         kwargs['_vl_msg_id'] = i
         b = msg.pack(kwargs)
 
         kwargs['_vl_msg_id'] = i
         b = msg.pack(kwargs)
 
-        self._write(b)
+        self.transport.write(b)
 
     def register_event_callback(self, callback):
         """Register a callback for async messages.
 
     def register_event_callback(self, callback):
         """Register a callback for async messages.
@@ -660,7 +688,7 @@ class VPP():
         self.event_callback = callback
 
     def thread_msg_handler(self):
         self.event_callback = callback
 
     def thread_msg_handler(self):
-        """Python thread calling the user registerd message handler.
+        """Python thread calling the user registered message handler.
 
         This is to emulate the old style event callback scheme. Modern
         clients should provide their own thread to poll the event
 
         This is to emulate the old style event callback scheme. Modern
         clients should provide their own thread to poll the event