2 # VPP Unix Domain Socket Transport.
14 from . import vpp_papi
17 class VppTransportSocketIOError(IOError):
18 # TODO: Document different values of error number (first numeric argument).
22 class VppTransport(object):
23 VppTransportSocketIOError = VppTransportSocketIOError
25 def __init__(self, parent, read_timeout, server_address):
26 self.connected = False
27 self.read_timeout = read_timeout if read_timeout > 0 else 1
29 self.server_address = server_address
30 self.header = struct.Struct('>QII')
31 self.message_table = {}
32 # These queues can be accessed async.
33 # They are always up, but replaced on connect.
34 # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue
36 self.sque = multiprocessing.Queue()
37 self.q = multiprocessing.Queue()
38 # The following fields are set in connect().
39 self.message_thread = None
42 def msg_thread_func(self):
45 rlist, _, _ = select.select([self.socket,
46 self.sque._reader], [], [])
49 logging.error('select failed')
54 if r == self.sque._reader:
59 elif r == self.socket:
68 # Put either to local queue or if context == 0
70 if self.parent.has_context(msg):
73 self.parent.msg_handler_async(msg)
75 raise VppTransportSocketIOError(
76 2, 'Unknown response from select')
78 def connect(self, name, pfx, msg_handler, rx_qlen):
79 # TODO: Reorder the actions and add "roll-backs",
80 # to restore clean disconnect state when failure happens durng connect.
82 if self.message_thread is not None:
83 raise VppTransportSocketIOError(
84 1, "PAPI socket transport connect: Need to disconnect first.")
87 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
88 self.socket.settimeout(self.read_timeout)
90 # Connect the socket to the port where the server is listening
92 self.socket.connect(self.server_address)
93 except socket.error as msg:
94 logging.error("{} on socket {}".format(msg, self.server_address))
99 # Queues' feeder threads from previous connect may still be sending.
100 # Close and join to avoid any errors.
103 self.sque.join_thread()
105 # Finally safe to replace.
106 self.sque = multiprocessing.Queue()
107 self.q = multiprocessing.Queue()
108 self.message_thread = threading.Thread(target=self.msg_thread_func)
110 # Initialise sockclnt_create
111 sockclnt_create = self.parent.messages['sockclnt_create']
112 sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
114 args = {'_vl_msg_id': 15,
117 b = sockclnt_create.pack(args)
120 hdr, length = self.parent.header.unpack(msg, 0)
122 # TODO: Add first numeric argument.
123 raise VppTransportSocketIOError('Invalid reply message')
125 r, length = sockclnt_create_reply.unpack(msg)
126 self.socket_index = r.index
127 for m in r.message_table:
128 n = m.name.rstrip(b'\x00\x13')
129 self.message_table[n] = m.index
131 self.message_thread.daemon = True
132 self.message_thread.start()
136 def disconnect(self):
137 # TODO: Support repeated disconnect calls, recommend users to call
138 # disconnect when they are not sure what the state is after failures.
139 # TODO: Any volunteer for comprehensive docstrings?
142 # Might fail, if VPP closes socket before packet makes it out,
143 # or if there was a failure during connect().
144 rv = self.parent.api.sockclnt_delete(index=self.socket_index)
145 except (IOError, vpp_papi.VPPApiError):
147 self.connected = False
148 if self.socket is not None:
150 if self.sque is not None:
151 self.sque.put(True) # Terminate listening thread
152 if self.message_thread is not None and self.message_thread.is_alive():
153 # Allow additional connect() calls.
154 self.message_thread.join()
155 # Wipe message table, VPP can be restarted with different plugins.
156 self.message_table = {}
158 self.message_thread = None
160 # Queues will be collected after connect replaces them.
170 raise NotImplementedError
172 def get_callback(self, do_async):
175 def get_msg_index(self, name):
177 return self.message_table[name]
181 def msg_table_max_index(self):
182 return len(self.message_table)
184 def write(self, buf):
185 """Send a binary-packed message to VPP."""
186 if not self.connected:
187 raise VppTransportSocketIOError(1, 'Not connected')
190 header = self.header.pack(0, len(buf), 0)
191 n = self.socket.send(header)
192 n = self.socket.send(buf)
194 raise VppTransportSocketIOError(1, 'Not connected')
197 hdr = self.socket.recv(16)
200 (_, hdrlen, _) = self.header.unpack(hdr) # If at head of message
202 # Read rest of message
203 msg = self.socket.recv(hdrlen)
204 if hdrlen > len(msg):
206 buf = bytearray(hdrlen)
207 view = memoryview(buf)
210 left = hdrlen - nbytes
212 nbytes = self.socket.recv_into(view, left)
216 if hdrlen == len(msg):
218 raise VppTransportSocketIOError(1, 'Unknown socket read error')
221 if not self.connected:
222 raise VppTransportSocketIOError(1, 'Not connected')
224 return self.q.get(True, self.read_timeout)