vpp_papi: Reserved keywords.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_transport_socket.py
1 #
2 # VPP Unix Domain Socket Transport.
3 #
4 import socket
5 import struct
6 import threading
7 import select
8 import multiprocessing
9 import queue
10 import logging
11
12
class VppTransport(object):
    """Unix domain socket transport for the VPP API client.

    Every message on the wire is preceded by a fixed header described by
    ``self.header`` (``'>QII'``, 16 bytes).  The middle field carries the
    payload length; the other two fields are written as zero by
    :meth:`write` and ignored by :meth:`_read`.

    A background thread (:meth:`msg_thread_func`) reads incoming messages.
    Messages whose decoded form has a positive ``context`` are placed on
    ``self.q`` for :meth:`read`; all others are passed to
    ``parent.msg_handler_async``.
    """

    def __init__(self, parent, read_timeout, server_address):
        """Set up transport state; no socket is opened until connect().

        :param parent: owning client object; this class uses its
            ``messages``, ``header``, ``api``, ``decode_incoming_msg`` and
            ``msg_handler_async`` attributes.
        :param read_timeout: socket/queue timeout in seconds; values that
            are not greater than zero fall back to 1.
        :param server_address: address handed to ``socket.connect``.
        """
        self.connected = False
        self.read_timeout = read_timeout if read_timeout > 0 else 1
        self.parent = parent
        self.server_address = server_address
        self.header = struct.Struct('>QII')
        # Message name -> message index, filled in by connect().
        self.message_table = {}
        # sque: wake-up channel used by disconnect() to stop the thread.
        self.sque = multiprocessing.Queue()
        # q: messages (or a None sentinel) handed from the thread to read().
        self.q = multiprocessing.Queue()
        self.message_thread = threading.Thread(target=self.msg_thread_func)

    def msg_thread_func(self):
        """Reader thread body: dispatch incoming messages until stopped.

        The loop ends, after putting ``None`` on ``self.q``, when select
        fails, when anything arrives on ``self.sque``, or when reading from
        the socket fails or yields no message.
        """
        while True:
            try:
                rlist, _, _ = select.select([self.socket,
                                             self.sque._reader], [], [])
            except (socket.error, ValueError):
                # ValueError: select() rejects a socket that has already
                # been closed (its file descriptor is -1), which happens
                # when disconnect() closes the socket while this thread is
                # busy outside of select().
                logging.error('select failed')
                self.q.put(None)
                return

            for ready in rlist:
                if ready == self.sque._reader:
                    # Terminate request from disconnect()
                    self.q.put(None)
                    return

                if ready != self.socket:
                    raise IOError(2, 'Unknown response from select')

                try:
                    msg = self._read()
                except socket.error:
                    self.q.put(None)
                    return
                if not msg:
                    self.q.put(None)
                    return

                # Replies with a positive context go to the local queue,
                # everything else goes to the asynchronous handler.
                decoded = self.parent.decode_incoming_msg(msg)
                if hasattr(decoded, 'context') and decoded.context > 0:
                    self.q.put(msg)
                else:
                    self.parent.msg_handler_async(msg)

    def connect(self, name, pfx, msg_handler, rx_qlen):
        """Connect to the server socket and perform the sockclnt_create
        handshake, then start the reader thread.

        :param name: client name sent in the sockclnt_create message.
        :param pfx: unused by this transport.
        :param msg_handler: unused by this transport.
        :param rx_qlen: unused by this transport.
        :returns: 0 on success.
        :raises socket.error: if the socket cannot be connected.
        :raises IOError: if no reply or an unexpected reply is received.
        """
        # Create a UDS socket
        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        self.socket.settimeout(self.read_timeout)

        # Connect the socket to the port where the server is listening
        try:
            self.socket.connect(self.server_address)
        except socket.error as msg:
            logging.error("{} on socket {}".format(msg, self.server_address))
            # Do not leak the descriptor of a socket that never connected.
            self.socket.close()
            raise

        self.connected = True
        try:
            # Initialise sockclnt_create
            sockclnt_create = self.parent.messages['sockclnt_create']
            sockclnt_create_reply = self.parent.messages[
                'sockclnt_create_reply']

            args = {'_vl_msg_id': 15,
                    'name': name,
                    'context': 124}
            self.write(sockclnt_create.pack(args))
            msg = self._read()
            if not msg:
                raise IOError(2, 'No reply to sockclnt_create')
            hdr, _ = self.parent.header.unpack(msg, 0)
            if hdr.msgid != 16:
                raise IOError('Invalid reply message')

            reply, _ = sockclnt_create_reply.unpack(msg)
        except Exception:
            # Handshake failed: release the socket before propagating.
            self.connected = False
            self.socket.close()
            raise

        self.socket_index = reply.index
        for entry in reply.message_table:
            # Names arrive padded; strip trailing NUL and 0x13 bytes.
            self.message_table[entry.name.rstrip(b'\x00\x13')] = entry.index

        self.message_thread.daemon = True
        self.message_thread.start()

        return 0

    def disconnect(self):
        """Send sockclnt_delete, close the socket and stop the thread.

        :returns: the result of ``sockclnt_delete``, or 0 if that call
            raised IOError.
        """
        rv = 0
        try:  # Might fail, if VPP closes socket before packet makes it out
            rv = self.parent.api.sockclnt_delete(index=self.socket_index)
        except IOError:
            pass
        self.connected = False
        self.socket.close()
        self.sque.put(True)  # Terminate listening thread
        self.message_thread.join()
        return rv

    def suspend(self):
        """No-op for this transport."""
        pass

    def resume(self):
        """No-op for this transport."""
        pass

    def callback(self):
        """Not supported by this transport.

        :raises NotImplementedError: always.
        """
        # NotImplemented is a comparison sentinel, not an exception class;
        # raising it produces a TypeError on Python 3.
        raise NotImplementedError

    def get_callback(self, do_async):
        """Return the callback callable; ``do_async`` is ignored."""
        return self.callback

    def get_msg_index(self, name):
        """Return the index recorded for ``name``, or 0 if it is unknown."""
        return self.message_table.get(name, 0)

    def msg_table_max_index(self):
        """Return the number of entries in the message table."""
        return len(self.message_table)

    def write(self, buf):
        """Send a binary-packed message to VPP.

        :param buf: packed message payload.
        :raises IOError: if the transport is not connected.
        """
        if not self.connected:
            raise IOError(1, 'Not connected')

        # Send header, then the payload it describes
        self.socket.send(self.header.pack(0, len(buf), 0))
        self.socket.send(buf)

    def _read(self):
        """Receive one message and return its payload without the header.

        :returns: the payload (``bytes`` when the first receive already
            held the whole message, ``bytearray`` otherwise), or ``None``
            if the peer closed the connection or the message was cut short.
        :raises socket.error: if receiving fails.
        """
        hdr_size = self.header.size

        # Header and (as much as fits of) the message
        try:
            msg = self.socket.recv(4096)
        except socket.error as message:
            logging.error(message)
            raise
        if not msg:
            return None
        if len(msg) < hdr_size:
            logging.error('recv failed: incomplete message header')
            return None

        (_, length, _) = self.header.unpack(msg[:hdr_size])
        # The length field counts the payload only, so the full message is
        # header plus payload.
        total = length + hdr_size
        if total <= len(msg):
            return msg[hdr_size:]

        # Read rest of message straight into a buffer of the final size.
        buf = bytearray(total)
        view = memoryview(buf)
        view[:len(msg)] = msg
        view = view[len(msg):]
        remaining_bytes = total - len(msg)
        while remaining_bytes > 0:
            nbytes = self.socket.recv_into(view, min(remaining_bytes, 4096))
            if nbytes == 0:
                # Peer closed mid-message; a partly filled buffer must not
                # be handed on as if it were a complete message.
                logging.error('recv failed')
                return None
            view = view[nbytes:]
            remaining_bytes -= nbytes
        return buf[hdr_size:]

    def read(self):
        """Return the next queued message, or ``None`` on timeout.

        A ``None`` is also returned when the reader thread has posted its
        termination sentinel.

        :raises IOError: if the transport is not connected.
        """
        if not self.connected:
            raise IOError(1, 'Not connected')
        try:
            return self.q.get(True, self.read_timeout)
        except queue.Empty:
            return None