papi: avoid IOError on disconnect 44/19844/2
authorVratko Polak <vrpolak@cisco.com>
Mon, 27 May 2019 16:36:23 +0000 (18:36 +0200)
committerOle Trøan <otroan@employees.org>
Tue, 28 May 2019 08:49:26 +0000 (08:49 +0000)
Change-Id: I331efb20b98a7e3c507d9158d0221ee7d5353b18
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
src/vpp-api/python/vpp_papi/vpp_papi.py
src/vpp-api/python/vpp_papi/vpp_transport_socket.py

index f29c250..cd1f2e5 100644 (file)
@@ -465,6 +465,8 @@ class VPPApiClient(object):
                 target=self.thread_msg_handler)
             self.event_thread.daemon = True
             self.event_thread.start()
+        else:
+            self.event_thread = None
         return rv
 
     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
@@ -495,7 +497,8 @@ class VPPApiClient(object):
     def disconnect(self):
         """Detach from VPP."""
         rv = self.transport.disconnect()
-        self.message_queue.put("terminate event thread")
+        if self.event_thread is not None:
+            self.message_queue.put("terminate event thread")
         return rv
 
     def msg_handler_sync(self, msg):
index 115a2c2..6989e9a 100644 (file)
@@ -29,9 +29,13 @@ class VppTransport(object):
         self.server_address = server_address
         self.header = struct.Struct('>QII')
         self.message_table = {}
+        # These queues can be accessed async.
+        # They are always up, but replaced on connect.
+        # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue
+        # if possible.
+        self.sque = multiprocessing.Queue()
+        self.q = multiprocessing.Queue()
         # The following fields are set in connect().
-        self.sque = None
-        self.q = None
         self.message_thread = None
         self.socket = None
 
@@ -92,7 +96,13 @@ class VppTransport(object):
 
         self.connected = True
 
-        # TODO: Can this block be moved even later?
+        # Queues' feeder threads from previous connect may still be sending.
+        # Close and join to avoid any errors.
+        self.sque.close()
+        self.q.close()
+        self.sque.join_thread()
+        self.q.join_thread()
+        # Finally safe to replace.
         self.sque = multiprocessing.Queue()
         self.q = multiprocessing.Queue()
         self.message_thread = threading.Thread(target=self.msg_thread_func)
@@ -143,10 +153,9 @@ class VppTransport(object):
             # Allow additional connect() calls.
             self.message_thread.join()
         # Collect garbage.
-        self.sque = None
-        self.q = None
         self.message_thread = None
         self.socket = None
+        # Queues will be collected after connect replaces them.
         return rv
 
     def suspend(self):