VPP-959: Support old version of CFFI
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import glob
26 import atexit
27 from cffi import FFI
28 import cffi
29
30 if sys.version[0] == '2':
31     import Queue as queue
32 else:
33     import queue as queue
34
35 ffi = FFI()
36 ffi.cdef("""
37 typedef void (*vac_callback_t)(unsigned char * data, int len);
38 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
39 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
40     int rx_qlen);
41 int vac_disconnect(void);
42 int vac_read(char **data, int *l, unsigned short timeout);
43 int vac_write(char *data, int len);
44 void vac_free(void * msg);
45
46 int vac_get_msg_index(unsigned char * name);
47 int vac_msg_table_size(void);
48 int vac_msg_table_max_index(void);
49
50 void vac_rx_suspend (void);
51 void vac_rx_resume (void);
52 void vac_set_error_handler(vac_error_callback_t);
53  """)
54
55 # Barfs on failure, no need to check success.
56 vpp_api = ffi.dlopen('libvppapiclient.so')
57
58 def vpp_atexit(self):
59     """Clean up VPP connection on shutdown."""
60     if self.connected:
61         self.logger.debug('Cleaning up VPP on exit')
62         self.disconnect()
63
64 vpp_object = None
65
66
67 def vpp_iterator(d):
68     if sys.version[0] == '2':
69         return d.iteritems()
70     else:
71         return d.items()
72
73
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76     vpp_object.msg_handler_sync(ffi.buffer(data, len))
77
78
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81     vpp_object.msg_handler_async(ffi.buffer(data, len))
82
83
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
87
88
89 class Empty(object):
90     pass
91
92
93 class FuncWrapper(object):
94     def __init__(self, func):
95         self._func = func
96         self.__name__ = func.__name__
97
98     def __call__(self, **kwargs):
99         return self._func(**kwargs)
100
101
102 class VPP():
103     """VPP interface.
104
105     This class provides the APIs to VPP.  The APIs are loaded
106     from provided .api.json files and makes functions accordingly.
107     These functions are documented in the VPP .api files, as they
108     are dynamically created.
109
110     Additionally, VPP can send callback messages; this class
111     provides a means to register a callback function to receive
112     these messages in a background thread.
113     """
114     def __init__(self, apifiles=None, testmode=False, async_thread=True,
115                  logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116         """Create a VPP API object.
117
118         apifiles is a list of files containing API
119         descriptions that will be loaded - methods will be
120         dynamically created reflecting these APIs.  If not
121         provided this will load the API files from VPP's
122         default install location.
123         """
124         global vpp_object
125         vpp_object = self
126         self.logger = logger
127         logging.basicConfig(level=getattr(logging, loglevel.upper()))
128
129         self.messages = {}
130         self.id_names = []
131         self.id_msgdef = []
132         self.connected = False
133         self.header = struct.Struct('>HI')
134         self.apifiles = []
135         self.event_callback = None
136         self.message_queue = queue.Queue()
137         self.read_timeout = 0
138         self.vpp_api = vpp_api
139         if async_thread:
140             self.event_thread = threading.Thread(
141                 target=self.thread_msg_handler)
142             self.event_thread.daemon = True
143             self.event_thread.start()
144
145         if not apifiles:
146             # Pick up API definitions from default directory
147             apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
148
149         for file in apifiles:
150             with open(file) as apidef_file:
151                 api = json.load(apidef_file)
152                 for t in api['types']:
153                     self.add_type(t[0], t[1:])
154
155                 for m in api['messages']:
156                     self.add_message(m[0], m[1:])
157         self.apifiles = apifiles
158
159         # Basic sanity check
160         if len(self.messages) == 0 and not testmode:
161             raise ValueError(1, 'Missing JSON message definitions')
162
163         # Make sure we allow VPP to clean up the message rings.
164         atexit.register(vpp_atexit, self)
165
166         # Register error handler
167         vpp_api.vac_set_error_handler(vac_error_handler)
168
169         # Support legacy CFFI
170         # from_buffer supported from 1.8.0
171         (major, minor, patch) = [int(s) for s in cffi.__version__.split('.', 3)]
172         if major >= 1 and minor >= 8:
173             self._write = self._write_new_cffi
174         else:
175             self._write = self._write_legacy_cffi
176
177     class ContextId(object):
178         """Thread-safe provider of unique context IDs."""
179         def __init__(self):
180             self.context = 0
181             self.lock = threading.Lock()
182
183         def __call__(self):
184             """Get a new unique (or, at least, not recently used) context."""
185             with self.lock:
186                 self.context += 1
187                 return self.context
188     get_context = ContextId()
189
190     def status(self):
191         """Debug function: report current VPP API status to stdout."""
192         print('Connected') if self.connected else print('Not Connected')
193         print('Read API definitions from', ', '.join(self.apifiles))
194
195     def __struct(self, t, n=None, e=-1, vl=None):
196         """Create a packing structure for a message."""
197         base_types = {'u8': 'B',
198                       'u16': 'H',
199                       'u32': 'I',
200                       'i32': 'i',
201                       'u64': 'Q',
202                       'f64': 'd', }
203         pack = None
204         if t in base_types:
205             pack = base_types[t]
206             if not vl:
207                 if e > 0 and t == 'u8':
208                     # Fixed byte array
209                     s = struct.Struct('>' + str(e) + 's')
210                     return s.size, s
211                 if e > 0:
212                     # Fixed array of base type
213                     s = struct.Struct('>' + base_types[t])
214                     return s.size, [e, s]
215                 elif e == 0:
216                     # Old style variable array
217                     s = struct.Struct('>' + base_types[t])
218                     return s.size, [-1, s]
219             else:
220                 # Variable length array
221                 if t == 'u8':
222                     s = struct.Struct('>s')
223                     return s.size, [vl, s]
224                 else:
225                     s = struct.Struct('>' + base_types[t])
226                 return s.size, [vl, s]
227
228             s = struct.Struct('>' + base_types[t])
229             return s.size, s
230
231         if t in self.messages:
232             size = self.messages[t]['sizes'][0]
233
234             # Return a list in case of array
235             if e > 0 and not vl:
236                 return size, [e, lambda self, encode, buf, offset, args: (
237                     self.__struct_type(encode, self.messages[t], buf, offset,
238                                        args))]
239             if vl:
240                 return size, [vl, lambda self, encode, buf, offset, args: (
241                     self.__struct_type(encode, self.messages[t], buf, offset,
242                                        args))]
243             elif e == 0:
244                 # Old style VLA
245                 raise NotImplementedError(1,
246                                           'No support for compound types ' + t)
247             return size, lambda self, encode, buf, offset, args: (
248                 self.__struct_type(encode, self.messages[t], buf, offset, args)
249             )
250
251         raise ValueError(1, 'Invalid message type: ' + t)
252
253     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
254         """Get a message packer or unpacker."""
255         if encode:
256             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
257         else:
258             return self.__struct_type_decode(msgdef, buf, offset)
259
260     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
261         off = offset
262         size = 0
263
264         for k in kwargs:
265             if k not in msgdef['args']:
266                 raise ValueError(1,'Non existing argument [' + k + ']' + \
267                                  ' used in call to: ' + \
268                                  self.id_names[kwargs['_vl_msg_id']] + '()' )
269
270         for k, v in vpp_iterator(msgdef['args']):
271             off += size
272             if k in kwargs:
273                 if type(v) is list:
274                     if callable(v[1]):
275                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
276                         if e != len(kwargs[k]):
277                             raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' %  (k, e, len(kwargs[k]))))
278                         size = 0
279                         for i in range(e):
280                             size += v[1](self, True, buf, off + size,
281                                          kwargs[k][i])
282                     else:
283                         if v[0] in kwargs:
284                             l = kwargs[v[0]]
285                             if l != len(kwargs[k]):
286                                 raise ValueError(1, 'Input list length mistmatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
287                         else:
288                             l = len(kwargs[k])
289                         if v[1].size == 1:
290                             buf[off:off + l] = bytearray(kwargs[k])
291                             size = l
292                         else:
293                             size = 0
294                             for i in kwargs[k]:
295                                 v[1].pack_into(buf, off + size, i)
296                                 size += v[1].size
297                 else:
298                     if callable(v):
299                         size = v(self, True, buf, off, kwargs[k])
300                     else:
301                         if type(kwargs[k]) is str and v.size < len(kwargs[k]):
302                             raise ValueError(1, 'Input list length mistmatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
303                         v.pack_into(buf, off, kwargs[k])
304                         size = v.size
305             else:
306                 size = v.size if not type(v) is list else 0
307
308         return off + size - offset
309
310     def __getitem__(self, name):
311         if name in self.messages:
312             return self.messages[name]
313         return None
314
315     def get_size(self, sizes, kwargs):
316         total_size = sizes[0]
317         for e in sizes[1]:
318             if e in kwargs and type(kwargs[e]) is list:
319                 total_size += len(kwargs[e]) * sizes[1][e]
320         return total_size
321
322     def encode(self, msgdef, kwargs):
323         # Make suitably large buffer
324         size = self.get_size(msgdef['sizes'], kwargs)
325         buf = bytearray(size)
326         offset = 0
327         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
328         return buf[:offset + size]
329
330     def decode(self, msgdef, buf):
331         return self.__struct_type(False, msgdef, buf, 0, None)[1]
332
333     def __struct_type_decode(self, msgdef, buf, offset):
334         res = []
335         off = offset
336         size = 0
337         for k, v in vpp_iterator(msgdef['args']):
338             off += size
339             if type(v) is list:
340                 lst = []
341                 if callable(v[1]):  # compound type
342                     size = 0
343                     if v[0] in msgdef['args']:  # vla
344                         e = res[v[2]]
345                     else:  # fixed array
346                         e = v[0]
347                     res.append(lst)
348                     for i in range(e):
349                         (s, l) = v[1](self, False, buf, off + size, None)
350                         lst.append(l)
351                         size += s
352                     continue
353                 if v[1].size == 1:
354                     if type(v[0]) is int:
355                         size = len(buf) - off
356                     else:
357                         size = res[v[2]]
358                     res.append(buf[off:off + size])
359                 else:
360                     e = v[0] if type(v[0]) is int else res[v[2]]
361                     if e == -1:
362                         e = (len(buf) - off) / v[1].size
363                     lst = []
364                     res.append(lst)
365                     size = 0
366                     for i in range(e):
367                         lst.append(v[1].unpack_from(buf, off + size)[0])
368                         size += v[1].size
369             else:
370                 if callable(v):
371                     size = 0
372                     (s, l) = v(self, False, buf, off, None)
373                     res.append(l)
374                     size += s
375                 else:
376                     res.append(v.unpack_from(buf, off)[0])
377                     size = v.size
378
379         return off + size - offset, msgdef['return_tuple']._make(res)
380
381     def ret_tup(self, name):
382         if name in self.messages and 'return_tuple' in self.messages[name]:
383             return self.messages[name]['return_tuple']
384         return None
385
386     def add_message(self, name, msgdef, typeonly=False):
387         if name in self.messages:
388             raise ValueError('Duplicate message name: ' + name)
389
390         args = collections.OrderedDict()
391         argtypes = collections.OrderedDict()
392         fields = []
393         msg = {}
394         total_size = 0
395         sizes = {}
396         for i, f in enumerate(msgdef):
397             if type(f) is dict and 'crc' in f:
398                 msg['crc'] = f['crc']
399                 continue
400             field_type = f[0]
401             field_name = f[1]
402             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
403                 raise ValueError('Variable Length Array must be last: ' + name)
404             size, s = self.__struct(*f)
405             args[field_name] = s
406             if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
407                 if s[0] < 0:
408                     sizes[field_name] = size
409                 else:
410                     sizes[field_name] = size
411                     total_size += s[0] * size
412             else:
413                 sizes[field_name] = size
414                 total_size += size
415
416             argtypes[field_name] = field_type
417             if len(f) == 4:  # Find offset to # elements field
418                 idx = list(args.keys()).index(f[3]) - i
419                 args[field_name].append(idx)
420             fields.append(field_name)
421         msg['return_tuple'] = collections.namedtuple(name, fields,
422                                                      rename=True)
423         self.messages[name] = msg
424         self.messages[name]['args'] = args
425         self.messages[name]['argtypes'] = argtypes
426         self.messages[name]['typeonly'] = typeonly
427         self.messages[name]['sizes'] = [total_size, sizes]
428         return self.messages[name]
429
430     def add_type(self, name, typedef):
431         return self.add_message('vl_api_' + name + '_t', typedef,
432                                 typeonly=True)
433
434     def make_function(self, name, i, msgdef, multipart, async):
435         if (async):
436             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
437         else:
438             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
439                                                  **kwargs))
440         args = self.messages[name]['args']
441         argtypes = self.messages[name]['argtypes']
442         f.__name__ = str(name)
443         f.__doc__ = ", ".join(["%s %s" %
444                                (argtypes[k], k) for k in args.keys()])
445         return f
446
447     @property
448     def api(self):
449         if not hasattr(self, "_api"):
450             raise Exception("Not connected, api definitions not available")
451         return self._api
452
453     def _register_functions(self, async=False):
454         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
455         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
456         self._api = Empty()
457         for name, msgdef in vpp_iterator(self.messages):
458             if self.messages[name]['typeonly']:
459                 continue
460             crc = self.messages[name]['crc']
461             n = name + '_' + crc[2:]
462             i = vpp_api.vac_get_msg_index(n.encode())
463             if i > 0:
464                 self.id_msgdef[i] = msgdef
465                 self.id_names[i] = name
466                 multipart = True if name.find('_dump') > 0 else False
467                 f = self.make_function(name, i, msgdef, multipart, async)
468                 setattr(self._api, name, FuncWrapper(f))
469
470                 # old API stuff starts here - will be removed in 17.07
471                 if hasattr(self, name):
472                     raise NameError(
473                         3, "Conflicting name in JSON definition: `%s'" % name)
474                 setattr(self, name, f)
475                 # old API stuff ends here
476             else:
477                 self.logger.debug(
478                     'No such message type or failed CRC checksum: %s', n)
479
480     def _write_new_cffi(self, buf):
481         """Send a binary-packed message to VPP."""
482         if not self.connected:
483             raise IOError(1, 'Not connected')
484         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
485
486     def _write_legacy_cffi(self, buf):
487         """Send a binary-packed message to VPP."""
488         if not self.connected:
489             raise IOError(1, 'Not connected')
490         return vpp_api.vac_write(str(buf), len(buf))
491
492     def _read(self):
493         if not self.connected:
494             raise IOError(1, 'Not connected')
495         mem = ffi.new("char **")
496         size = ffi.new("int *")
497         rv = vpp_api.vac_read(mem, size, self.read_timeout)
498         if rv:
499             raise IOError(rv, 'vac_read failed')
500         msg = bytes(ffi.buffer(mem[0], size[0]))
501         vpp_api.vac_free(mem[0])
502         return msg
503
504     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
505                          async):
506         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
507         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
508         if rv != 0:
509             raise IOError(2, 'Connect failed')
510         self.connected = True
511
512         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
513         self._register_functions(async=async)
514
515         # Initialise control ping
516         crc = self.messages['control_ping']['crc']
517         self.control_ping_index = vpp_api.vac_get_msg_index(
518             ('control_ping' + '_' + crc[2:]).encode())
519         self.control_ping_msgdef = self.messages['control_ping']
520         return rv
521
522     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
523         """Attach to VPP.
524
525         name - the name of the client.
526         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
527         async - if true, messages are sent without waiting for a reply
528         rx_qlen - the length of the VPP message receive queue between
529         client and server.
530         """
531         msg_handler = vac_callback_sync if not async else vac_callback_async
532         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
533                                      async)
534
535     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
536         """Attach to VPP in synchronous mode. Application must poll for events.
537
538         name - the name of the client.
539         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
540         rx_qlen - the length of the VPP message receive queue between
541         client and server.
542         """
543
544         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
545                                      async=False)
546
547     def disconnect(self):
548         """Detach from VPP."""
549         rv = vpp_api.vac_disconnect()
550         self.connected = False
551         return rv
552
553     def msg_handler_sync(self, msg):
554         """Process an incoming message from VPP in sync mode.
555
556         The message may be a reply or it may be an async notification.
557         """
558         r = self.decode_incoming_msg(msg)
559         if r is None:
560             return
561
562         # If we have a context, then use the context to find any
563         # request waiting for a reply
564         context = 0
565         if hasattr(r, 'context') and r.context > 0:
566             context = r.context
567
568         msgname = type(r).__name__
569
570         if context == 0:
571             # No context -> async notification that we feed to the callback
572             self.message_queue.put_nowait(r)
573         else:
574             raise IOError(2, 'RPC reply message received in event handler')
575
576     def decode_incoming_msg(self, msg):
577         if not msg:
578             self.logger.warning('vpp_api.read failed')
579             return
580
581         i, ci = self.header.unpack_from(msg, 0)
582         if self.id_names[i] == 'rx_thread_exit':
583             return
584
585         #
586         # Decode message and returns a tuple.
587         #
588         msgdef = self.id_msgdef[i]
589         if not msgdef:
590             raise IOError(2, 'Reply message undefined')
591
592         r = self.decode(msgdef, msg)
593
594         return r
595
596     def msg_handler_async(self, msg):
597         """Process a message from VPP in async mode.
598
599         In async mode, all messages are returned to the callback.
600         """
601         r = self.decode_incoming_msg(msg)
602         if r is None:
603             return
604
605         msgname = type(r).__name__
606
607         if self.event_callback:
608             self.event_callback(msgname, r)
609
610     def _control_ping(self, context):
611         """Send a ping command."""
612         self._call_vpp_async(self.control_ping_index,
613                              self.control_ping_msgdef,
614                              context=context)
615
616     def _call_vpp(self, i, msgdef, multipart, **kwargs):
617         """Given a message, send the message and await a reply.
618
619         msgdef - the message packing definition
620         i - the message type index
621         multipart - True if the message returns multiple
622         messages in return.
623         context - context number - chosen at random if not
624         supplied.
625         The remainder of the kwargs are the arguments to the API call.
626
627         The return value is the message or message array containing
628         the response.  It will raise an IOError exception if there was
629         no response within the timeout window.
630         """
631
632         if 'context' not in kwargs:
633             context = self.get_context()
634             kwargs['context'] = context
635         else:
636             context = kwargs['context']
637         kwargs['_vl_msg_id'] = i
638         b = self.encode(msgdef, kwargs)
639
640         vpp_api.vac_rx_suspend()
641         self._write(b)
642
643         if multipart:
644             # Send a ping after the request - we use its response
645             # to detect that we have seen all results.
646             self._control_ping(context)
647
648         # Block until we get a reply.
649         rl = []
650         while (True):
651             msg = self._read()
652             if not msg:
653                 raise IOError(2, 'VPP API client: read failed')
654
655             r = self.decode_incoming_msg(msg)
656             msgname = type(r).__name__
657             if context not in r or r.context == 0 or context != r.context:
658                 self.message_queue.put_nowait(r)
659                 continue
660
661             if not multipart:
662                 rl = r
663                 break
664             if msgname == 'control_ping_reply':
665                 break
666
667             rl.append(r)
668
669         vpp_api.vac_rx_resume()
670
671         return rl
672
673     def _call_vpp_async(self, i, msgdef, **kwargs):
674         """Given a message, send the message and await a reply.
675
676         msgdef - the message packing definition
677         i - the message type index
678         context - context number - chosen at random if not
679         supplied.
680         The remainder of the kwargs are the arguments to the API call.
681         """
682         if 'context' not in kwargs:
683             context = self.get_context()
684             kwargs['context'] = context
685         else:
686             context = kwargs['context']
687         kwargs['_vl_msg_id'] = i
688         b = self.encode(msgdef, kwargs)
689
690         self._write(b)
691
692     def register_event_callback(self, callback):
693         """Register a callback for async messages.
694
695         This will be called for async notifications in sync mode,
696         and all messages in async mode.  In sync mode, replies to
697         requests will not come here.
698
699         callback is a fn(msg_type_name, msg_type) that will be
700         called when a message comes in.  While this function is
701         executing, note that (a) you are in a background thread and
702         may wish to use threading.Lock to protect your datastructures,
703         and (b) message processing from VPP will stop (so if you take
704         a long while about it you may provoke reply timeouts or cause
705         VPP to fill the RX buffer).  Passing None will disable the
706         callback.
707         """
708         self.event_callback = callback
709
710     def thread_msg_handler(self):
711         """Python thread calling the user registerd message handler.
712
713         This is to emulate the old style event callback scheme. Modern
714         clients should provide their own thread to poll the event
715         queue.
716         """
717         while True:
718             r = self.message_queue.get()
719             msgname = type(r).__name__
720             if self.event_callback:
721                 self.event_callback(msgname, r)