api: memclnt api use string type.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_transport_socket.py
1 #
2 # VPP Unix Domain Socket Transport.
3 #
4 import socket
5 import struct
6 import threading
7 import select
8 import multiprocessing
9 try:
10     import queue as queue
11 except ImportError:
12     import Queue as queue
13 import logging
14 from . import vpp_papi
15
16
17 class VppTransportSocketIOError(IOError):
18     # TODO: Document different values of error number (first numeric argument).
19     pass
20
21
22 class VppTransport(object):
23     VppTransportSocketIOError = VppTransportSocketIOError
24
25     def __init__(self, parent, read_timeout, server_address):
26         self.connected = False
27         self.read_timeout = read_timeout if read_timeout > 0 else 1
28         self.parent = parent
29         self.server_address = server_address
30         self.header = struct.Struct('>QII')
31         self.message_table = {}
32         # These queues can be accessed async.
33         # They are always up, but replaced on connect.
34         # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue
35         # if possible.
36         self.sque = multiprocessing.Queue()
37         self.q = multiprocessing.Queue()
38         # The following fields are set in connect().
39         self.message_thread = None
40         self.socket = None
41
42     def msg_thread_func(self):
43         while True:
44             try:
45                 rlist, _, _ = select.select([self.socket,
46                                              self.sque._reader], [], [])
47             except socket.error:
48                 # Terminate thread
49                 logging.error('select failed')
50                 self.q.put(None)
51                 return
52
53             for r in rlist:
54                 if r == self.sque._reader:
55                     # Terminate
56                     self.q.put(None)
57                     return
58
59                 elif r == self.socket:
60                     try:
61                         msg = self._read()
62                         if not msg:
63                             self.q.put(None)
64                             return
65                     except socket.error:
66                         self.q.put(None)
67                         return
68                     # Put either to local queue or if context == 0
69                     # callback queue
70                     if self.parent.has_context(msg):
71                         self.q.put(msg)
72                     else:
73                         self.parent.msg_handler_async(msg)
74                 else:
75                     raise VppTransportSocketIOError(
76                         2, 'Unknown response from select')
77
78     def connect(self, name, pfx, msg_handler, rx_qlen):
79         # TODO: Reorder the actions and add "roll-backs",
80         # to restore clean disconnect state when failure happens durng connect.
81
82         if self.message_thread is not None:
83             raise VppTransportSocketIOError(
84                 1, "PAPI socket transport connect: Need to disconnect first.")
85
86         # Create a UDS socket
87         self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
88         self.socket.settimeout(self.read_timeout)
89
90         # Connect the socket to the port where the server is listening
91         try:
92             self.socket.connect(self.server_address)
93         except socket.error as msg:
94             logging.error("{} on socket {}".format(msg, self.server_address))
95             raise
96
97         self.connected = True
98
99         # Queues' feeder threads from previous connect may still be sending.
100         # Close and join to avoid any errors.
101         self.sque.close()
102         self.q.close()
103         self.sque.join_thread()
104         self.q.join_thread()
105         # Finally safe to replace.
106         self.sque = multiprocessing.Queue()
107         self.q = multiprocessing.Queue()
108         self.message_thread = threading.Thread(target=self.msg_thread_func)
109
110         # Initialise sockclnt_create
111         sockclnt_create = self.parent.messages['sockclnt_create']
112         sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
113
114         args = {'_vl_msg_id': 15,
115                 'name': name,
116                 'context': 124}
117         b = sockclnt_create.pack(args)
118         self.write(b)
119         msg = self._read()
120         hdr, length = self.parent.header.unpack(msg, 0)
121         if hdr.msgid != 16:
122             # TODO: Add first numeric argument.
123             raise VppTransportSocketIOError('Invalid reply message')
124
125         r, length = sockclnt_create_reply.unpack(msg)
126         self.socket_index = r.index
127         for m in r.message_table:
128             n = m.name
129             self.message_table[n] = m.index
130
131         self.message_thread.daemon = True
132         self.message_thread.start()
133
134         return 0
135
136     def disconnect(self):
137         # TODO: Support repeated disconnect calls, recommend users to call
138         # disconnect when they are not sure what the state is after failures.
139         # TODO: Any volunteer for comprehensive docstrings?
140         rv = 0
141         try:
142             # Might fail, if VPP closes socket before packet makes it out,
143             # or if there was a failure during connect().
144             rv = self.parent.api.sockclnt_delete(index=self.socket_index)
145         except (IOError, vpp_papi.VPPApiError):
146             pass
147         self.connected = False
148         if self.socket is not None:
149             self.socket.close()
150         if self.sque is not None:
151             self.sque.put(True)  # Terminate listening thread
152         if self.message_thread is not None and self.message_thread.is_alive():
153             # Allow additional connect() calls.
154             self.message_thread.join()
155         # Wipe message table, VPP can be restarted with different plugins.
156         self.message_table = {}
157         # Collect garbage.
158         self.message_thread = None
159         self.socket = None
160         # Queues will be collected after connect replaces them.
161         return rv
162
163     def suspend(self):
164         pass
165
166     def resume(self):
167         pass
168
169     def callback(self):
170         raise NotImplementedError
171
172     def get_callback(self, do_async):
173         return self.callback
174
175     def get_msg_index(self, name):
176         try:
177             return self.message_table[name]
178         except KeyError:
179             return 0
180
181     def msg_table_max_index(self):
182         return len(self.message_table)
183
184     def write(self, buf):
185         """Send a binary-packed message to VPP."""
186         if not self.connected:
187             raise VppTransportSocketIOError(1, 'Not connected')
188
189         # Send header
190         header = self.header.pack(0, len(buf), 0)
191         n = self.socket.send(header)
192         n = self.socket.send(buf)
193         if n == 0:
194             raise VppTransportSocketIOError(1, 'Not connected')
195
196     def _read(self):
197         hdr = self.socket.recv(16)
198         if not hdr:
199             return
200         (_, hdrlen, _) = self.header.unpack(hdr)  # If at head of message
201
202         # Read rest of message
203         msg = self.socket.recv(hdrlen)
204         if hdrlen > len(msg):
205             nbytes = len(msg)
206             buf = bytearray(hdrlen)
207             view = memoryview(buf)
208             view[:nbytes] = msg
209             view = view[nbytes:]
210             left = hdrlen - nbytes
211             while left:
212                 nbytes = self.socket.recv_into(view, left)
213                 view = view[nbytes:]
214                 left -= nbytes
215             return buf
216         if hdrlen == len(msg):
217             return msg
218         raise VppTransportSocketIOError(1, 'Unknown socket read error')
219
220     def read(self):
221         if not self.connected:
222             raise VppTransportSocketIOError(1, 'Not connected')
223         try:
224             return self.q.get(True, self.read_timeout)
225         except queue.Empty:
226             return None