Python API: Synchronous mode.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
index 83247ff..0c40f17 100644 (file)
@@ -16,7 +16,7 @@
 
 from __future__ import print_function
 import sys, os, logging, collections, struct, json, threading, glob
-import atexit
+import atexit, Queue
 
 logging.basicConfig(level=logging.DEBUG)
 import vpp_api
@@ -57,7 +57,7 @@ class VPP():
     provides a means to register a callback function to receive
     these messages in a background thread.
     """
-    def __init__(self, apifiles = None, testmode = False):
+    def __init__(self, apifiles = None, testmode = False, async_thread = True):
         """Create a VPP API object.
 
         apifiles is a list of files containing API
@@ -72,11 +72,15 @@ class VPP():
         self.buffersize = 10000
         self.connected = False
         self.header = struct.Struct('>HI')
-        self.results_lock = threading.Lock()
-        self.results = {}
-        self.timeout = 5
         self.apifiles = []
         self.event_callback = None
+        self.message_queue = Queue.Queue()
+        self.read_timeout = 0
+        self.vpp_api = vpp_api
+        if async_thread:
+            self.event_thread = threading.Thread(target=self.thread_msg_handler)
+            self.event_thread.daemon = True
+            self.event_thread.start()
 
         if not apifiles:
             # Pick up API definitions from default directory
@@ -346,7 +350,7 @@ class VPP():
                 f = self.make_function(name, i, msgdef, multipart, async)
                 setattr(self._api, name, FuncWrapper(f))
 
-                # olf API stuff starts here - will be removed in 17.07
+                # old API stuff starts here - will be removed in 17.07
                 if hasattr(self, name):
                     raise NameError(
                         3, "Conflicting name in JSON definition: `%s'" % name)
@@ -359,6 +363,12 @@ class VPP():
             raise IOError(1, 'Not connected')
         return vpp_api.write(str(buf))
 
+    def _read (self):
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+
+        return vpp_api.read(self.read_timeout)
+
     def _load_dictionary(self):
         self.vpp_dictionary = {}
         self.vpp_dictionary_maxid = 0
@@ -372,6 +382,19 @@ class VPP():
             self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
             self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
 
+    def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async):
+       rv = vpp_api.connect(name, msg_handler, chroot_prefix, rx_qlen)
+        if rv != 0:
+            raise IOError(2, 'Connect failed')
+        self.connected = True
+
+        self._load_dictionary()
+        self._register_functions(async=async)
+
+        # Initialise control ping
+        self.control_ping_index = self.vpp_dictionary['control_ping']['id']
+        self.control_ping_msgdef = self.messages['control_ping']
+
     def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
         """Attach to VPP.
 
@@ -381,22 +404,22 @@ class VPP():
         rx_qlen - the length of the VPP message receive queue between
         client and server.
         """
-        msg_handler = self.msg_handler_sync if not async else self.msg_handler_async
-       if chroot_prefix is not None:
-           rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix)
-        else:
-           rv = vpp_api.connect(name, msg_handler, rx_qlen)
+        msg_handler = self.msg_handler_sync if not async \
+                      else self.msg_handler_async
+        return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
+                                     async)
 
-        if rv != 0:
-            raise IOError(2, 'Connect failed')
-        self.connected = True
+    def connect_sync (self, name, chroot_prefix = None, rx_qlen = 32):
+        """Attach to VPP in synchronous mode. Application must poll for events.
 
-        self._load_dictionary()
-        self._register_functions(async=async)
+        name - the name of the client.
+        chroot_prefix - if VPP is chroot'ed, the prefix of the jail
+        rx_qlen - the length of the VPP message receive queue between
+        client and server.
+        """
 
-        # Initialise control ping
-        self.control_ping_index = self.vpp_dictionary['control_ping']['id']
-        self.control_ping_msgdef = self.messages['control_ping']
+        return self.connect_internal(name, None, chroot_prefix, rx_qlen,
+                                     async=False)
 
     def disconnect(self):
         """Detach from VPP."""
@@ -404,56 +427,6 @@ class VPP():
         self.connected = False
         return rv
 
-    def results_wait(self, context):
-        """In a sync call, wait for the reply
-
-        The context ID is used to pair reply to request.
-        """
-
-        # Results is filled by the background callback.  It will
-        # raise the event when the context receives a response.
-        # Given there are two threads we have to be careful with the
-        # use of results and the structures under it, hence the lock.
-        with self.results_lock:
-            result = self.results[context]
-            ev = result['e']
-
-       timed_out = not ev.wait(self.timeout)
-
-       if timed_out:
-          raise IOError(3, 'Waiting for reply timed out')
-       else:
-           with self.results_lock:
-                result = self.results[context]
-               del self.results[context]
-               return result['r']
-
-    def results_prepare(self, context, multi=False):
-        """Prep for receiving a result in response to a request msg
-
-        context - unique context number sent in request and
-        returned in reply or replies
-        multi - true if we expect multiple messages from this
-        reply.
-        """
-
-        # The event is used to indicate that all results are in
-        new_result = {
-            'e': threading.Event(),
-        }
-        if multi:
-            # Make it clear to the BG thread it's going to see several
-            # messages; messages are stored in a results array
-            new_result['m'] = True
-            new_result['r'] = []
-
-        new_result['e'].clear()
-
-        # Put the prepped result structure into results, at which point
-        # the bg thread can also access it (hence the thread lock)
-        with self.results_lock:
-            self.results[context] = new_result
-
     def msg_handler_sync(self, msg):
         """Process an incoming message from VPP in sync mode.
 
@@ -473,32 +446,9 @@ class VPP():
 
         if context == 0:
             # No context -> async notification that we feed to the callback
-           if self.event_callback:
-               self.event_callback(msgname, r)
+            self.message_queue.put_nowait(r)
         else:
-            # Context -> use the results structure (carefully) to find
-            # who we're responding to and return the message to that
-            # thread
-            with self.results_lock:
-                if context not in self.results:
-                    eprint('Not expecting results for this context', context, r)
-                else:
-                    result = self.results[context]
-
-                    #
-                    # Collect results until control ping
-                    #
-
-                    if msgname == 'control_ping_reply':
-                        # End of a multipart
-                        result['e'].set()
-                    elif 'm' in self.results[context]:
-                        # One element in a multipart
-                        result['r'].append(r)
-                    else:
-                        # All of a single result
-                        result['r'] = r
-                        result['e'].set()
+            raise IOError(2, 'RPC reply message received in event handler')
 
     def decode_incoming_msg(self, msg):
         if not msg:
@@ -556,16 +506,16 @@ class VPP():
         no response within the timeout window.
         """
 
-        # We need a context if not supplied, in order to get the
-        # response
-        context = kwargs.get('context', self.get_context())
-       kwargs['context'] = context
-
-        # Set up to receive a response
-        self.results_prepare(context, multi=multipart)
+        if not 'context' in kwargs:
+            context = self.get_context()
+            kwargs['context'] = context
+        else:
+            context = kwargs['context']
+        kwargs['_vl_msg_id'] = i
+        b = self.encode(msgdef, kwargs)
 
-        # Output the message
-        self._call_vpp_async(i, msgdef, **kwargs)
+        vpp_api.suspend()
+        self._write(b)
 
         if multipart:
             # Send a ping after the request - we use its response
@@ -573,9 +523,30 @@ class VPP():
             self._control_ping(context)
 
         # Block until we get a reply.
-        r = self.results_wait(context)
+        rl = []
+        while (True):
+            msg = self._read()
+            if not msg:
+                print('PNEUM ERROR: OH MY GOD')
+                raise IOError(2, 'PNEUM read failed')
+
+            r = self.decode_incoming_msg(msg)
+            msgname = type(r).__name__
+            if not context in r or r.context == 0 or context != r.context:
+                self.message_queue.put_nowait(r)
+                continue
 
-        return r
+            if not multipart:
+                rl = r
+                break
+            if msgname == 'control_ping_reply':
+                break
+
+            rl.append(r)
+
+        vpp_api.resume()
+
+        return rl
 
     def _call_vpp_async(self, i, msgdef, **kwargs):
         """Given a message, send the message and await a reply.
@@ -613,3 +584,16 @@ class VPP():
         callback.
         """
         self.event_callback = callback
+
+    def thread_msg_handler(self):
+        """Python thread calling the user registerd message handler.
+
+        This is to emulate the old style event callback scheme. Modern
+        clients should provide their own thread to poll the event
+        queue.
+        """
+        while True:
+            r = self.message_queue.get()
+            msgname = type(r).__name__
+           if self.event_callback:
+               self.event_callback(msgname, r)