393e2e9c9c172068847ac1d3917b987dad0e492b
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_transport_socket.py
1 #
2 # VPP Unix Domain Socket Transport.
3 #
4 import socket
5 import struct
6 import threading
7 import select
8 import multiprocessing
9 try:
10     import queue as queue
11 except ImportError:
12     import Queue as queue
13 import logging
14
15
16 class VppTransportSocketIOError(IOError):
17     pass
18
19
20 class VppTransport(object):
21     VppTransportSocketIOError = VppTransportSocketIOError
22
23     def __init__(self, parent, read_timeout, server_address):
24         self.connected = False
25         self.read_timeout = read_timeout if read_timeout > 0 else 1
26         self.parent = parent
27         self.server_address = server_address
28         self.header = struct.Struct('>QII')
29         self.message_table = {}
30         self.sque = multiprocessing.Queue()
31         self.q = multiprocessing.Queue()
32         self.message_thread = threading.Thread(target=self.msg_thread_func)
33
34     def msg_thread_func(self):
35         while True:
36             try:
37                 rlist, _, _ = select.select([self.socket,
38                                              self.sque._reader], [], [])
39             except socket.error:
40                 # Terminate thread
41                 logging.error('select failed')
42                 self.q.put(None)
43                 return
44
45             for r in rlist:
46                 if r == self.sque._reader:
47                     # Terminate
48                     self.q.put(None)
49                     return
50
51                 elif r == self.socket:
52                     try:
53                         msg = self._read()
54                         if not msg:
55                             self.q.put(None)
56                             return
57                     except socket.error:
58                         self.q.put(None)
59                         return
60                     # Put either to local queue or if context == 0
61                     # callback queue
62                     r = self.parent.decode_incoming_msg(msg)
63                     if hasattr(r, 'context') and r.context > 0:
64                         self.q.put(msg)
65                     else:
66                         self.parent.msg_handler_async(msg)
67                 else:
68                     raise VppTransportSocketIOError(
69                         2, 'Unknown response from select')
70
71     def connect(self, name, pfx, msg_handler, rx_qlen):
72
73         # Create a UDS socket
74         self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
75         self.socket.settimeout(self.read_timeout)
76
77         # Connect the socket to the port where the server is listening
78         try:
79             self.socket.connect(self.server_address)
80         except socket.error as msg:
81             logging.error("{} on socket {}".format(msg, self.server_address))
82             raise
83
84         self.connected = True
85         # Initialise sockclnt_create
86         sockclnt_create = self.parent.messages['sockclnt_create']
87         sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
88
89         args = {'_vl_msg_id': 15,
90                 'name': name,
91                 'context': 124}
92         b = sockclnt_create.pack(args)
93         self.write(b)
94         msg = self._read()
95         hdr, length = self.parent.header.unpack(msg, 0)
96         if hdr.msgid != 16:
97             raise VppTransportSocketIOError('Invalid reply message')
98
99         r, length = sockclnt_create_reply.unpack(msg)
100         self.socket_index = r.index
101         for m in r.message_table:
102             n = m.name.rstrip(b'\x00\x13')
103             self.message_table[n] = m.index
104
105         self.message_thread.daemon = True
106         self.message_thread.start()
107
108         return 0
109
110     def disconnect(self):
111         rv = 0
112         try:  # Might fail, if VPP closes socket before packet makes it out
113             rv = self.parent.api.sockclnt_delete(index=self.socket_index)
114         except IOError:
115             pass
116         self.connected = False
117         self.socket.close()
118         self.sque.put(True)  # Terminate listening thread
119         self.message_thread.join()
120         return rv
121
122     def suspend(self):
123         pass
124
125     def resume(self):
126         pass
127
128     def callback(self):
129         raise NotImplementedError
130
131     def get_callback(self, do_async):
132         return self.callback
133
134     def get_msg_index(self, name):
135         try:
136             return self.message_table[name]
137         except KeyError:
138             return 0
139
140     def msg_table_max_index(self):
141         return len(self.message_table)
142
143     def write(self, buf):
144         """Send a binary-packed message to VPP."""
145         if not self.connected:
146             raise VppTransportSocketIOError(1, 'Not connected')
147
148         # Send header
149         header = self.header.pack(0, len(buf), 0)
150         n = self.socket.send(header)
151         n = self.socket.send(buf)
152
153     def _read(self):
154         # Header and message
155         try:
156             msg = self.socket.recv(4096)
157             if len(msg) == 0:
158                 return None
159         except socket.error as message:
160             logging.error(message)
161             raise
162
163         (_, l, _) = self.header.unpack(msg[:16])
164
165         if l > len(msg):
166             buf = bytearray(l + 16)
167             view = memoryview(buf)
168             view[:4096] = msg
169             view = view[4096:]
170             # Read rest of message
171             remaining_bytes = l - 4096 + 16
172             while remaining_bytes > 0:
173                 bytes_to_read = (remaining_bytes if remaining_bytes
174                                  <= 4096 else 4096)
175                 nbytes = self.socket.recv_into(view, bytes_to_read)
176                 if nbytes == 0:
177                     logging.error('recv failed')
178                     break
179                 view = view[nbytes:]
180                 remaining_bytes -= nbytes
181         else:
182             buf = msg
183         return buf[16:]
184
185     def read(self):
186         if not self.connected:
187             raise VppTransportSocketIOError(1, 'Not connected')
188         try:
189             return self.q.get(True, self.read_timeout)
190         except queue.Empty:
191             return None