2 # VPP Unix Domain Socket Transport.
14 from . import vpp_papi
17 class VppTransportSocketIOError(IOError):
18 # TODO: Document different values of error number (first numeric argument).
22 class VppTransport(object):
23 VppTransportSocketIOError = VppTransportSocketIOError
25 def __init__(self, parent, read_timeout, server_address):
26 self.connected = False
27 self.read_timeout = read_timeout if read_timeout > 0 else 1
29 self.server_address = server_address
30 self.header = struct.Struct('>QII')
31 self.message_table = {}
32 # The following fields are set in connect().
35 self.message_thread = None
38 def msg_thread_func(self):
41 rlist, _, _ = select.select([self.socket,
42 self.sque._reader], [], [])
45 logging.error('select failed')
50 if r == self.sque._reader:
55 elif r == self.socket:
64 # Put either to local queue or if context == 0
66 if self.parent.has_context(msg):
69 self.parent.msg_handler_async(msg)
71 raise VppTransportSocketIOError(
72 2, 'Unknown response from select')
74 def connect(self, name, pfx, msg_handler, rx_qlen):
75 # TODO: Reorder the actions and add "roll-backs",
76 # to restore clean disconnect state when failure happens durng connect.
78 if self.message_thread is not None:
79 raise VppTransportSocketIOError(
80 1, "PAPI socket transport connect: Need to disconnect first.")
83 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
84 self.socket.settimeout(self.read_timeout)
86 # Connect the socket to the port where the server is listening
88 self.socket.connect(self.server_address)
89 except socket.error as msg:
90 logging.error("{} on socket {}".format(msg, self.server_address))
95 # TODO: Can this block be moved even later?
96 self.sque = multiprocessing.Queue()
97 self.q = multiprocessing.Queue()
98 self.message_thread = threading.Thread(target=self.msg_thread_func)
100 # Initialise sockclnt_create
101 sockclnt_create = self.parent.messages['sockclnt_create']
102 sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
104 args = {'_vl_msg_id': 15,
107 b = sockclnt_create.pack(args)
110 hdr, length = self.parent.header.unpack(msg, 0)
112 # TODO: Add first numeric argument.
113 raise VppTransportSocketIOError('Invalid reply message')
115 r, length = sockclnt_create_reply.unpack(msg)
116 self.socket_index = r.index
117 for m in r.message_table:
118 n = m.name.rstrip(b'\x00\x13')
119 self.message_table[n] = m.index
121 self.message_thread.daemon = True
122 self.message_thread.start()
126 def disconnect(self):
127 # TODO: Support repeated disconnect calls, recommend users to call
128 # disconnect when they are not sure what the state is after failures.
129 # TODO: Any volunteer for comprehensive docstrings?
132 # Might fail, if VPP closes socket before packet makes it out,
133 # or if there was a failure during connect().
134 rv = self.parent.api.sockclnt_delete(index=self.socket_index)
135 except (IOError, vpp_papi.VPPApiError):
137 self.connected = False
138 if self.socket is not None:
140 if self.sque is not None:
141 self.sque.put(True) # Terminate listening thread
142 if self.message_thread is not None:
143 # Allow additional connect() calls.
144 self.message_thread.join()
148 self.message_thread = None
159 raise NotImplementedError
161 def get_callback(self, do_async):
164 def get_msg_index(self, name):
166 return self.message_table[name]
170 def msg_table_max_index(self):
171 return len(self.message_table)
173 def write(self, buf):
174 """Send a binary-packed message to VPP."""
175 if not self.connected:
176 raise VppTransportSocketIOError(1, 'Not connected')
179 header = self.header.pack(0, len(buf), 0)
180 n = self.socket.send(header)
181 n = self.socket.send(buf)
184 hdr = self.socket.recv(16)
187 (_, l, _) = self.header.unpack(hdr) # If at head of message
189 # Read rest of message
190 msg = self.socket.recv(l)
194 view = memoryview(buf)
199 nbytes = self.socket.recv_into(view, left)
205 raise VppTransportSocketIOError(1, 'Unknown socket read error')
208 if not self.connected:
209 raise VppTransportSocketIOError(1, 'Not connected')
211 return self.q.get(True, self.read_timeout)