4341cad3e908cf290ca2535d6c2f1fb3a1344ece
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_transport_socket.py
1 #
2 # VPP Unix Domain Socket Transport.
3 #
4 import socket
5 import struct
6 import threading
7 import select
8 import multiprocessing
9 try:
10     import queue as queue
11 except ImportError:
12     import Queue as queue
13 import logging
14
15
16 class VppTransportSocketIOError(IOError):
17     pass
18
19
20 class VppTransport(object):
21     VppTransportSocketIOError = VppTransportSocketIOError
22
23     def __init__(self, parent, read_timeout, server_address):
24         self.connected = False
25         self.read_timeout = read_timeout if read_timeout > 0 else 1
26         self.parent = parent
27         self.server_address = server_address
28         self.header = struct.Struct('>QII')
29         self.message_table = {}
30         self.sque = multiprocessing.Queue()
31         self.q = multiprocessing.Queue()
32         self.message_thread = threading.Thread(target=self.msg_thread_func)
33
34     def msg_thread_func(self):
35         while True:
36             try:
37                 rlist, _, _ = select.select([self.socket,
38                                              self.sque._reader], [], [])
39             except socket.error:
40                 # Terminate thread
41                 logging.error('select failed')
42                 self.q.put(None)
43                 return
44
45             for r in rlist:
46                 if r == self.sque._reader:
47                     # Terminate
48                     self.q.put(None)
49                     return
50
51                 elif r == self.socket:
52                     try:
53                         msg = self._read()
54                         if not msg:
55                             self.q.put(None)
56                             return
57                     except socket.error:
58                         self.q.put(None)
59                         return
60                     # Put either to local queue or if context == 0
61                     # callback queue
62                     if self.parent.has_context(msg):
63                         self.q.put(msg)
64                     else:
65                         self.parent.msg_handler_async(msg)
66                 else:
67                     raise VppTransportSocketIOError(
68                         2, 'Unknown response from select')
69
70     def connect(self, name, pfx, msg_handler, rx_qlen):
71
72         # Create a UDS socket
73         self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
74         self.socket.settimeout(self.read_timeout)
75
76         # Connect the socket to the port where the server is listening
77         try:
78             self.socket.connect(self.server_address)
79         except socket.error as msg:
80             logging.error("{} on socket {}".format(msg, self.server_address))
81             raise
82
83         self.connected = True
84         # Initialise sockclnt_create
85         sockclnt_create = self.parent.messages['sockclnt_create']
86         sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
87
88         args = {'_vl_msg_id': 15,
89                 'name': name,
90                 'context': 124}
91         b = sockclnt_create.pack(args)
92         self.write(b)
93         msg = self._read()
94         hdr, length = self.parent.header.unpack(msg, 0)
95         if hdr.msgid != 16:
96             raise VppTransportSocketIOError('Invalid reply message')
97
98         r, length = sockclnt_create_reply.unpack(msg)
99         self.socket_index = r.index
100         for m in r.message_table:
101             n = m.name.rstrip(b'\x00\x13')
102             self.message_table[n] = m.index
103
104         self.message_thread.daemon = True
105         self.message_thread.start()
106
107         return 0
108
109     def disconnect(self):
110         rv = 0
111         try:  # Might fail, if VPP closes socket before packet makes it out
112             rv = self.parent.api.sockclnt_delete(index=self.socket_index)
113         except IOError:
114             pass
115         self.connected = False
116         self.socket.close()
117         self.sque.put(True)  # Terminate listening thread
118         self.message_thread.join()
119         return rv
120
121     def suspend(self):
122         pass
123
124     def resume(self):
125         pass
126
127     def callback(self):
128         raise NotImplementedError
129
130     def get_callback(self, do_async):
131         return self.callback
132
133     def get_msg_index(self, name):
134         try:
135             return self.message_table[name]
136         except KeyError:
137             return 0
138
139     def msg_table_max_index(self):
140         return len(self.message_table)
141
142     def write(self, buf):
143         """Send a binary-packed message to VPP."""
144         if not self.connected:
145             raise VppTransportSocketIOError(1, 'Not connected')
146
147         # Send header
148         header = self.header.pack(0, len(buf), 0)
149         n = self.socket.send(header)
150         n = self.socket.send(buf)
151
152     def _read(self):
153         # Header and message
154         try:
155             msg = self.socket.recv(4096)
156             if len(msg) == 0:
157                 return None
158         except socket.error as message:
159             logging.error(message)
160             raise
161
162         (_, l, _) = self.header.unpack(msg[:16])
163
164         if l > len(msg):
165             buf = bytearray(l + 16)
166             view = memoryview(buf)
167             view[:4096] = msg
168             view = view[4096:]
169             # Read rest of message
170             remaining_bytes = l - 4096 + 16
171             while remaining_bytes > 0:
172                 bytes_to_read = (remaining_bytes if remaining_bytes
173                                  <= 4096 else 4096)
174                 nbytes = self.socket.recv_into(view, bytes_to_read)
175                 if nbytes == 0:
176                     logging.error('recv failed')
177                     break
178                 view = view[nbytes:]
179                 remaining_bytes -= nbytes
180         else:
181             buf = msg
182         return buf[16:]
183
184     def read(self):
185         if not self.connected:
186             raise VppTransportSocketIOError(1, 'Not connected')
187         try:
188             return self.q.get(True, self.read_timeout)
189         except queue.Empty:
190             return None