papi: allow client control over loggers
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_transport_socket.py
1 #
2 # VPP Unix Domain Socket Transport.
3 #
4 import socket
5 import struct
6 import threading
7 import select
8 import multiprocessing
9 try:
10     import queue as queue
11 except ImportError:
12     import Queue as queue
13 import logging
14 from . import vpp_papi
15
16 logger = logging.getLogger('vpp_papi.transport')
17 logger.addHandler(logging.NullHandler())
18
19
20 class VppTransportSocketIOError(IOError):
21     # TODO: Document different values of error number (first numeric argument).
22     pass
23
24
25 class VppTransport(object):
26     VppTransportSocketIOError = VppTransportSocketIOError
27
28     def __init__(self, parent, read_timeout, server_address):
29         self.connected = False
30         self.read_timeout = read_timeout if read_timeout > 0 else 1
31         self.parent = parent
32         self.server_address = server_address
33         self.header = struct.Struct('>QII')
34         self.message_table = {}
35         # These queues can be accessed async.
36         # They are always up, but replaced on connect.
37         # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue
38         # if possible.
39         self.sque = multiprocessing.Queue()
40         self.q = multiprocessing.Queue()
41         # The following fields are set in connect().
42         self.message_thread = None
43         self.socket = None
44
45     def msg_thread_func(self):
46         while True:
47             try:
48                 rlist, _, _ = select.select([self.socket,
49                                              self.sque._reader], [], [])
50             except socket.error:
51                 # Terminate thread
52                 logging.error('select failed')
53                 self.q.put(None)
54                 return
55
56             for r in rlist:
57                 if r == self.sque._reader:
58                     # Terminate
59                     self.q.put(None)
60                     return
61
62                 elif r == self.socket:
63                     try:
64                         msg = self._read()
65                         if not msg:
66                             self.q.put(None)
67                             return
68                     except socket.error:
69                         self.q.put(None)
70                         return
71                     # Put either to local queue or if context == 0
72                     # callback queue
73                     if self.parent.has_context(msg):
74                         self.q.put(msg)
75                     else:
76                         self.parent.msg_handler_async(msg)
77                 else:
78                     raise VppTransportSocketIOError(
79                         2, 'Unknown response from select')
80
81     def connect(self, name, pfx, msg_handler, rx_qlen):
82         # TODO: Reorder the actions and add "roll-backs",
83         # to restore clean disconnect state when failure happens durng connect.
84
85         if self.message_thread is not None:
86             raise VppTransportSocketIOError(
87                 1, "PAPI socket transport connect: Need to disconnect first.")
88
89         # Create a UDS socket
90         self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
91         self.socket.settimeout(self.read_timeout)
92
93         # Connect the socket to the port where the server is listening
94         try:
95             self.socket.connect(self.server_address)
96         except socket.error as msg:
97             logging.error("{} on socket {}".format(msg, self.server_address))
98             raise
99
100         self.connected = True
101
102         # Queues' feeder threads from previous connect may still be sending.
103         # Close and join to avoid any errors.
104         self.sque.close()
105         self.q.close()
106         self.sque.join_thread()
107         self.q.join_thread()
108         # Finally safe to replace.
109         self.sque = multiprocessing.Queue()
110         self.q = multiprocessing.Queue()
111         self.message_thread = threading.Thread(target=self.msg_thread_func)
112
113         # Initialise sockclnt_create
114         sockclnt_create = self.parent.messages['sockclnt_create']
115         sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
116
117         args = {'_vl_msg_id': 15,
118                 'name': name,
119                 'context': 124}
120         b = sockclnt_create.pack(args)
121         self.write(b)
122         msg = self._read()
123         hdr, length = self.parent.header.unpack(msg, 0)
124         if hdr.msgid != 16:
125             # TODO: Add first numeric argument.
126             raise VppTransportSocketIOError('Invalid reply message')
127
128         r, length = sockclnt_create_reply.unpack(msg)
129         self.socket_index = r.index
130         for m in r.message_table:
131             n = m.name
132             self.message_table[n] = m.index
133
134         self.message_thread.daemon = True
135         self.message_thread.start()
136
137         return 0
138
139     def disconnect(self):
140         # TODO: Support repeated disconnect calls, recommend users to call
141         # disconnect when they are not sure what the state is after failures.
142         # TODO: Any volunteer for comprehensive docstrings?
143         rv = 0
144         try:
145             # Might fail, if VPP closes socket before packet makes it out,
146             # or if there was a failure during connect().
147             rv = self.parent.api.sockclnt_delete(index=self.socket_index)
148         except (IOError, vpp_papi.VPPApiError):
149             pass
150         self.connected = False
151         if self.socket is not None:
152             self.socket.close()
153         if self.sque is not None:
154             self.sque.put(True)  # Terminate listening thread
155         if self.message_thread is not None and self.message_thread.is_alive():
156             # Allow additional connect() calls.
157             self.message_thread.join()
158         # Wipe message table, VPP can be restarted with different plugins.
159         self.message_table = {}
160         # Collect garbage.
161         self.message_thread = None
162         self.socket = None
163         # Queues will be collected after connect replaces them.
164         return rv
165
166     def suspend(self):
167         pass
168
169     def resume(self):
170         pass
171
172     def callback(self):
173         raise NotImplementedError
174
175     def get_callback(self, do_async):
176         return self.callback
177
178     def get_msg_index(self, name):
179         try:
180             return self.message_table[name]
181         except KeyError:
182             return 0
183
184     def msg_table_max_index(self):
185         return len(self.message_table)
186
187     def write(self, buf):
188         """Send a binary-packed message to VPP."""
189         if not self.connected:
190             raise VppTransportSocketIOError(1, 'Not connected')
191
192         # Send header
193         header = self.header.pack(0, len(buf), 0)
194         try:
195             self.socket.sendall(header)
196             self.socket.sendall(buf)
197         except socket.error as err:
198             raise VppTransportSocketIOError(1, 'Sendall error: {err!r}'.format(
199                 err=err))
200
201     def _read_fixed(self, size):
202         """Repeat receive until fixed size is read. Return empty on error."""
203         buf = bytearray(size)
204         view = memoryview(buf)
205         left = size
206         while 1:
207             got = self.socket.recv_into(view, left)
208             if got <= 0:
209                 # Read error.
210                 return ""
211             if got >= left:
212                 # TODO: Raise if got > left?
213                 break
214             left -= got
215             view = view[got:]
216         return buf
217
218     def _read(self):
219         """Read single complete message, return it or empty on error."""
220         hdr = self._read_fixed(16)
221         if not hdr:
222             return
223         (_, hdrlen, _) = self.header.unpack(hdr)  # If at head of message
224
225         # Read rest of message
226         msg = self._read_fixed(hdrlen)
227         if hdrlen == len(msg):
228             return msg
229         raise VppTransportSocketIOError(1, 'Unknown socket read error')
230
231     def read(self, timeout=None):
232         if not self.connected:
233             raise VppTransportSocketIOError(1, 'Not connected')
234         if timeout is None:
235             timeout = self.read_timeout
236         try:
237             return self.q.get(True, timeout)
238         except queue.Empty:
239             return None