From 94e4531a02099eb017e84f42a1d5ed0a43c32dd0 Mon Sep 17 00:00:00 2001 From: Vratko Polak Date: Mon, 27 May 2019 18:36:23 +0200 Subject: [PATCH] papi: avoid IOError on disconnect Change-Id: I331efb20b98a7e3c507d9158d0221ee7d5353b18 Signed-off-by: Vratko Polak --- src/vpp-api/python/vpp_papi/vpp_papi.py | 5 ++++- src/vpp-api/python/vpp_papi/vpp_transport_socket.py | 19 ++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index f29c250744d..cd1f2e549de 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -465,6 +465,8 @@ class VPPApiClient(object): target=self.thread_msg_handler) self.event_thread.daemon = True self.event_thread.start() + else: + self.event_thread = None return rv def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32): @@ -495,7 +497,8 @@ class VPPApiClient(object): def disconnect(self): """Detach from VPP.""" rv = self.transport.disconnect() - self.message_queue.put("terminate event thread") + if self.event_thread is not None: + self.message_queue.put("terminate event thread") return rv def msg_handler_sync(self, msg): diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py index 115a2c26428..6989e9ac9ba 100644 --- a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py +++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py @@ -29,9 +29,13 @@ class VppTransport(object): self.server_address = server_address self.header = struct.Struct('>QII') self.message_table = {} + # These queues can be accessed async. + # They are always up, but replaced on connect. + # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue + # if possible. + self.sque = multiprocessing.Queue() + self.q = multiprocessing.Queue() # The following fields are set in connect(). - self.sque = None - self.q = None self.message_thread = None self.socket = None @@ -92,7 +96,13 @@ class VppTransport(object): self.connected = True - # TODO: Can this block be moved even later? + # Queues' feeder threads from previous connect may still be sending. + # Close and join to avoid any errors. + self.sque.close() + self.q.close() + self.sque.join_thread() + self.q.join_thread() + # Finally safe to replace. self.sque = multiprocessing.Queue() self.q = multiprocessing.Queue() self.message_thread = threading.Thread(target=self.msg_thread_func) @@ -143,10 +153,9 @@ class VppTransport(object): # Allow additional connect() calls. self.message_thread.join() # Collect garbage. - self.sque = None - self.q = None self.message_thread = None self.socket = None + # Queues will be collected after connect replaces them. return rv def suspend(self): -- 2.16.6