2 # VPP Unix Domain Socket Transport.
16 class VppTransportSocketIOError(IOError):
20 class VppTransport(object):
21 VppTransportSocketIOError = VppTransportSocketIOError
23 def __init__(self, parent, read_timeout, server_address):
24 self.connected = False
25 self.read_timeout = read_timeout if read_timeout > 0 else 1
27 self.server_address = server_address
28 self.header = struct.Struct('>QII')
29 self.message_table = {}
30 self.sque = multiprocessing.Queue()
31 self.q = multiprocessing.Queue()
32 self.message_thread = threading.Thread(target=self.msg_thread_func)
34 def msg_thread_func(self):
37 rlist, _, _ = select.select([self.socket,
38 self.sque._reader], [], [])
41 logging.error('select failed')
46 if r == self.sque._reader:
51 elif r == self.socket:
60 # Put either to local queue or if context == 0
62 if self.parent.has_context(msg):
65 self.parent.msg_handler_async(msg)
67 raise VppTransportSocketIOError(
68 2, 'Unknown response from select')
70 def connect(self, name, pfx, msg_handler, rx_qlen):
73 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
74 self.socket.settimeout(self.read_timeout)
76 # Connect the socket to the port where the server is listening
78 self.socket.connect(self.server_address)
79 except socket.error as msg:
80 logging.error("{} on socket {}".format(msg, self.server_address))
84 # Initialise sockclnt_create
85 sockclnt_create = self.parent.messages['sockclnt_create']
86 sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
88 args = {'_vl_msg_id': 15,
91 b = sockclnt_create.pack(args)
94 hdr, length = self.parent.header.unpack(msg, 0)
96 raise VppTransportSocketIOError('Invalid reply message')
98 r, length = sockclnt_create_reply.unpack(msg)
99 self.socket_index = r.index
100 for m in r.message_table:
101 n = m.name.rstrip(b'\x00\x13')
102 self.message_table[n] = m.index
104 self.message_thread.daemon = True
105 self.message_thread.start()
109 def disconnect(self):
111 try: # Might fail, if VPP closes socket before packet makes it out
112 rv = self.parent.api.sockclnt_delete(index=self.socket_index)
115 self.connected = False
117 self.sque.put(True) # Terminate listening thread
118 self.message_thread.join()
128 raise NotImplementedError
130 def get_callback(self, do_async):
133 def get_msg_index(self, name):
135 return self.message_table[name]
139 def msg_table_max_index(self):
140 return len(self.message_table)
142 def write(self, buf):
143 """Send a binary-packed message to VPP."""
144 if not self.connected:
145 raise VppTransportSocketIOError(1, 'Not connected')
148 header = self.header.pack(0, len(buf), 0)
149 n = self.socket.send(header)
150 n = self.socket.send(buf)
155 msg = self.socket.recv(4096)
158 except socket.error as message:
159 logging.error(message)
162 (_, l, _) = self.header.unpack(msg[:16])
165 buf = bytearray(l + 16)
166 view = memoryview(buf)
169 # Read rest of message
170 remaining_bytes = l - 4096 + 16
171 while remaining_bytes > 0:
172 bytes_to_read = (remaining_bytes if remaining_bytes
174 nbytes = self.socket.recv_into(view, bytes_to_read)
176 logging.error('recv failed')
179 remaining_bytes -= nbytes
185 if not self.connected:
186 raise VppTransportSocketIOError(1, 'Not connected')
188 return self.q.get(True, self.read_timeout)