hs-test: fixed timed out tests passing in the CI
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_transport_socket.py
1 #
2 # VPP Unix Domain Socket Transport.
3 #
4 import socket
5 import struct
6 import threading
7 import select
8 import multiprocessing
9 import queue
10 import logging
11
12 logger = logging.getLogger("vpp_papi.transport")
13 logger.addHandler(logging.NullHandler())
14
15
16 class VppTransportSocketIOError(IOError):
17     # TODO: Document different values of error number (first numeric argument).
18     pass
19
20
21 class VppTransport:
22     VppTransportSocketIOError = VppTransportSocketIOError
23
24     def __init__(self, parent, read_timeout, server_address):
25         self.connected = False
26         self.read_timeout = read_timeout if read_timeout > 0 else None
27         self.parent = parent
28         self.server_address = server_address
29         self.header = struct.Struct(">QII")
30         self.message_table = {}
31         # These queues can be accessed async.
32         # They are always up, but replaced on connect.
33         # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue
34         # if possible.
35         self.sque = multiprocessing.Queue()
36         self.q = multiprocessing.Queue()
37         # The following fields are set in connect().
38         self.message_thread = None
39         self.socket = None
40
41     def msg_thread_func(self):
42         while True:
43             try:
44                 rlist, _, _ = select.select([self.socket, self.sque._reader], [], [])
45             except (socket.error, ValueError):
46                 # Terminate thread
47                 logging.error("select failed")
48                 self.q.put(None)
49                 return
50
51             for r in rlist:
52                 if r == self.sque._reader:
53                     # Terminate
54                     self.q.put(None)
55                     return
56
57                 elif r == self.socket:
58                     try:
59                         msg = self._read()
60                         if not msg:
61                             self.q.put(None)
62                             return
63                     except socket.error:
64                         self.q.put(None)
65                         return
66                     # Put either to local queue or if context == 0
67                     # callback queue
68                     if not self.do_async and self.parent.has_context(msg):
69                         self.q.put(msg)
70                     else:
71                         self.parent.msg_handler_async(msg)
72                 else:
73                     raise VppTransportSocketIOError(2, "Unknown response from select")
74
75     def connect(self, name, pfx, msg_handler, rx_qlen, do_async=False):
76         # TODO: Reorder the actions and add "roll-backs",
77         # to restore clean disconnect state when failure happens durng connect.
78
79         if self.message_thread is not None:
80             raise VppTransportSocketIOError(
81                 1, "PAPI socket transport connect: Need to disconnect first."
82             )
83
84         # Create a UDS socket
85         self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
86         self.socket.settimeout(self.read_timeout)
87
88         # Connect the socket to the port where the server is listening
89         try:
90             self.socket.connect(self.server_address)
91         except socket.error as msg:
92             # logging.error("{} on socket {}".format(msg, self.server_address))
93             raise msg
94
95         self.connected = True
96
97         # Queues' feeder threads from previous connect may still be sending.
98         # Close and join to avoid any errors.
99         self.sque.close()
100         self.q.close()
101         self.sque.join_thread()
102         self.q.join_thread()
103         # Finally safe to replace.
104         self.sque = multiprocessing.Queue()
105         self.q = multiprocessing.Queue()
106         self.message_thread = threading.Thread(target=self.msg_thread_func)
107
108         # Initialise sockclnt_create
109         sockclnt_create = self.parent.messages["sockclnt_create"]
110         sockclnt_create_reply = self.parent.messages["sockclnt_create_reply"]
111
112         args = {"_vl_msg_id": 15, "name": name, "context": 124}
113         b = sockclnt_create.pack(args)
114         self.write(b)
115         msg = self._read()
116         hdr, length = self.parent.header.unpack(msg, 0)
117         if hdr.msgid != 16:
118             # TODO: Add first numeric argument.
119             raise VppTransportSocketIOError("Invalid reply message")
120
121         r, length = sockclnt_create_reply.unpack(msg)
122         self.socket_index = r.index
123         for m in r.message_table:
124             n = m.name
125             self.message_table[n] = m.index
126
127         self.message_thread.daemon = True
128         self.do_async = do_async
129         self.message_thread.start()
130
131         return 0
132
133     def disconnect(self):
134         # TODO: Support repeated disconnect calls, recommend users to call
135         # disconnect when they are not sure what the state is after failures.
136         # TODO: Any volunteer for comprehensive docstrings?
137         rv = 0
138         try:
139             # Might fail, if VPP closes socket before packet makes it out,
140             # or if there was a failure during connect().
141             # TODO: manually build message so that .disconnect releases server-side resources
142             rv = self.parent.api.sockclnt_delete(index=self.socket_index)
143         except (IOError, self.parent.VPPApiError):
144             pass
145         self.connected = False
146         if self.socket is not None:
147             self.socket.close()
148         if self.sque is not None:
149             self.sque.put(True)  # Terminate listening thread
150         if self.message_thread is not None and self.message_thread.is_alive():
151             # Allow additional connect() calls.
152             self.message_thread.join()
153         # Wipe message table, VPP can be restarted with different plugins.
154         self.message_table = {}
155         # Collect garbage.
156         self.message_thread = None
157         self.socket = None
158         # Queues will be collected after connect replaces them.
159         return rv
160
161     def suspend(self):
162         pass
163
164     def resume(self):
165         pass
166
167     def callback(self):
168         raise NotImplementedError
169
170     def get_callback(self, do_async):
171         return self.callback
172
173     def get_msg_index(self, name):
174         try:
175             return self.message_table[name]
176         except KeyError:
177             return 0
178
179     def msg_table_max_index(self):
180         return len(self.message_table)
181
182     def write(self, buf):
183         """Send a binary-packed message to VPP."""
184         if not self.connected:
185             raise VppTransportSocketIOError(1, "Not connected")
186
187         # Send header
188         header = self.header.pack(0, len(buf), 0)
189         try:
190             self.socket.sendall(header)
191             self.socket.sendall(buf)
192         except socket.error as err:
193             raise VppTransportSocketIOError(1, "Sendall error: {err!r}".format(err=err))
194
195     def _read_fixed(self, size):
196         """Repeat receive until fixed size is read. Return empty on error."""
197         buf = bytearray(size)
198         view = memoryview(buf)
199         left = size
200         while 1:
201             got = self.socket.recv_into(view, left)
202             if got <= 0:
203                 # Read error.
204                 return ""
205             if got >= left:
206                 # TODO: Raise if got > left?
207                 break
208             left -= got
209             view = view[got:]
210         return buf
211
212     def _read(self):
213         """Read single complete message, return it or empty on error."""
214         hdr = self._read_fixed(16)
215         if not hdr:
216             return
217         (_, hdrlen, _) = self.header.unpack(hdr)  # If at head of message
218
219         # Read rest of message
220         msg = self._read_fixed(hdrlen)
221         if hdrlen == len(msg):
222             return msg
223         raise VppTransportSocketIOError(1, "Unknown socket read error")
224
225     def read(self, timeout=None):
226         if not self.connected:
227             raise VppTransportSocketIOError(1, "Not connected")
228         if timeout is None:
229             timeout = self.read_timeout
230         try:
231             return self.q.get(True, timeout)
232         except queue.Empty:
233             return None