import threading
import select
import multiprocessing
+try:
+ import queue as queue
+except ImportError:
+ import Queue as queue
import logging
-class VppTransport:
+class VppTransportSocketIOError(IOError):
+ pass
+
+
+class VppTransport(object):
+ VppTransportSocketIOError = VppTransportSocketIOError
+
def __init__(self, parent, read_timeout, server_address):
self.connected = False
self.read_timeout = read_timeout if read_timeout > 0 else 1
return
# Put either to local queue or if context == 0
# callback queue
- r = self.parent.decode_incoming_msg(msg)
- if hasattr(r, 'context') and r.context > 0:
+ if self.parent.has_context(msg):
self.q.put(msg)
else:
self.parent.msg_handler_async(msg)
else:
- raise IOError(2, 'Unknown response from select')
+ raise VppTransportSocketIOError(
+ 2, 'Unknown response from select')
def connect(self, name, pfx, msg_handler, rx_qlen):
# Create a UDS socket
- self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.settimeout(self.read_timeout)
# Connect the socket to the port where the server is listening
try:
self.socket.connect(self.server_address)
except socket.error as msg:
- logging.error(msg)
+ logging.error("{} on socket {}".format(msg, self.server_address))
raise
self.connected = True
-
# Initialise sockclnt_create
sockclnt_create = self.parent.messages['sockclnt_create']
sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
msg = self._read()
hdr, length = self.parent.header.unpack(msg, 0)
if hdr.msgid != 16:
- raise IOError('Invalid reply message')
+ raise VppTransportSocketIOError('Invalid reply message')
r, length = sockclnt_create_reply.unpack(msg)
self.socket_index = r.index
return 0
def disconnect(self):
+ rv = 0
try: # Might fail, if VPP closes socket before packet makes it out
rv = self.parent.api.sockclnt_delete(index=self.socket_index)
except IOError:
self.socket.close()
self.sque.put(True) # Terminate listening thread
self.message_thread.join()
+ return rv
def suspend(self):
pass
pass
def callback(self):
- raise NotImplemented
+ raise NotImplementedError
- def get_callback(self, async):
+ def get_callback(self, do_async):
return self.callback
def get_msg_index(self, name):
def write(self, buf):
"""Send a binary-packed message to VPP."""
if not self.connected:
- raise IOError(1, 'Not connected')
+ raise VppTransportSocketIOError(1, 'Not connected')
# Send header
header = self.header.pack(0, len(buf), 0)
n = self.socket.send(buf)
def _read(self):
- # Header and message
- try:
- msg = self.socket.recv(4096)
- if len(msg) == 0:
- return None
- except socket.error as message:
- logging.error(message)
- raise
-
- (_, l, _) = self.header.unpack(msg[:16])
+ hdr = self.socket.recv(16)
+ if not hdr:
+ return
+ (_, l, _) = self.header.unpack(hdr) # If at head of message
+ # Read rest of message
+ msg = self.socket.recv(l)
if l > len(msg):
- buf = bytearray(l + 16)
+ nbytes = len(msg)
+ buf = bytearray(l)
view = memoryview(buf)
- view[:4096] = msg
- view = view[4096:]
- # Read rest of message
- remaining_bytes = l - 4096 + 16
- while remaining_bytes > 0:
- bytes_to_read = (remaining_bytes if remaining_bytes
- <= 4096 else 4096)
- nbytes = self.socket.recv_into(view, bytes_to_read)
- if nbytes == 0:
- logging.error('recv failed')
- break
+ view[:nbytes] = msg
+ view = view[nbytes:]
+ left = l - nbytes
+ while left:
+ nbytes = self.socket.recv_into(view, left)
view = view[nbytes:]
- remaining_bytes -= nbytes
- else:
- buf = msg
- return buf[16:]
+ left -= nbytes
+ return buf
+ if l == len(msg):
+ return msg
+ raise VPPTransportSocketIOError(1, 'Unknown socket read error')
def read(self):
if not self.connected:
- raise IOError(1, 'Not connected')
- return self.q.get()
+ raise VppTransportSocketIOError(1, 'Not connected')
+ try:
+ return self.q.get(True, self.read_timeout)
+ except queue.Empty:
+ return None