target=self.thread_msg_handler)
self.event_thread.daemon = True
self.event_thread.start()
+ else:
+ self.event_thread = None
return rv
def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
def disconnect(self):
"""Detach from VPP."""
rv = self.transport.disconnect()
- self.message_queue.put("terminate event thread")
+ if self.event_thread is not None:
+ self.message_queue.put("terminate event thread")
return rv
def msg_handler_sync(self, msg):
self.server_address = server_address
self.header = struct.Struct('>QII')
self.message_table = {}
+ # These queues can be accessed async.
+ # They are always up, but replaced on connect.
+ # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue
+ # if possible.
+ self.sque = multiprocessing.Queue()
+ self.q = multiprocessing.Queue()
# The following fields are set in connect().
- self.sque = None
- self.q = None
self.message_thread = None
self.socket = None
self.connected = True
- # TODO: Can this block be moved even later?
+ # Queues' feeder threads from previous connect may still be sending.
+ # Close and join to avoid any errors.
+ self.sque.close()
+ self.q.close()
+ self.sque.join_thread()
+ self.q.join_thread()
+ # Finally safe to replace.
self.sque = multiprocessing.Queue()
self.q = multiprocessing.Queue()
self.message_thread = threading.Thread(target=self.msg_thread_func)
# Allow additional connect() calls.
self.message_thread.join()
# Collect garbage.
- self.sque = None
- self.q = None
self.message_thread = None
self.socket = None
+ # Queues will be collected after connect replaces them.
return rv
def suspend(self):