X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvpp-api%2Fpython%2Fvpp_papi%2Fvpp_transport_socket.py;h=b0861057e7206dd02e367b1ec4e71bb6f4fc5d6b;hb=refs%2Fchanges%2F85%2F31485%2F10;hp=49e565980f2abb168fe927939dcb321aa49d63f1;hpb=039a1c2f1401ebf3b38fa9bd55dffff0713b8b98;p=vpp.git diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py index 49e565980f2..b0861057e72 100644 --- a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py +++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py @@ -6,14 +6,21 @@ import struct import threading import select import multiprocessing -try: - import queue as queue -except ImportError: - import Queue as queue +import queue import logging +logger = logging.getLogger('vpp_papi.transport') +logger.addHandler(logging.NullHandler()) + + +class VppTransportSocketIOError(IOError): + # TODO: Document different values of error number (first numeric argument). + pass + + +class VppTransport: + VppTransportSocketIOError = VppTransportSocketIOError -class VppTransport(object): def __init__(self, parent, read_timeout, server_address): self.connected = False self.read_timeout = read_timeout if read_timeout > 0 else 1 @@ -21,9 +28,15 @@ class VppTransport(object): self.server_address = server_address self.header = struct.Struct('>QII') self.message_table = {} + # These queues can be accessed async. + # They are always up, but replaced on connect. + # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue + # if possible. self.sque = multiprocessing.Queue() self.q = multiprocessing.Queue() - self.message_thread = threading.Thread(target=self.msg_thread_func) + # The following fields are set in connect(). + self.message_thread = None + self.socket = None def msg_thread_func(self): while True: @@ -53,28 +66,46 @@ class VppTransport(object): return # Put either to local queue or if context == 0 # callback queue - r = self.parent.decode_incoming_msg(msg) - if hasattr(r, 'context') and r.context > 0: + if self.parent.has_context(msg): self.q.put(msg) else: self.parent.msg_handler_async(msg) else: - raise IOError(2, 'Unknown response from select') + raise VppTransportSocketIOError( + 2, 'Unknown response from select') def connect(self, name, pfx, msg_handler, rx_qlen): + # TODO: Reorder the actions and add "roll-backs", + # to restore clean disconnect state when failure happens durng connect. + + if self.message_thread is not None: + raise VppTransportSocketIOError( + 1, "PAPI socket transport connect: Need to disconnect first.") # Create a UDS socket - self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.socket.settimeout(self.read_timeout) # Connect the socket to the port where the server is listening try: self.socket.connect(self.server_address) except socket.error as msg: - logging.error("{} on socket {}".format(msg, self.server_address)) - raise + # logging.error("{} on socket {}".format(msg, self.server_address)) + raise msg self.connected = True + + # Queues' feeder threads from previous connect may still be sending. + # Close and join to avoid any errors. + self.sque.close() + self.q.close() + self.sque.join_thread() + self.q.join_thread() + # Finally safe to replace. + self.sque = multiprocessing.Queue() + self.q = multiprocessing.Queue() + self.message_thread = threading.Thread(target=self.msg_thread_func) + # Initialise sockclnt_create sockclnt_create = self.parent.messages['sockclnt_create'] sockclnt_create_reply = self.parent.messages['sockclnt_create_reply'] @@ -87,12 +118,13 @@ class VppTransport(object): msg = self._read() hdr, length = self.parent.header.unpack(msg, 0) if hdr.msgid != 16: - raise IOError('Invalid reply message') + # TODO: Add first numeric argument. + raise VppTransportSocketIOError('Invalid reply message') r, length = sockclnt_create_reply.unpack(msg) self.socket_index = r.index for m in r.message_table: - n = m.name.rstrip(b'\x00\x13') + n = m.name self.message_table[n] = m.index self.message_thread.daemon = True @@ -101,15 +133,31 @@ class VppTransport(object): return 0 def disconnect(self): + # TODO: Support repeated disconnect calls, recommend users to call + # disconnect when they are not sure what the state is after failures. + # TODO: Any volunteer for comprehensive docstrings? rv = 0 - try: # Might fail, if VPP closes socket before packet makes it out + try: + # Might fail, if VPP closes socket before packet makes it out, + # or if there was a failure during connect(). + # TODO: manually build message so that .disconnect releases server-side resources rv = self.parent.api.sockclnt_delete(index=self.socket_index) - except IOError: + except (IOError, self.parent.VPPApiError): pass self.connected = False - self.socket.close() - self.sque.put(True) # Terminate listening thread - self.message_thread.join() + if self.socket is not None: + self.socket.close() + if self.sque is not None: + self.sque.put(True) # Terminate listening thread + if self.message_thread is not None and self.message_thread.is_alive(): + # Allow additional connect() calls. + self.message_thread.join() + # Wipe message table, VPP can be restarted with different plugins. + self.message_table = {} + # Collect garbage. + self.message_thread = None + self.socket = None + # Queues will be collected after connect replaces them. return rv def suspend(self): @@ -136,49 +184,53 @@ class VppTransport(object): def write(self, buf): """Send a binary-packed message to VPP.""" if not self.connected: - raise IOError(1, 'Not connected') + raise VppTransportSocketIOError(1, 'Not connected') # Send header header = self.header.pack(0, len(buf), 0) - n = self.socket.send(header) - n = self.socket.send(buf) + try: + self.socket.sendall(header) + self.socket.sendall(buf) + except socket.error as err: + raise VppTransportSocketIOError(1, 'Sendall error: {err!r}'.format( + err=err)) + + def _read_fixed(self, size): + """Repeat receive until fixed size is read. Return empty on error.""" + buf = bytearray(size) + view = memoryview(buf) + left = size + while 1: + got = self.socket.recv_into(view, left) + if got <= 0: + # Read error. + return "" + if got >= left: + # TODO: Raise if got > left? + break + left -= got + view = view[got:] + return buf def _read(self): - # Header and message - try: - msg = self.socket.recv(4096) - if len(msg) == 0: - return None - except socket.error as message: - logging.error(message) - raise - - (_, l, _) = self.header.unpack(msg[:16]) - - if l > len(msg): - buf = bytearray(l + 16) - view = memoryview(buf) - view[:4096] = msg - view = view[4096:] - # Read rest of message - remaining_bytes = l - 4096 + 16 - while remaining_bytes > 0: - bytes_to_read = (remaining_bytes if remaining_bytes - <= 4096 else 4096) - nbytes = self.socket.recv_into(view, bytes_to_read) - if nbytes == 0: - logging.error('recv failed') - break - view = view[nbytes:] - remaining_bytes -= nbytes - else: - buf = msg - return buf[16:] - - def read(self): + """Read single complete message, return it or empty on error.""" + hdr = self._read_fixed(16) + if not hdr: + return + (_, hdrlen, _) = self.header.unpack(hdr) # If at head of message + + # Read rest of message + msg = self._read_fixed(hdrlen) + if hdrlen == len(msg): + return msg + raise VppTransportSocketIOError(1, 'Unknown socket read error') + + def read(self, timeout=None): if not self.connected: - raise IOError(1, 'Not connected') + raise VppTransportSocketIOError(1, 'Not connected') + if timeout is None: + timeout = self.read_timeout try: - return self.q.get(True, self.read_timeout) + return self.q.get(True, timeout) except queue.Empty: return None