2 # VPP Unix Domain Socket Transport.
# IOError subclass raised by the socket transport below.
# In the code visible in this file it is constructed as
# (error_number, message):
#   1 -- 'Not connected', 'Unknown socket read error', and
#        "PAPI socket transport connect: Need to disconnect first."
#   2 -- 'Unknown response from select'
# and once, in connect(), with a message only ('Invalid reply message').
16 class VppTransportSocketIOError(IOError):
17 # TODO: Document different values of error number (first numeric argument).
# Transport that exchanges messages with VPP over a Unix-domain stream
# socket (connect() opens an AF_UNIX / SOCK_STREAM socket).
21 class VppTransport(object):
# Makes the exception type reachable as VppTransport.VppTransportSocketIOError
# (and through instances).
22 VppTransportSocketIOError = VppTransportSocketIOError
# Set up transport state. No socket is opened here; connect() does that.
#   parent         -- NOTE(review): no line visible here stores this argument,
#                     yet other methods read self.parent. Original line 27 is
#                     missing from this listing and presumably holds
#                     `self.parent = parent` -- confirm.
#   read_timeout   -- timeout later passed to socket.settimeout() and to the
#                     queue get in the read path; values <= 0 are replaced by 1.
#   server_address -- address later passed to socket.connect().
24 def __init__(self, parent, read_timeout, server_address):
25 self.connected = False
# Zero or negative timeouts fall back to 1.
26 self.read_timeout = read_timeout if read_timeout > 0 else 1
28 self.server_address = server_address
# Framing header: big-endian unsigned 64-bit field followed by two unsigned
# 32-bit fields (16 bytes). write() puts the payload length in the second field.
29 self.header = struct.Struct('>QII')
# Message name -> message index; filled in by connect().
30 self.message_table = {}
# Control queue: disconnect() puts a value on it to stop the listening thread,
# and msg_thread_func() selects on its reader end.
31 self.sque = multiprocessing.Queue()
# Queue the read path takes messages from (with read_timeout).
32 self.q = multiprocessing.Queue()
33 self.message_thread = None # Will be set on connect().
# Target of the background thread created in connect().
# Waits in select() -- with no timeout -- on the VPP socket and on the reader
# end of `sque`, then handles whichever became readable.
# NOTE(review): the embedded numbering has gaps (36-37, 40-41, 43-46, 48-51,
# 53-60, 62, 64-65, 67), so the loop, exception handling and queue operations
# are not visible here; the comments below cover only the lines shown.
35 def msg_thread_func(self):
# Uses multiprocessing.Queue's private `_reader` attribute so the queue can be
# waited on together with the socket.
38 rlist, _, _ = select.select([self.socket,
39 self.sque._reader], [], [])
42 logging.error('select failed')
# Control queue became readable: disconnect() puts a value there to terminate
# this thread.
47 if r == self.sque._reader:
# The VPP socket became readable.
52 elif r == self.socket:
61 # Put either to local queue or if context == 0
# Dispatch depends on parent.has_context(msg); presumably messages without a
# context go to parent.msg_handler_async() -- the branch structure between
# these two lines is not visible here, confirm.
63 if self.parent.has_context(msg):
66 self.parent.msg_handler_async(msg)
# Error number 2: select() returned an object that is neither the socket nor
# the control queue's reader.
68 raise VppTransportSocketIOError(
69 2, 'Unknown response from select')
# Open the Unix-domain socket to VPP, exchange sockclnt_create /
# sockclnt_create_reply, record the message table from the reply and start
# the receive thread.
# None of `name`, `pfx`, `msg_handler`, `rx_qlen` is referenced by a line
# visible here.
# NOTE(review): the embedded numbering has gaps (e.g. 77-78, 87-89, 95-96,
# 98-99, 101, 113-115), so the send/receive of the handshake, the reply check
# and the return value are not visible; comments cover only the lines shown.
71 def connect(self, name, pfx, msg_handler, rx_qlen):
# message_thread is reset to None by disconnect(); a non-None value means a
# previous connect() has not been undone. Raised with error number 1.
73 if self.message_thread is not None:
74 raise VppTransportSocketIOError(
75 1, "PAPI socket transport connect: Need to disconnect first.")
# The thread object is created here but only started at the end of connect().
# NOTE(review): if an exception propagates from any step in between,
# message_thread stays set, so a retry of connect() hits the check above and
# disconnect() would join() a thread that was never started (threading raises
# RuntimeError for that) -- unless a line not shown here resets it. Confirm.
76 self.message_thread = threading.Thread(target=self.msg_thread_func)
79 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Blocking socket operations time out after read_timeout seconds.
80 self.socket.settimeout(self.read_timeout)
82 # Connect the socket to the port where the server is listening
84 self.socket.connect(self.server_address)
# The failure is logged together with the server address; whether it is
# re-raised is not visible in this listing.
85 except socket.error as msg:
86 logging.error("{} on socket {}".format(msg, self.server_address))
90 # Initialise sockclnt_create
# Message definitions are looked up by name on the parent.
91 sockclnt_create = self.parent.messages['sockclnt_create']
92 sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
# Message id 15 is hard-coded here; the remaining dict entries are on lines
# not shown in this listing.
94 args = {'_vl_msg_id': 15,
97 b = sockclnt_create.pack(args)
# Decodes with the parent's `header` definition, which is a different object
# from the 16-byte framing struct self.header.
100 hdr, length = self.parent.header.unpack(msg, 0)
102 # TODO: Add first numeric argument.
# Unlike the other raises in this file, this one passes only a message, so
# the exception carries no error number.
103 raise VppTransportSocketIOError('Invalid reply message')
105 r, length = sockclnt_create_reply.unpack(msg)
# Index assigned by VPP for this client; disconnect() passes it to
# sockclnt_delete.
106 self.socket_index = r.index
# Build the name -> index table from the reply.
107 for m in r.message_table:
# rstrip() treats its argument as a set of byte values: every trailing 0x00
# and 0x13 byte is removed from the name.
# NOTE(review): 0x13 is an unusual padding value -- confirm it is intended.
108 n = m.name.rstrip(b'\x00\x13')
109 self.message_table[n] = m.index
# Daemon thread: it does not keep the interpreter alive at exit.
111 self.message_thread.daemon = True
112 self.message_thread.start()
# Tear down the connection: ask VPP to delete this socket client, mark the
# transport as disconnected, stop the receive thread and allow connect() to be
# called again.
# NOTE(review): the embedded numbering has gaps (118, 121-122, 124 and after
# 128), so the except clause, any socket close and the return value are not
# visible here.
116 def disconnect(self):
117 # TODO: Should we detect if user forgot to connect first?
# As written, calling this without a prior connect() leaves message_thread as
# None, so the join() below would raise AttributeError.
119 try: # Might fail, if VPP closes socket before packet makes it out
# socket_index is the value stored by connect() from sockclnt_create_reply.
120 rv = self.parent.api.sockclnt_delete(index=self.socket_index)
123 self.connected = False
# Wakes the select() in msg_thread_func() through the control queue.
125 self.sque.put(True) # Terminate listening thread
# No timeout: blocks until the receive thread has exited.
126 self.message_thread.join()
127 # Allow additional connect() calls.
128 self.message_thread = None
138 raise NotImplementedError
140 def get_callback(self, do_async):
# Look up the index recorded for message `name` in message_table.
# The table is populated by connect(), which stores names as bytes with
# trailing 0x00/0x13 bytes stripped, so `name` must match that form.
# NOTE(review): original lines 144 and 146-147 are missing from this listing;
# how an unknown name is handled is not visible here.
143 def get_msg_index(self, name):
145 return self.message_table[name]
def msg_table_max_index(self):
    """Return the number of entries currently in ``self.message_table``."""
    table = self.message_table
    return len(table)
def write(self, buf):
    """Send a binary-packed message to VPP.

    The payload is preceded by the 16-byte framing header
    (``self.header``, format ``'>QII'``): the second field carries
    ``len(buf)``, the first and third fields are sent as 0.

    Args:
        buf: Bytes-like payload, already packed by the caller.

    Raises:
        VppTransportSocketIOError: With error number 1 when the
            transport is not connected.
        socket.error: Propagated from the socket if sending fails or
            times out.
    """
    if not self.connected:
        raise VppTransportSocketIOError(1, 'Not connected')

    header = self.header.pack(0, len(buf), 0)
    # socket.send() may transmit only part of its argument on a stream
    # socket. sendall() keeps writing until every byte is out, so a short
    # write can no longer leave a truncated header or payload on the wire
    # and desynchronise the framing.
    self.socket.sendall(header)
    self.socket.sendall(buf)
163 hdr = self.socket.recv(16)
166 (_, l, _) = self.header.unpack(hdr) # If at head of message
168 # Read rest of message
169 msg = self.socket.recv(l)
173 view = memoryview(buf)
178 nbytes = self.socket.recv_into(view, left)
184 raise VppTransportSocketIOError(1, 'Unknown socket read error')
187 if not self.connected:
188 raise VppTransportSocketIOError(1, 'Not connected')
190 return self.q.get(True, self.read_timeout)