2 # VPP Unix Domain Socket Transport.
13 def __init__(self, parent, read_timeout, server_address):
14 self.connected = False
15 self.read_timeout = read_timeout if read_timeout > 0 else 1
17 self.server_address = server_address
18 self.header = struct.Struct('>QII')
19 self.message_table = {}
20 self.sque = multiprocessing.Queue()
21 self.q = multiprocessing.Queue()
22 self.message_thread = threading.Thread(target=self.msg_thread_func)
24 def msg_thread_func(self):
27 rlist, _, _ = select.select([self.socket,
28 self.sque._reader], [], [])
31 logging.error('select failed')
36 if r == self.sque._reader:
41 elif r == self.socket:
50 # Put either to local queue or if context == 0
52 r = self.parent.decode_incoming_msg(msg)
53 if hasattr(r, 'context') and r.context > 0:
56 self.parent.msg_handler_async(msg)
58 raise IOError(2, 'Unknown response from select')
60 def connect(self, name, pfx, msg_handler, rx_qlen):
63 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
64 self.socket.settimeout(self.read_timeout)
66 # Connect the socket to the port where the server is listening
68 self.socket.connect(self.server_address)
69 except socket.error as msg:
75 # Initialise sockclnt_create
76 sockclnt_create = self.parent.messages['sockclnt_create']
77 sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
79 args = {'_vl_msg_id': 15,
82 b = sockclnt_create.pack(args)
85 hdr, length = self.parent.header.unpack(msg, 0)
87 raise IOError('Invalid reply message')
89 r, length = sockclnt_create_reply.unpack(msg)
90 self.socket_index = r.index
91 for m in r.message_table:
92 n = m.name.rstrip(b'\x00\x13')
93 self.message_table[n] = m.index
95 self.message_thread.daemon = True
96 self.message_thread.start()
100 def disconnect(self):
101 try: # Might fail, if VPP closes socket before packet makes it out
102 rv = self.parent.api.sockclnt_delete(index=self.socket_index)
105 self.connected = False
107 self.sque.put(True) # Terminate listening thread
108 self.message_thread.join()
119 def get_callback(self, async):
122 def get_msg_index(self, name):
124 return self.message_table[name]
128 def msg_table_max_index(self):
129 return len(self.message_table)
131 def write(self, buf):
132 """Send a binary-packed message to VPP."""
133 if not self.connected:
134 raise IOError(1, 'Not connected')
137 header = self.header.pack(0, len(buf), 0)
138 n = self.socket.send(header)
139 n = self.socket.send(buf)
144 msg = self.socket.recv(4096)
147 except socket.error as message:
148 logging.error(message)
151 (_, l, _) = self.header.unpack(msg[:16])
154 buf = bytearray(l + 16)
155 view = memoryview(buf)
158 # Read rest of message
159 remaining_bytes = l - 4096 + 16
160 while remaining_bytes > 0:
161 bytes_to_read = (remaining_bytes if remaining_bytes
163 nbytes = self.socket.recv_into(view, bytes_to_read)
165 logging.error('recv failed')
168 remaining_bytes -= nbytes
174 if not self.connected:
175 raise IOError(1, 'Not connected')