2 # VPP Unix Domain Socket Transport.
14 from . import vpp_papi
# Module-level logger for the transport, registered under the name
# 'vpp_papi.transport'. The NullHandler leaves output configuration to the
# application embedding this module.
# NOTE(review): the error paths visible in msg_thread_func() and connect()
# call logging.error() (the root logger) instead of this logger.
16 logger = logging.getLogger('vpp_papi.transport')
17 logger.addHandler(logging.NullHandler())
20 class VppTransportSocketIOError(IOError):
# Error type raised by the socket transport. It subclasses IOError, so a
# caller catching IOError also catches it.
# Raise sites visible in this file pass (error number, message):
#   1 - not connected, connect while already connected, send/read failures
#   2 - select() returned an unexpected object in the reader thread
# The 'Invalid reply message' raise in connect() passes a message only.
21 # TODO: Document different values of error number (first numeric argument).
25 class VppTransport(object):
26 VppTransportSocketIOError = VppTransportSocketIOError
28 def __init__(self, parent, read_timeout, server_address):
# Set up an unconnected transport; no socket is opened here.
#   parent: owning API object. Other methods use self.parent.has_context(),
#       .msg_handler_async(), .messages, .header and .api; the assignment
#       to self.parent is not among the lines visible in this excerpt.
#   read_timeout: handed to socket.settimeout() in connect() and used as the
#       default timeout in read(). A value that is not > 0 is replaced by 1.
#   server_address: address passed to socket.connect() on an AF_UNIX socket.
29 self.connected = False
30 self.read_timeout = read_timeout if read_timeout > 0 else 1
32 self.server_address = server_address
# Frame header: big-endian u64, u32, u32 (16 bytes). write() fills only the
# middle field (payload length) and the reader takes the length from it.
33 self.header = struct.Struct('>QII')
# Message name -> index map; filled in connect(), wiped in disconnect().
34 self.message_table = {}
35 # These queues can be accessed async.
36 # They are always up, but replaced on connect.
37 # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue
# sque: control queue. Its reader end is watched by select() in
# msg_thread_func(), and disconnect() puts True on it to stop the thread.
39 self.sque = multiprocessing.Queue()
# q: queue that read() takes received messages from.
40 self.q = multiprocessing.Queue()
41 # The following fields are set in connect().
42 self.message_thread = None
45 def msg_thread_func(self):
# Body of the reader thread; connect() passes this method as the
# threading.Thread target. It waits on the VPP socket and on the read end
# of the control queue, then handles whichever became readable.
# NOTE(review): the loop header, try/except lines and queue puts are
# missing from this excerpt; the comments cover only the visible lines.
#
# Block with no timeout until the socket or the control queue is readable.
# sque._reader is a private attribute of multiprocessing.Queue.
48 rlist, _, _ = select.select([self.socket,
49 self.sque._reader], [], [])
# NOTE(review): logs through the root logger, not the module-level `logger`.
52 logging.error('select failed')
# Control queue readable: disconnect() puts True on self.sque to stop us.
57 if r == self.sque._reader:
# Socket readable: a message from VPP is pending.
62 elif r == self.socket:
71 # Put either to local queue or if context == 0
# Presumably messages with a context go to the local queue and the others
# to parent.msg_handler_async(); the branch lines are missing - confirm.
73 if self.parent.has_context(msg):
76 self.parent.msg_handler_async(msg)
# select() handed back an object that is neither of the two watched ones.
78 raise VppTransportSocketIOError(
79 2, 'Unknown response from select')
81 def connect(self, name, pfx, msg_handler, rx_qlen):
# Connect to VPP over the Unix socket: open the socket, replace the queues,
# exchange sockclnt_create / sockclnt_create_reply, load the message table
# from the reply and start the reader thread.
# Raises VppTransportSocketIOError when called while a reader thread is
# still registered, or when the reply is rejected.
# NOTE(review): name, pfx, msg_handler and rx_qlen are not referenced in the
# lines visible in this excerpt - check the full file for their use.
82 # TODO: Reorder the actions and add "roll-backs",
83 # to restore clean disconnect state when failure happens during connect.
# message_thread is reset to None by disconnect(); until then a second
# connect() is refused.
85 if self.message_thread is not None:
86 raise VppTransportSocketIOError(
87 1, "PAPI socket transport connect: Need to disconnect first.")
# Stream socket in the Unix domain; blocking calls time out after
# self.read_timeout.
90 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
91 self.socket.settimeout(self.read_timeout)
93 # Connect the socket to the port where the server is listening
95 self.socket.connect(self.server_address)
# Here `msg` is the caught exception; the same name later holds the reply.
96 except socket.error as msg:
# NOTE(review): logs through the root logger, not the module-level `logger`.
97 logging.error("{} on socket {}".format(msg, self.server_address))
100 self.connected = True
102 # Queues' feeder threads from previous connect may still be sending.
103 # Close and join to avoid any errors.
106 self.sque.join_thread()
108 # Finally safe to replace.
109 self.sque = multiprocessing.Queue()
110 self.q = multiprocessing.Queue()
# The thread object is only created here; it is started at the very end.
111 self.message_thread = threading.Thread(target=self.msg_thread_func)
113 # Initialise sockclnt_create
# Message definitions are looked up by name on the parent.
114 sockclnt_create = self.parent.messages['sockclnt_create']
115 sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
# Hard-coded message id 15 - presumably the fixed id of sockclnt_create;
# TODO confirm. The remaining dict entries are missing from this excerpt.
117 args = {'_vl_msg_id': 15,
120 b = sockclnt_create.pack(args)
# The reply is decoded with the parent's header object, not self.header.
123 hdr, length = self.parent.header.unpack(msg, 0)
125 # TODO: Add first numeric argument.
# Unlike the other raise sites, this one passes no error number.
126 raise VppTransportSocketIOError('Invalid reply message')
128 r, length = sockclnt_create_reply.unpack(msg)
# Kept for the sockclnt_delete call made in disconnect().
129 self.socket_index = r.index
# Record the index of every message listed in the reply (the line that
# derives the key `n` is missing from this excerpt).
130 for m in r.message_table:
132 self.message_table[n] = m.index
# Daemon thread: it does not keep the interpreter alive at exit.
134 self.message_thread.daemon = True
135 self.message_thread.start()
139 def disconnect(self):
# Tear down the connection: ask VPP to delete the client, mark the
# transport as disconnected, stop the reader thread and clear the message
# table so that connect() can be called again.
# NOTE(review): some lines (try:, the except body, the body of the socket
# check) are missing from this excerpt.
140 # TODO: Support repeated disconnect calls, recommend users to call
141 # disconnect when they are not sure what the state is after failures.
142 # TODO: Any volunteer for comprehensive docstrings?
145 # Might fail, if VPP closes socket before packet makes it out,
146 # or if there was a failure during connect().
# socket_index was stored by connect(); `rv` is not used in the visible lines.
147 rv = self.parent.api.sockclnt_delete(index=self.socket_index)
# Both plain I/O errors and VPP API errors from the delete call are caught.
148 except (IOError, vpp_papi.VPPApiError):
150 self.connected = False
151 if self.socket is not None:
153 if self.sque is not None:
# Makes sque._reader readable, which wakes select() in msg_thread_func().
154 self.sque.put(True) # Terminate listening thread
# Wait for the reader thread to finish before resetting state.
155 if self.message_thread is not None and self.message_thread.is_alive():
156 # Allow additional connect() calls.
157 self.message_thread.join()
158 # Wipe message table, VPP can be restarted with different plugins.
159 self.message_table = {}
# connect() refuses to run while message_thread is not None.
161 self.message_thread = None
163 # Queues will be collected after connect replaces them.
173 raise NotImplementedError
175 def get_callback(self, do_async):
178 def get_msg_index(self, name):
# Return the index stored for message `name` in self.message_table, which
# connect() fills from the sockclnt_create reply.
# NOTE(review): the lines around the return are missing from this excerpt,
# so the handling of an unknown name cannot be seen here.
180 return self.message_table[name]
184 def msg_table_max_index(self):
# Return the number of entries in self.message_table.
# NOTE(review): despite the name, this is the entry count, not the largest
# index value stored in the table.
185 return len(self.message_table)
187 def write(self, buf):
188 """Send a binary-packed message to VPP."""
# Raises VppTransportSocketIOError(1, ...) when the transport is not
# connected or when sending fails.
189 if not self.connected:
190 raise VppTransportSocketIOError(1, 'Not connected')
# 16-byte frame header ('>QII'): only the middle field, the payload length,
# is set; the first and last fields are sent as 0.
193 header = self.header.pack(0, len(buf), 0)
# Header and payload go out as two separate sendall() calls.
195 self.socket.sendall(header)
196 self.socket.sendall(buf)
# Socket-level failures are re-raised as the transport's own error type.
197 except socket.error as err:
198 raise VppTransportSocketIOError(1, 'Sendall error: {err!r}'.format(
201 def _read_fixed(self, size):
202 """Repeat receive until fixed size is read. Return empty on error."""
# Allocate the whole buffer up front; receiving through a memoryview lets
# recv_into() write straight into it.
203 buf = bytearray(size)
204 view = memoryview(buf)
# recv_into() returns the number of bytes written. `left` is defined on a
# line missing from this excerpt - presumably the bytes still outstanding.
207 got = self.socket.recv_into(view, left)
212 # TODO: Raise if got > left?
219 """Read single complete message, return it or empty on error."""
220 hdr = self._read_fixed(16)
223 (_, hdrlen, _) = self.header.unpack(hdr) # If at head of message
225 # Read rest of message
226 msg = self._read_fixed(hdrlen)
227 if hdrlen == len(msg):
229 raise VppTransportSocketIOError(1, 'Unknown socket read error')
231 def read(self, timeout=None):
# Return the next item from self.q, waiting up to `timeout`.
# Raises VppTransportSocketIOError(1, 'Not connected') when the transport
# is not connected.
232 if not self.connected:
233 raise VppTransportSocketIOError(1, 'Not connected')
# Fall back to the instance-wide timeout. The guarding condition is missing
# from this excerpt - presumably it applies when no timeout was given.
235 timeout = self.read_timeout
# Blocking get. Queue.get() raises queue.Empty when nothing arrives in
# time; whether that is handled here is not visible in this excerpt.
237 return self.q.get(True, timeout)