Python API: Synchronous mode.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys, os, logging, collections, struct, json, threading, glob
19 import atexit, Queue
20
21 logging.basicConfig(level=logging.DEBUG)
22 import vpp_api
23
24 def eprint(*args, **kwargs):
25     """Print critical diagnostics to stderr."""
26     print(*args, file=sys.stderr, **kwargs)
27
28 def vpp_atexit(self):
29     """Clean up VPP connection on shutdown."""
30     if self.connected:
31         eprint ('Cleaning up VPP on exit')
32         self.disconnect()
33
34
35 class Empty(object):
36     pass
37
38
39 class FuncWrapper(object):
40     def __init__(self, func):
41         self._func = func
42         self.__name__ = func.__name__
43
44     def __call__(self, **kwargs):
45         return self._func(**kwargs)
46
47
48 class VPP():
49     """VPP interface.
50
51     This class provides the APIs to VPP.  The APIs are loaded
52     from provided .api.json files and makes functions accordingly.
53     These functions are documented in the VPP .api files, as they
54     are dynamically created.
55
56     Additionally, VPP can send callback messages; this class
57     provides a means to register a callback function to receive
58     these messages in a background thread.
59     """
60     def __init__(self, apifiles = None, testmode = False, async_thread = True):
61         """Create a VPP API object.
62
63         apifiles is a list of files containing API
64         descriptions that will be loaded - methods will be
65         dynamically created reflecting these APIs.  If not
66         provided this will load the API files from VPP's
67         default install location.
68         """
69         self.messages = {}
70         self.id_names = []
71         self.id_msgdef = []
72         self.buffersize = 10000
73         self.connected = False
74         self.header = struct.Struct('>HI')
75         self.apifiles = []
76         self.event_callback = None
77         self.message_queue = Queue.Queue()
78         self.read_timeout = 0
79         self.vpp_api = vpp_api
80         if async_thread:
81             self.event_thread = threading.Thread(target=self.thread_msg_handler)
82             self.event_thread.daemon = True
83             self.event_thread.start()
84
85         if not apifiles:
86             # Pick up API definitions from default directory
87             apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
88
89         for file in apifiles:
90             with open(file) as apidef_file:
91                 api = json.load(apidef_file)
92                 for t in api['types']:
93                     self.add_type(t[0], t[1:])
94
95                 for m in api['messages']:
96                     self.add_message(m[0], m[1:])
97         self.apifiles = apifiles
98
99         # Basic sanity check
100         if len(self.messages) == 0 and not testmode:
101             raise ValueError(1, 'Missing JSON message definitions')
102
103         # Make sure we allow VPP to clean up the message rings.
104         atexit.register(vpp_atexit, self)
105
106     class ContextId(object):
107         """Thread-safe provider of unique context IDs."""
108         def __init__(self):
109             self.context = 0
110             self.lock = threading.Lock()
111         def __call__(self):
112             """Get a new unique (or, at least, not recently used) context."""
113             with self.lock:
114                 self.context += 1
115                 return self.context
116     get_context = ContextId()
117
118     def status(self):
119         """Debug function: report current VPP API status to stdout."""
120         print('Connected') if self.connected else print('Not Connected')
121         print('Read API definitions from', ', '.join(self.apifiles))
122
123     def __struct (self, t, n = None, e = -1, vl = None):
124         """Create a packing structure for a message."""
125         base_types = { 'u8' : 'B',
126                        'u16' : 'H',
127                        'u32' : 'I',
128                        'i32' : 'i',
129                        'u64' : 'Q',
130                        'f64' : 'd',
131                        }
132         pack = None
133         if t in base_types:
134             pack = base_types[t]
135             if not vl:
136                 if e > 0 and t == 'u8':
137                     # Fixed byte array
138                     return struct.Struct('>' + str(e) + 's')
139                 if e > 0:
140                     # Fixed array of base type
141                     return [e, struct.Struct('>' + base_types[t])]
142                 elif e == 0:
143                     # Old style variable array
144                     return [-1, struct.Struct('>' + base_types[t])]
145             else:
146                 # Variable length array
147                 return [vl, struct.Struct('>s')] if t == 'u8' else \
148                     [vl, struct.Struct('>' + base_types[t])]
149
150             return struct.Struct('>' + base_types[t])
151
152         if t in self.messages:
153             ### Return a list in case of array ###
154             if e > 0 and not vl:
155                 return [e, lambda self, encode, buf, offset, args: (
156                     self.__struct_type(encode, self.messages[t], buf, offset,
157                                        args))]
158             if vl:
159                 return [vl, lambda self, encode, buf, offset, args: (
160                     self.__struct_type(encode, self.messages[t], buf, offset,
161                                        args))]
162             elif e == 0:
163                 # Old style VLA
164                 raise NotImplementedError(1, 'No support for compound types ' + t)
165             return lambda self, encode, buf, offset, args: (
166                 self.__struct_type(encode, self.messages[t], buf, offset, args)
167             )
168
169         raise ValueError(1, 'Invalid message type: ' + t)
170
171     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
172         """Get a message packer or unpacker."""
173         if encode:
174             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
175         else:
176             return self.__struct_type_decode(msgdef, buf, offset)
177
178     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
179         off = offset
180         size = 0
181
182         for k in kwargs:
183             if k not in msgdef['args']:
184                 raise ValueError(1, 'Invalid field-name in message call ' + k)
185
186         for k,v in msgdef['args'].iteritems():
187             off += size
188             if k in kwargs:
189                 if type(v) is list:
190                     if callable(v[1]):
191                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
192                         size = 0
193                         for i in range(e):
194                             size += v[1](self, True, buf, off + size,
195                                          kwargs[k][i])
196                     else:
197                         if v[0] in kwargs:
198                             l = kwargs[v[0]]
199                         else:
200                             l = len(kwargs[k])
201                         if v[1].size == 1:
202                             buf[off:off + l] = bytearray(kwargs[k])
203                             size = l
204                         else:
205                             size = 0
206                             for i in kwargs[k]:
207                                 v[1].pack_into(buf, off + size, i)
208                                 size += v[1].size
209                 else:
210                     if callable(v):
211                         size = v(self, True, buf, off, kwargs[k])
212                     else:
213                         v.pack_into(buf, off, kwargs[k])
214                         size = v.size
215             else:
216                 size = v.size if not type(v) is list else 0
217
218         return off + size - offset
219
220
221     def __getitem__(self, name):
222         if name in self.messages:
223             return self.messages[name]
224         return None
225
226     def encode(self, msgdef, kwargs):
227         # Make suitably large buffer
228         buf = bytearray(self.buffersize)
229         offset = 0
230         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
231         return buf[:offset + size]
232
233     def decode(self, msgdef, buf):
234         return self.__struct_type(False, msgdef, buf, 0, None)[1]
235
236     def __struct_type_decode(self, msgdef, buf, offset):
237         res = []
238         off = offset
239         size = 0
240         for k,v in msgdef['args'].iteritems():
241             off += size
242             if type(v) is list:
243                 lst = []
244                 if callable(v[1]): # compound type
245                     size = 0
246                     if v[0] in msgdef['args']: # vla
247                         e = res[v[2]]
248                     else: # fixed array
249                         e = v[0]
250                     res.append(lst)
251                     for i in range(e):
252                         (s,l) = v[1](self, False, buf, off + size, None)
253                         lst.append(l)
254                         size += s
255                     continue
256                 if v[1].size == 1:
257                     if type(v[0]) is int:
258                         size = len(buf) - off
259                     else:
260                         size = res[v[2]]
261                     res.append(buf[off:off + size])
262                 else:
263                     e = v[0] if type(v[0]) is int else res[v[2]]
264                     if e == -1:
265                         e = (len(buf) - off) / v[1].size
266                     lst = []
267                     res.append(lst)
268                     size = 0
269                     for i in range(e):
270                         lst.append(v[1].unpack_from(buf, off + size)[0])
271                         size += v[1].size
272             else:
273                 if callable(v):
274                     (s,l) = v(self, False, buf, off, None)
275                     res.append(l)
276                     size += s
277                 else:
278                     res.append(v.unpack_from(buf, off)[0])
279                     size = v.size
280
281         return off + size - offset, msgdef['return_tuple']._make(res)
282
283     def ret_tup(self, name):
284         if name in self.messages and 'return_tuple' in self.messages[name]:
285             return self.messages[name]['return_tuple']
286         return None
287
288     def add_message(self, name, msgdef):
289         if name in self.messages:
290             raise ValueError('Duplicate message name: ' + name)
291
292         args = collections.OrderedDict()
293         argtypes = collections.OrderedDict()
294         fields = []
295         msg = {}
296         for i, f in enumerate(msgdef):
297             if type(f) is dict and 'crc' in f:
298                 msg['crc'] = f['crc']
299                 continue
300             field_type = f[0]
301             field_name = f[1]
302             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
303                 raise ValueError('Variable Length Array must be last: ' + name)
304             args[field_name] = self.__struct(*f)
305             argtypes[field_name] = field_type
306             if len(f) == 4: # Find offset to # elements field
307                 args[field_name].append(args.keys().index(f[3]) - i)
308             fields.append(field_name)
309         msg['return_tuple'] = collections.namedtuple(name, fields,
310                                                      rename = True)
311         self.messages[name] = msg
312         self.messages[name]['args'] = args
313         self.messages[name]['argtypes'] = argtypes
314         return self.messages[name]
315
316     def add_type(self, name, typedef):
317         return self.add_message('vl_api_' + name + '_t', typedef)
318
319     def make_function(self, name, i, msgdef, multipart, async):
320         if (async):
321             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
322         else:
323             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart, **kwargs))
324         args = self.messages[name]['args']
325         argtypes = self.messages[name]['argtypes']
326         f.__name__ = str(name)
327         f.__doc__ = ", ".join(["%s %s" % (argtypes[k], k) for k in args.keys()])
328         return f
329
330     @property
331     def api(self):
332         if not hasattr(self, "_api"):
333             raise Exception("Not connected, api definitions not available")
334         return self._api
335
336     def _register_functions(self, async=False):
337         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
338         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
339         self._api = Empty()
340         for name, msgdef in self.messages.iteritems():
341             if name in self.vpp_dictionary:
342                 if self.messages[name]['crc'] != self.vpp_dictionary[name]['crc']:
343                     raise ValueError(3, 'Failed CRC checksum ' + name +
344                                      ' ' + self.messages[name]['crc'] +
345                                      ' ' + self.vpp_dictionary[name]['crc'])
346                 i = self.vpp_dictionary[name]['id']
347                 self.id_msgdef[i] = msgdef
348                 self.id_names[i] = name
349                 multipart = True if name.find('_dump') > 0 else False
350                 f = self.make_function(name, i, msgdef, multipart, async)
351                 setattr(self._api, name, FuncWrapper(f))
352
353                 # old API stuff starts here - will be removed in 17.07
354                 if hasattr(self, name):
355                     raise NameError(
356                         3, "Conflicting name in JSON definition: `%s'" % name)
357                 setattr(self, name, f)
358                 # old API stuff ends here
359
360     def _write (self, buf):
361         """Send a binary-packed message to VPP."""
362         if not self.connected:
363             raise IOError(1, 'Not connected')
364         return vpp_api.write(str(buf))
365
366     def _read (self):
367         if not self.connected:
368             raise IOError(1, 'Not connected')
369
370         return vpp_api.read(self.read_timeout)
371
372     def _load_dictionary(self):
373         self.vpp_dictionary = {}
374         self.vpp_dictionary_maxid = 0
375         d = vpp_api.msg_table()
376
377         if not d:
378             raise IOError(3, 'Cannot get VPP API dictionary')
379         for i,n in d:
380             name, crc =  n.rsplit('_', 1)
381             crc = '0x' + crc
382             self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
383             self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
384
385     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async):
386         rv = vpp_api.connect(name, msg_handler, chroot_prefix, rx_qlen)
387         if rv != 0:
388             raise IOError(2, 'Connect failed')
389         self.connected = True
390
391         self._load_dictionary()
392         self._register_functions(async=async)
393
394         # Initialise control ping
395         self.control_ping_index = self.vpp_dictionary['control_ping']['id']
396         self.control_ping_msgdef = self.messages['control_ping']
397
398     def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
399         """Attach to VPP.
400
401         name - the name of the client.
402         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
403         async - if true, messages are sent without waiting for a reply
404         rx_qlen - the length of the VPP message receive queue between
405         client and server.
406         """
407         msg_handler = self.msg_handler_sync if not async \
408                       else self.msg_handler_async
409         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
410                                      async)
411
412     def connect_sync (self, name, chroot_prefix = None, rx_qlen = 32):
413         """Attach to VPP in synchronous mode. Application must poll for events.
414
415         name - the name of the client.
416         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
417         rx_qlen - the length of the VPP message receive queue between
418         client and server.
419         """
420
421         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
422                                      async=False)
423
424     def disconnect(self):
425         """Detach from VPP."""
426         rv = vpp_api.disconnect()
427         self.connected = False
428         return rv
429
430     def msg_handler_sync(self, msg):
431         """Process an incoming message from VPP in sync mode.
432
433         The message may be a reply or it may be an async notification.
434         """
435         r = self.decode_incoming_msg(msg)
436         if r is None:
437             return
438
439         # If we have a context, then use the context to find any
440         # request waiting for a reply
441         context = 0
442         if hasattr(r, 'context') and r.context > 0:
443             context = r.context
444
445         msgname = type(r).__name__
446
447         if context == 0:
448             # No context -> async notification that we feed to the callback
449             self.message_queue.put_nowait(r)
450         else:
451             raise IOError(2, 'RPC reply message received in event handler')
452
453     def decode_incoming_msg(self, msg):
454         if not msg:
455             eprint('vpp_api.read failed')
456             return
457
458         i, ci = self.header.unpack_from(msg, 0)
459         if self.id_names[i] == 'rx_thread_exit':
460             return
461
462         #
463         # Decode message and returns a tuple.
464         #
465         msgdef = self.id_msgdef[i]
466         if not msgdef:
467             raise IOError(2, 'Reply message undefined')
468
469         r = self.decode(msgdef, msg)
470
471         return r
472
473     def msg_handler_async(self, msg):
474         """Process a message from VPP in async mode.
475
476         In async mode, all messages are returned to the callback.
477         """
478         r = self.decode_incoming_msg(msg)
479         if r is None:
480             return
481
482         msgname = type(r).__name__
483
484         if self.event_callback:
485             self.event_callback(msgname, r)
486
487     def _control_ping(self, context):
488         """Send a ping command."""
489         self._call_vpp_async(self.control_ping_index,
490                              self.control_ping_msgdef,
491                              context=context)
492
493     def _call_vpp(self, i, msgdef, multipart, **kwargs):
494         """Given a message, send the message and await a reply.
495
496         msgdef - the message packing definition
497         i - the message type index
498         multipart - True if the message returns multiple
499         messages in return.
500         context - context number - chosen at random if not
501         supplied.
502         The remainder of the kwargs are the arguments to the API call.
503
504         The return value is the message or message array containing
505         the response.  It will raise an IOError exception if there was
506         no response within the timeout window.
507         """
508
509         if not 'context' in kwargs:
510             context = self.get_context()
511             kwargs['context'] = context
512         else:
513             context = kwargs['context']
514         kwargs['_vl_msg_id'] = i
515         b = self.encode(msgdef, kwargs)
516
517         vpp_api.suspend()
518         self._write(b)
519
520         if multipart:
521             # Send a ping after the request - we use its response
522             # to detect that we have seen all results.
523             self._control_ping(context)
524
525         # Block until we get a reply.
526         rl = []
527         while (True):
528             msg = self._read()
529             if not msg:
530                 print('PNEUM ERROR: OH MY GOD')
531                 raise IOError(2, 'PNEUM read failed')
532
533             r = self.decode_incoming_msg(msg)
534             msgname = type(r).__name__
535             if not context in r or r.context == 0 or context != r.context:
536                 self.message_queue.put_nowait(r)
537                 continue
538
539             if not multipart:
540                 rl = r
541                 break
542             if msgname == 'control_ping_reply':
543                 break
544
545             rl.append(r)
546
547         vpp_api.resume()
548
549         return rl
550
551     def _call_vpp_async(self, i, msgdef, **kwargs):
552         """Given a message, send the message and await a reply.
553
554         msgdef - the message packing definition
555         i - the message type index
556         context - context number - chosen at random if not
557         supplied.
558         The remainder of the kwargs are the arguments to the API call.
559         """
560         if not 'context' in kwargs:
561             context = self.get_context()
562             kwargs['context'] = context
563         else:
564             context = kwargs['context']
565         kwargs['_vl_msg_id'] = i
566         b = self.encode(msgdef, kwargs)
567
568         self._write(b)
569
570     def register_event_callback(self, callback):
571         """Register a callback for async messages.
572
573         This will be called for async notifications in sync mode,
574         and all messages in async mode.  In sync mode, replies to
575         requests will not come here.
576
577         callback is a fn(msg_type_name, msg_type) that will be
578         called when a message comes in.  While this function is
579         executing, note that (a) you are in a background thread and
580         may wish to use threading.Lock to protect your datastructures,
581         and (b) message processing from VPP will stop (so if you take
582         a long while about it you may provoke reply timeouts or cause
583         VPP to fill the RX buffer).  Passing None will disable the
584         callback.
585         """
586         self.event_callback = callback
587
588     def thread_msg_handler(self):
589         """Python thread calling the user registerd message handler.
590
591         This is to emulate the old style event callback scheme. Modern
592         clients should provide their own thread to poll the event
593         queue.
594         """
595         while True:
596             r = self.message_queue.get()
597             msgname = type(r).__name__
598             if self.event_callback:
599                 self.event_callback(msgname, r)