VPP-1508: vpp_transport_socket.py fix import
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_transport_socket.py
1 #
2 # VPP Unix Domain Socket Transport.
3 #
4 import socket
5 import struct
6 import threading
7 import select
8 import multiprocessing
9 try:
10     import queue as queue
11 except ImportError:
12     import Queue as queue
13 import logging
14
15
class VppTransport(object):
    """VPP API transport over a Unix domain socket.

    Every message on the wire is preceded by a fixed-size header described
    by ``self.header`` (``'>QII'``).  The middle field carries the payload
    length in bytes; the first and last fields are written as zero by
    :meth:`write` and ignored by :meth:`_read`.

    After :meth:`connect` succeeds, a daemon thread (:meth:`msg_thread_func`)
    reads incoming messages.  Messages whose decoded form has a positive
    ``context`` are queued for :meth:`read`; everything else is passed to
    ``parent.msg_handler_async``.
    """

    # Largest chunk requested from the socket in a single receive call.
    READ_CHUNK = 4096
    # Message ids used for the initial handshake in connect().
    SOCKCLNT_CREATE_MSG_ID = 15
    SOCKCLNT_CREATE_REPLY_MSG_ID = 16

    def __init__(self, parent, read_timeout, server_address):
        """Prepare an unconnected transport.

        :param parent: object providing ``messages``, ``header``, ``api``,
            ``decode_incoming_msg`` and ``msg_handler_async``.
        :param read_timeout: timeout in seconds for socket and queue reads;
            ``None`` or a non-positive value falls back to 1 second.
        :param server_address: path of the Unix domain socket to connect to.
        """
        self.connected = False
        # Guard against None explicitly: "None > 0" raises on Python 3.
        if read_timeout is not None and read_timeout > 0:
            self.read_timeout = read_timeout
        else:
            self.read_timeout = 1
        self.parent = parent
        self.server_address = server_address
        self.header = struct.Struct('>QII')
        self.message_table = {}
        # Created in connect(); kept as None so disconnect() can tell.
        self.socket = None
        # sque: wake-up channel used to ask the reader thread to stop.
        self.sque = multiprocessing.Queue()
        # q: messages (or None on termination) handed over to read().
        self.q = multiprocessing.Queue()
        self.message_thread = threading.Thread(target=self.msg_thread_func)

    def msg_thread_func(self):
        """Reader thread body: dispatch incoming messages until told to stop.

        Puts ``None`` on ``self.q`` and returns when select fails, when a
        stop request arrives on ``self.sque``, or when the socket yields no
        message or raises a socket error.
        """
        while True:
            try:
                rlist, _, _ = select.select([self.socket,
                                             self.sque._reader], [], [])
            except (socket.error, select.error):
                # select.error is a distinct exception type on Python 2.
                # Terminate thread
                logging.error('select failed')
                self.q.put(None)
                return

            for ready in rlist:
                if ready == self.sque._reader:
                    # Terminate
                    self.q.put(None)
                    return

                if ready != self.socket:
                    raise IOError(2, 'Unknown response from select')

                try:
                    msg = self._read()
                except socket.error:
                    msg = None
                if not msg:
                    self.q.put(None)
                    return

                # Put either to local queue or if context == 0
                # callback queue
                decoded = self.parent.decode_incoming_msg(msg)
                if hasattr(decoded, 'context') and decoded.context > 0:
                    self.q.put(msg)
                else:
                    self.parent.msg_handler_async(msg)

    def connect(self, name, pfx, msg_handler, rx_qlen):
        """Connect to the server socket and perform the initial handshake.

        Sends ``sockclnt_create``, checks the reply's message id, stores the
        returned client index and message table, then starts the reader
        thread.

        :param name: client name placed in the ``sockclnt_create`` request.
        :param pfx: unused by this transport.
        :param msg_handler: unused by this transport.
        :param rx_qlen: unused by this transport.
        :returns: 0 on success.
        :raises socket.error: if the socket connection fails.
        :raises IOError: if no reply or an unexpected reply is received.
        """
        # Create a UDS socket
        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        self.socket.settimeout(self.read_timeout)

        # Connect the socket to the port where the server is listening
        try:
            self.socket.connect(self.server_address)
        except socket.error as msg:
            logging.error("{} on socket {}".format(msg, self.server_address))
            # Do not leak the descriptor when the connection attempt fails.
            self.socket.close()
            raise

        self.connected = True
        # Initialise sockclnt_create
        sockclnt_create = self.parent.messages['sockclnt_create']
        sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']

        args = {'_vl_msg_id': self.SOCKCLNT_CREATE_MSG_ID,
                'name': name,
                'context': 124}
        self.write(sockclnt_create.pack(args))
        msg = self._read()
        if not msg:
            # _read() returns None when the peer closed the connection.
            raise IOError('Invalid reply message')
        hdr, length = self.parent.header.unpack(msg, 0)
        if hdr.msgid != self.SOCKCLNT_CREATE_REPLY_MSG_ID:
            raise IOError('Invalid reply message')

        reply, length = sockclnt_create_reply.unpack(msg)
        self.socket_index = reply.index
        for entry in reply.message_table:
            # Drop trailing NUL / 0x13 bytes from the name field.
            key = entry.name.rstrip(b'\x00\x13')
            self.message_table[key] = entry.index

        self.message_thread.daemon = True
        self.message_thread.start()

        return 0

    def disconnect(self):
        """Tell the server we are leaving and shut the transport down.

        :returns: the result of ``sockclnt_delete``, or 0 if that call
            failed with ``IOError``.
        """
        rv = 0
        try:  # Might fail, if VPP closes socket before packet makes it out
            rv = self.parent.api.sockclnt_delete(index=self.socket_index)
        except IOError:
            pass
        self.connected = False
        # Stop the reader thread before closing the socket so the thread
        # never polls a socket that has already been closed.
        self.sque.put(True)  # Terminate listening thread
        if self.message_thread.is_alive():
            # join() on a thread that was never started raises RuntimeError.
            self.message_thread.join()
        if self.socket is not None:
            self.socket.close()
        return rv

    def suspend(self):
        """No-op for the socket transport."""
        pass

    def resume(self):
        """No-op for the socket transport."""
        pass

    def callback(self):
        """Placeholder callback; always raises ``NotImplementedError``."""
        # NotImplemented is a comparison sentinel, not an exception class.
        raise NotImplementedError

    def get_callback(self, do_async):
        """Return the callback used for this transport (``self.callback``)."""
        return self.callback

    def get_msg_index(self, name):
        """Return the index registered for message ``name``, or 0 if unknown."""
        return self.message_table.get(name, 0)

    def msg_table_max_index(self):
        """Return the number of entries in the message table."""
        return len(self.message_table)

    def write(self, buf):
        """Send a binary-packed message to VPP.

        The framing header and the payload are sent as two separate calls,
        in that order.

        :param buf: packed message payload.
        :raises IOError: if the transport is not connected.
        """
        if not self.connected:
            raise IOError(1, 'Not connected')

        # Send header
        header = self.header.pack(0, len(buf), 0)
        # sendall() keeps sending until every byte has been handed over.
        self.socket.sendall(header)
        self.socket.sendall(buf)

    def _read(self):
        """Read one framed message and return its payload without the header.

        :returns: the payload (``bytes`` or ``bytearray``), or ``None`` if
            the peer closed the connection before a full message arrived.
        :raises socket.error: if receiving from the socket fails.
        :raises struct.error: if the first chunk is shorter than the header.
        """
        # Header and message
        try:
            msg = self.socket.recv(self.READ_CHUNK)
        except socket.error as message:
            logging.error(message)
            raise
        if not msg:
            return None

        hdr_size = self.header.size
        (_, payload_len, _) = self.header.unpack(msg[:hdr_size])
        total = hdr_size + payload_len
        received = len(msg)

        if total <= received:
            # Whole message arrived in the first chunk.
            return msg[hdr_size:]

        # Read rest of message.  Size everything from what was actually
        # received: the first chunk may be shorter than READ_CHUNK (for
        # example when header and payload arrive separately).
        buf = bytearray(total)
        view = memoryview(buf)
        view[:received] = msg
        view = view[received:]
        remaining_bytes = total - received
        while remaining_bytes > 0:
            bytes_to_read = min(remaining_bytes, self.READ_CHUNK)
            nbytes = self.socket.recv_into(view, bytes_to_read)
            if nbytes == 0:
                # Peer closed mid-message; do not return zero-padded data.
                logging.error('recv failed')
                return None
            view = view[nbytes:]
            remaining_bytes -= nbytes
        return buf[hdr_size:]

    def read(self):
        """Return the next queued message, or ``None`` on timeout.

        A queued ``None`` (reader thread terminated) is returned as well.

        :raises IOError: if the transport is not connected.
        """
        if not self.connected:
            raise IOError(1, 'Not connected')
        try:
            return self.q.get(True, self.read_timeout)
        except queue.Empty:
            return None