Python API: Change from cPython to CFFI.
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys, os, logging, collections, struct, json, threading, glob
19 import atexit, Queue
20
21 from cffi import FFI
22 ffi = FFI()
23 ffi.cdef("""
24 typedef void (*pneum_callback_t)(unsigned char * data, int len);
25 typedef void (*pneum_error_callback_t)(void *, unsigned char *, int);
26 int pneum_connect(char * name, char * chroot_prefix, pneum_callback_t cb,
27     int rx_qlen);
28 int pneum_disconnect(void);
29 int pneum_read(char **data, int *l, unsigned short timeout);
30 int pneum_write(char *data, int len);
31 void pneum_free(void * msg);
32
33 int pneum_get_msg_index(unsigned char * name);
34 int pneum_msg_table_size(void);
35 int pneum_msg_table_max_index(void);
36
37 void pneum_rx_suspend (void);
38 void pneum_rx_resume (void);
39 void pneum_set_error_handler(pneum_error_callback_t);
40  """)
41
42 # Barfs on failure, no need to check success.
43 vpp_api = ffi.dlopen('libpneum.so')
44
45 def vpp_atexit(self):
46     """Clean up VPP connection on shutdown."""
47     if self.connected:
48         self.logger.debug('Cleaning up VPP on exit')
49         self.disconnect()
50
51 vpp_object = None
52
53 @ffi.callback("void(unsigned char *, int)")
54 def pneum_callback_sync(data, len):
55     vpp_object.msg_handler_sync(ffi.buffer(data, len))
56 @ffi.callback("void(unsigned char *, int)")
57 def pneum_callback_async(data, len):
58     vpp_object.msg_handler_async(ffi.buffer(data, len))
59 @ffi.callback("void(void *, unsigned char *, int)")
60 def pneum_error_handler(arg, msg, msg_len):
61     vpp_object.logger.warning("PNEUM: %s", ffi.string(msg, msg_len))
62
63 class Empty(object):
64     pass
65
66
67 class FuncWrapper(object):
68     def __init__(self, func):
69         self._func = func
70         self.__name__ = func.__name__
71
72     def __call__(self, **kwargs):
73         return self._func(**kwargs)
74
75
76 class VPP():
77     """VPP interface.
78
79     This class provides the APIs to VPP.  The APIs are loaded
80     from provided .api.json files and makes functions accordingly.
81     These functions are documented in the VPP .api files, as they
82     are dynamically created.
83
84     Additionally, VPP can send callback messages; this class
85     provides a means to register a callback function to receive
86     these messages in a background thread.
87     """
88     def __init__(self, apifiles = None, testmode = False, async_thread = True,
89                  logger = logging.getLogger('vpp_papi'), loglevel = 'debug'):
90         """Create a VPP API object.
91
92         apifiles is a list of files containing API
93         descriptions that will be loaded - methods will be
94         dynamically created reflecting these APIs.  If not
95         provided this will load the API files from VPP's
96         default install location.
97         """
98         global vpp_object
99         vpp_object = self
100         self.logger = logger
101         logging.basicConfig(level=getattr(logging, loglevel.upper()))
102
103         self.messages = {}
104         self.id_names = []
105         self.id_msgdef = []
106         self.buffersize = 10000
107         self.connected = False
108         self.header = struct.Struct('>HI')
109         self.apifiles = []
110         self.event_callback = None
111         self.message_queue = Queue.Queue()
112         self.read_timeout = 0
113         self.vpp_api = vpp_api
114         if async_thread:
115             self.event_thread = threading.Thread(target=self.thread_msg_handler)
116             self.event_thread.daemon = True
117             self.event_thread.start()
118
119         if not apifiles:
120             # Pick up API definitions from default directory
121             apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
122
123         for file in apifiles:
124             with open(file) as apidef_file:
125                 api = json.load(apidef_file)
126                 for t in api['types']:
127                     self.add_type(t[0], t[1:])
128
129                 for m in api['messages']:
130                     self.add_message(m[0], m[1:])
131         self.apifiles = apifiles
132
133         # Basic sanity check
134         if len(self.messages) == 0 and not testmode:
135             raise ValueError(1, 'Missing JSON message definitions')
136
137         # Make sure we allow VPP to clean up the message rings.
138         atexit.register(vpp_atexit, self)
139
140         # Register error handler
141         vpp_api.pneum_set_error_handler(pneum_error_handler)
142
143     class ContextId(object):
144         """Thread-safe provider of unique context IDs."""
145         def __init__(self):
146             self.context = 0
147             self.lock = threading.Lock()
148         def __call__(self):
149             """Get a new unique (or, at least, not recently used) context."""
150             with self.lock:
151                 self.context += 1
152                 return self.context
153     get_context = ContextId()
154
155     def status(self):
156         """Debug function: report current VPP API status to stdout."""
157         print('Connected') if self.connected else print('Not Connected')
158         print('Read API definitions from', ', '.join(self.apifiles))
159
160     def __struct (self, t, n = None, e = -1, vl = None):
161         """Create a packing structure for a message."""
162         base_types = { 'u8' : 'B',
163                        'u16' : 'H',
164                        'u32' : 'I',
165                        'i32' : 'i',
166                        'u64' : 'Q',
167                        'f64' : 'd',
168                        }
169         pack = None
170         if t in base_types:
171             pack = base_types[t]
172             if not vl:
173                 if e > 0 and t == 'u8':
174                     # Fixed byte array
175                     return struct.Struct('>' + str(e) + 's')
176                 if e > 0:
177                     # Fixed array of base type
178                     return [e, struct.Struct('>' + base_types[t])]
179                 elif e == 0:
180                     # Old style variable array
181                     return [-1, struct.Struct('>' + base_types[t])]
182             else:
183                 # Variable length array
184                 return [vl, struct.Struct('>s')] if t == 'u8' else \
185                     [vl, struct.Struct('>' + base_types[t])]
186
187             return struct.Struct('>' + base_types[t])
188
189         if t in self.messages:
190             ### Return a list in case of array ###
191             if e > 0 and not vl:
192                 return [e, lambda self, encode, buf, offset, args: (
193                     self.__struct_type(encode, self.messages[t], buf, offset,
194                                        args))]
195             if vl:
196                 return [vl, lambda self, encode, buf, offset, args: (
197                     self.__struct_type(encode, self.messages[t], buf, offset,
198                                        args))]
199             elif e == 0:
200                 # Old style VLA
201                 raise NotImplementedError(1, 'No support for compound types ' + t)
202             return lambda self, encode, buf, offset, args: (
203                 self.__struct_type(encode, self.messages[t], buf, offset, args)
204             )
205
206         raise ValueError(1, 'Invalid message type: ' + t)
207
208     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
209         """Get a message packer or unpacker."""
210         if encode:
211             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
212         else:
213             return self.__struct_type_decode(msgdef, buf, offset)
214
215     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
216         off = offset
217         size = 0
218
219         for k in kwargs:
220             if k not in msgdef['args']:
221                 raise ValueError(1, 'Invalid field-name in message call ' + k)
222
223         for k,v in msgdef['args'].iteritems():
224             off += size
225             if k in kwargs:
226                 if type(v) is list:
227                     if callable(v[1]):
228                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
229                         size = 0
230                         for i in range(e):
231                             size += v[1](self, True, buf, off + size,
232                                          kwargs[k][i])
233                     else:
234                         if v[0] in kwargs:
235                             l = kwargs[v[0]]
236                         else:
237                             l = len(kwargs[k])
238                         if v[1].size == 1:
239                             buf[off:off + l] = bytearray(kwargs[k])
240                             size = l
241                         else:
242                             size = 0
243                             for i in kwargs[k]:
244                                 v[1].pack_into(buf, off + size, i)
245                                 size += v[1].size
246                 else:
247                     if callable(v):
248                         size = v(self, True, buf, off, kwargs[k])
249                     else:
250                         v.pack_into(buf, off, kwargs[k])
251                         size = v.size
252             else:
253                 size = v.size if not type(v) is list else 0
254
255         return off + size - offset
256
257
258     def __getitem__(self, name):
259         if name in self.messages:
260             return self.messages[name]
261         return None
262
263     def encode(self, msgdef, kwargs):
264         # Make suitably large buffer
265         buf = bytearray(self.buffersize)
266         offset = 0
267         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
268         return buf[:offset + size]
269
270     def decode(self, msgdef, buf):
271         return self.__struct_type(False, msgdef, buf, 0, None)[1]
272
273     def __struct_type_decode(self, msgdef, buf, offset):
274         res = []
275         off = offset
276         size = 0
277         for k,v in msgdef['args'].iteritems():
278             off += size
279             if type(v) is list:
280                 lst = []
281                 if callable(v[1]): # compound type
282                     size = 0
283                     if v[0] in msgdef['args']: # vla
284                         e = res[v[2]]
285                     else: # fixed array
286                         e = v[0]
287                     res.append(lst)
288                     for i in range(e):
289                         (s,l) = v[1](self, False, buf, off + size, None)
290                         lst.append(l)
291                         size += s
292                     continue
293                 if v[1].size == 1:
294                     if type(v[0]) is int:
295                         size = len(buf) - off
296                     else:
297                         size = res[v[2]]
298                     res.append(buf[off:off + size])
299                 else:
300                     e = v[0] if type(v[0]) is int else res[v[2]]
301                     if e == -1:
302                         e = (len(buf) - off) / v[1].size
303                     lst = []
304                     res.append(lst)
305                     size = 0
306                     for i in range(e):
307                         lst.append(v[1].unpack_from(buf, off + size)[0])
308                         size += v[1].size
309             else:
310                 if callable(v):
311                     (s,l) = v(self, False, buf, off, None)
312                     res.append(l)
313                     size += s
314                 else:
315                     res.append(v.unpack_from(buf, off)[0])
316                     size = v.size
317
318         return off + size - offset, msgdef['return_tuple']._make(res)
319
320     def ret_tup(self, name):
321         if name in self.messages and 'return_tuple' in self.messages[name]:
322             return self.messages[name]['return_tuple']
323         return None
324
325     def add_message(self, name, msgdef, typeonly = False):
326         if name in self.messages:
327             raise ValueError('Duplicate message name: ' + name)
328
329         args = collections.OrderedDict()
330         argtypes = collections.OrderedDict()
331         fields = []
332         msg = {}
333         for i, f in enumerate(msgdef):
334             if type(f) is dict and 'crc' in f:
335                 msg['crc'] = f['crc']
336                 continue
337             field_type = f[0]
338             field_name = f[1]
339             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
340                 raise ValueError('Variable Length Array must be last: ' + name)
341             args[field_name] = self.__struct(*f)
342             argtypes[field_name] = field_type
343             if len(f) == 4: # Find offset to # elements field
344                 args[field_name].append(args.keys().index(f[3]) - i)
345             fields.append(field_name)
346         msg['return_tuple'] = collections.namedtuple(name, fields,
347                                                      rename = True)
348         self.messages[name] = msg
349         self.messages[name]['args'] = args
350         self.messages[name]['argtypes'] = argtypes
351         self.messages[name]['typeonly'] = typeonly
352         return self.messages[name]
353
354     def add_type(self, name, typedef):
355         return self.add_message('vl_api_' + name + '_t', typedef, typeonly=True)
356
357     def make_function(self, name, i, msgdef, multipart, async):
358         if (async):
359             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
360         else:
361             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart, **kwargs))
362         args = self.messages[name]['args']
363         argtypes = self.messages[name]['argtypes']
364         f.__name__ = str(name)
365         f.__doc__ = ", ".join(["%s %s" % (argtypes[k], k) for k in args.keys()])
366         return f
367
368     @property
369     def api(self):
370         if not hasattr(self, "_api"):
371             raise Exception("Not connected, api definitions not available")
372         return self._api
373
374     def _register_functions(self, async=False):
375         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
376         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
377         self._api = Empty()
378         for name, msgdef in self.messages.iteritems():
379             if self.messages[name]['typeonly']: continue
380             crc = self.messages[name]['crc']
381             n = name + '_' + crc[2:]
382             i = vpp_api.pneum_get_msg_index(bytes(n))
383             if i > 0:
384                 self.id_msgdef[i] = msgdef
385                 self.id_names[i] = name
386                 multipart = True if name.find('_dump') > 0 else False
387                 f = self.make_function(name, i, msgdef, multipart, async)
388                 setattr(self._api, name, FuncWrapper(f))
389
390                 # old API stuff starts here - will be removed in 17.07
391                 if hasattr(self, name):
392                     raise NameError(
393                         3, "Conflicting name in JSON definition: `%s'" % name)
394                 setattr(self, name, f)
395                 # old API stuff ends here
396             else:
397                 self.logger.debug('No such message type or failed CRC checksum: %s', n)
398
399     def _write (self, buf):
400         """Send a binary-packed message to VPP."""
401         if not self.connected:
402             raise IOError(1, 'Not connected')
403         return vpp_api.pneum_write(str(buf), len(buf))
404
405     def _read (self):
406         if not self.connected:
407             raise IOError(1, 'Not connected')
408         mem = ffi.new("char **")
409         size = ffi.new("int *")
410         rv = vpp_api.pneum_read(mem, size, self.read_timeout)
411         if rv:
412             raise IOError(rv, 'pneum_read filed')
413         msg = bytes(ffi.buffer(mem[0], size[0]))
414         vpp_api.pneum_free(mem[0])
415         return msg
416
417     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async):
418         rv = vpp_api.pneum_connect(name, chroot_prefix, msg_handler, rx_qlen)
419         if rv != 0:
420             raise IOError(2, 'Connect failed')
421         self.connected = True
422
423         self.vpp_dictionary_maxid = vpp_api.pneum_msg_table_max_index()
424         self._register_functions(async=async)
425
426         # Initialise control ping
427         crc = self.messages['control_ping']['crc']
428         self.control_ping_index = \
429                                   vpp_api.pneum_get_msg_index(
430                                       bytes('control_ping' + '_' + crc[2:]))
431         self.control_ping_msgdef = self.messages['control_ping']
432
433     def connect(self, name, chroot_prefix = ffi.NULL,
434                 async = False, rx_qlen = 32):
435         """Attach to VPP.
436
437         name - the name of the client.
438         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
439         async - if true, messages are sent without waiting for a reply
440         rx_qlen - the length of the VPP message receive queue between
441         client and server.
442         """
443         msg_handler = pneum_callback_sync if not async \
444                       else pneum_callback_async
445         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
446                                      async)
447
448     def connect_sync (self, name, chroot_prefix = ffi.NULL, rx_qlen = 32):
449         """Attach to VPP in synchronous mode. Application must poll for events.
450
451         name - the name of the client.
452         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
453         rx_qlen - the length of the VPP message receive queue between
454         client and server.
455         """
456
457         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
458                                      async=False)
459
460     def disconnect(self):
461         """Detach from VPP."""
462         rv = vpp_api.pneum_disconnect()
463         self.connected = False
464         return rv
465
466     def msg_handler_sync(self, msg):
467         """Process an incoming message from VPP in sync mode.
468
469         The message may be a reply or it may be an async notification.
470         """
471         r = self.decode_incoming_msg(msg)
472         if r is None:
473             return
474
475         # If we have a context, then use the context to find any
476         # request waiting for a reply
477         context = 0
478         if hasattr(r, 'context') and r.context > 0:
479             context = r.context
480
481         msgname = type(r).__name__
482
483         if context == 0:
484             # No context -> async notification that we feed to the callback
485             self.message_queue.put_nowait(r)
486         else:
487             raise IOError(2, 'RPC reply message received in event handler')
488
489     def decode_incoming_msg(self, msg):
490         if not msg:
491             self.logger.warning('vpp_api.read failed')
492             return
493
494         i, ci = self.header.unpack_from(msg, 0)
495         if self.id_names[i] == 'rx_thread_exit':
496             return
497
498         #
499         # Decode message and returns a tuple.
500         #
501         msgdef = self.id_msgdef[i]
502         if not msgdef:
503             raise IOError(2, 'Reply message undefined')
504
505         r = self.decode(msgdef, msg)
506
507         return r
508
509     def msg_handler_async(self, msg):
510         """Process a message from VPP in async mode.
511
512         In async mode, all messages are returned to the callback.
513         """
514         r = self.decode_incoming_msg(msg)
515         if r is None:
516             return
517
518         msgname = type(r).__name__
519
520         if self.event_callback:
521             self.event_callback(msgname, r)
522
523     def _control_ping(self, context):
524         """Send a ping command."""
525         self._call_vpp_async(self.control_ping_index,
526                              self.control_ping_msgdef,
527                              context=context)
528
529     def _call_vpp(self, i, msgdef, multipart, **kwargs):
530         """Given a message, send the message and await a reply.
531
532         msgdef - the message packing definition
533         i - the message type index
534         multipart - True if the message returns multiple
535         messages in return.
536         context - context number - chosen at random if not
537         supplied.
538         The remainder of the kwargs are the arguments to the API call.
539
540         The return value is the message or message array containing
541         the response.  It will raise an IOError exception if there was
542         no response within the timeout window.
543         """
544
545         if not 'context' in kwargs:
546             context = self.get_context()
547             kwargs['context'] = context
548         else:
549             context = kwargs['context']
550         kwargs['_vl_msg_id'] = i
551         b = self.encode(msgdef, kwargs)
552
553         vpp_api.pneum_rx_suspend()
554         self._write(b)
555
556         if multipart:
557             # Send a ping after the request - we use its response
558             # to detect that we have seen all results.
559             self._control_ping(context)
560
561         # Block until we get a reply.
562         rl = []
563         while (True):
564             msg = self._read()
565             if not msg:
566                 print('PNEUM ERROR: OH MY GOD')
567                 raise IOError(2, 'PNEUM read failed')
568
569             r = self.decode_incoming_msg(msg)
570             msgname = type(r).__name__
571             if not context in r or r.context == 0 or context != r.context:
572                 self.message_queue.put_nowait(r)
573                 continue
574
575             if not multipart:
576                 rl = r
577                 break
578             if msgname == 'control_ping_reply':
579                 break
580
581             rl.append(r)
582
583         vpp_api.pneum_rx_resume()
584
585         return rl
586
587     def _call_vpp_async(self, i, msgdef, **kwargs):
588         """Given a message, send the message and await a reply.
589
590         msgdef - the message packing definition
591         i - the message type index
592         context - context number - chosen at random if not
593         supplied.
594         The remainder of the kwargs are the arguments to the API call.
595         """
596         if not 'context' in kwargs:
597             context = self.get_context()
598             kwargs['context'] = context
599         else:
600             context = kwargs['context']
601         kwargs['_vl_msg_id'] = i
602         b = self.encode(msgdef, kwargs)
603
604         self._write(b)
605
606     def register_event_callback(self, callback):
607         """Register a callback for async messages.
608
609         This will be called for async notifications in sync mode,
610         and all messages in async mode.  In sync mode, replies to
611         requests will not come here.
612
613         callback is a fn(msg_type_name, msg_type) that will be
614         called when a message comes in.  While this function is
615         executing, note that (a) you are in a background thread and
616         may wish to use threading.Lock to protect your datastructures,
617         and (b) message processing from VPP will stop (so if you take
618         a long while about it you may provoke reply timeouts or cause
619         VPP to fill the RX buffer).  Passing None will disable the
620         callback.
621         """
622         self.event_callback = callback
623
624     def thread_msg_handler(self):
625         """Python thread calling the user registerd message handler.
626
627         This is to emulate the old style event callback scheme. Modern
628         clients should provide their own thread to poll the event
629         queue.
630         """
631         while True:
632             r = self.message_queue.get()
633             msgname = type(r).__name__
634             if self.event_callback:
635                 self.event_callback(msgname, r)