2 # VPP Unix Domain Socket Transport.
12 logger = logging.getLogger("vpp_papi.transport")
13 logger.addHandler(logging.NullHandler())
16 class VppTransportSocketIOError(IOError):
17 # TODO: Document different values of error number (first numeric argument).
22 VppTransportSocketIOError = VppTransportSocketIOError
24 def __init__(self, parent, read_timeout, server_address):
25 self.connected = False
26 self.read_timeout = read_timeout if read_timeout > 0 else None
28 self.server_address = server_address
29 self.header = struct.Struct(">QII")
30 self.message_table = {}
31 # These queues can be accessed async.
32 # They are always up, but replaced on connect.
33 # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue
35 self.sque = multiprocessing.Queue()
36 self.q = multiprocessing.Queue()
37 # The following fields are set in connect().
38 self.message_thread = None
41 def msg_thread_func(self):
44 rlist, _, _ = select.select([self.socket, self.sque._reader], [], [])
45 except (socket.error, ValueError):
47 logging.error("select failed")
52 if r == self.sque._reader:
57 elif r == self.socket:
66 # Put either to local queue or if context == 0
68 if not self.do_async and self.parent.has_context(msg):
71 self.parent.msg_handler_async(msg)
73 raise VppTransportSocketIOError(2, "Unknown response from select")
75 def connect(self, name, pfx, msg_handler, rx_qlen, do_async=False):
76 # TODO: Reorder the actions and add "roll-backs",
77 # to restore clean disconnect state when failure happens durng connect.
79 if self.message_thread is not None:
80 raise VppTransportSocketIOError(
81 1, "PAPI socket transport connect: Need to disconnect first."
85 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
86 self.socket.settimeout(self.read_timeout)
88 # Connect the socket to the port where the server is listening
90 self.socket.connect(self.server_address)
91 except socket.error as msg:
92 # logging.error("{} on socket {}".format(msg, self.server_address))
97 # Queues' feeder threads from previous connect may still be sending.
98 # Close and join to avoid any errors.
101 self.sque.join_thread()
103 # Finally safe to replace.
104 self.sque = multiprocessing.Queue()
105 self.q = multiprocessing.Queue()
106 self.message_thread = threading.Thread(target=self.msg_thread_func)
108 # Initialise sockclnt_create
109 sockclnt_create = self.parent.messages["sockclnt_create"]
110 sockclnt_create_reply = self.parent.messages["sockclnt_create_reply"]
112 args = {"_vl_msg_id": 15, "name": name, "context": 124}
113 b = sockclnt_create.pack(args)
116 hdr, length = self.parent.header.unpack(msg, 0)
118 # TODO: Add first numeric argument.
119 raise VppTransportSocketIOError("Invalid reply message")
121 r, length = sockclnt_create_reply.unpack(msg)
122 self.socket_index = r.index
123 for m in r.message_table:
125 self.message_table[n] = m.index
127 self.message_thread.daemon = True
128 self.do_async = do_async
129 self.message_thread.start()
133 def disconnect(self):
134 # TODO: Support repeated disconnect calls, recommend users to call
135 # disconnect when they are not sure what the state is after failures.
136 # TODO: Any volunteer for comprehensive docstrings?
139 # Might fail, if VPP closes socket before packet makes it out,
140 # or if there was a failure during connect().
141 # TODO: manually build message so that .disconnect releases server-side resources
142 rv = self.parent.api.sockclnt_delete(index=self.socket_index)
143 except (IOError, self.parent.VPPApiError):
145 self.connected = False
146 if self.socket is not None:
148 if self.sque is not None:
149 self.sque.put(True) # Terminate listening thread
150 if self.message_thread is not None and self.message_thread.is_alive():
151 # Allow additional connect() calls.
152 self.message_thread.join()
153 # Wipe message table, VPP can be restarted with different plugins.
154 self.message_table = {}
156 self.message_thread = None
158 # Queues will be collected after connect replaces them.
168 raise NotImplementedError
170 def get_callback(self, do_async):
173 def get_msg_index(self, name):
175 return self.message_table[name]
179 def msg_table_max_index(self):
180 return len(self.message_table)
182 def write(self, buf):
183 """Send a binary-packed message to VPP."""
184 if not self.connected:
185 raise VppTransportSocketIOError(1, "Not connected")
188 header = self.header.pack(0, len(buf), 0)
190 self.socket.sendall(header)
191 self.socket.sendall(buf)
192 except socket.error as err:
193 raise VppTransportSocketIOError(1, "Sendall error: {err!r}".format(err=err))
195 def _read_fixed(self, size):
196 """Repeat receive until fixed size is read. Return empty on error."""
197 buf = bytearray(size)
198 view = memoryview(buf)
201 got = self.socket.recv_into(view, left)
206 # TODO: Raise if got > left?
213 """Read single complete message, return it or empty on error."""
214 hdr = self._read_fixed(16)
217 (_, hdrlen, _) = self.header.unpack(hdr) # If at head of message
219 # Read rest of message
220 msg = self._read_fixed(hdrlen)
221 if hdrlen == len(msg):
223 raise VppTransportSocketIOError(1, "Unknown socket read error")
225 def read(self, timeout=None):
226 if not self.connected:
227 raise VppTransportSocketIOError(1, "Not connected")
229 timeout = self.read_timeout
231 return self.q.get(True, timeout)