VPP-1033: Python API support arbitrary sized input parameters.
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import glob
26 import atexit
27 from cffi import FFI
28
29 if sys.version[0] == '2':
30     import Queue as queue
31 else:
32     import queue as queue
33
34 ffi = FFI()
35 ffi.cdef("""
36 typedef void (*vac_callback_t)(unsigned char * data, int len);
37 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
38 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
39     int rx_qlen);
40 int vac_disconnect(void);
41 int vac_read(char **data, int *l, unsigned short timeout);
42 int vac_write(char *data, int len);
43 void vac_free(void * msg);
44
45 int vac_get_msg_index(unsigned char * name);
46 int vac_msg_table_size(void);
47 int vac_msg_table_max_index(void);
48
49 void vac_rx_suspend (void);
50 void vac_rx_resume (void);
51 void vac_set_error_handler(vac_error_callback_t);
52  """)
53
54 # Barfs on failure, no need to check success.
55 vpp_api = ffi.dlopen('libvppapiclient.so')
56
57
58 def vpp_atexit(self):
59     """Clean up VPP connection on shutdown."""
60     if self.connected:
61         self.logger.debug('Cleaning up VPP on exit')
62         self.disconnect()
63
64 vpp_object = None
65
66
67 def vpp_iterator(d):
68     if sys.version[0] == '2':
69         return d.iteritems()
70     else:
71         return d.items()
72
73
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76     vpp_object.msg_handler_sync(ffi.buffer(data, len))
77
78
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81     vpp_object.msg_handler_async(ffi.buffer(data, len))
82
83
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
87
88
89 class Empty(object):
90     pass
91
92
93 class FuncWrapper(object):
94     def __init__(self, func):
95         self._func = func
96         self.__name__ = func.__name__
97
98     def __call__(self, **kwargs):
99         return self._func(**kwargs)
100
101
102 class VPP():
103     """VPP interface.
104
105     This class provides the APIs to VPP.  The APIs are loaded
106     from provided .api.json files and makes functions accordingly.
107     These functions are documented in the VPP .api files, as they
108     are dynamically created.
109
110     Additionally, VPP can send callback messages; this class
111     provides a means to register a callback function to receive
112     these messages in a background thread.
113     """
114     def __init__(self, apifiles=None, testmode=False, async_thread=True,
115                  logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116         """Create a VPP API object.
117
118         apifiles is a list of files containing API
119         descriptions that will be loaded - methods will be
120         dynamically created reflecting these APIs.  If not
121         provided this will load the API files from VPP's
122         default install location.
123         """
124         global vpp_object
125         vpp_object = self
126         self.logger = logger
127         logging.basicConfig(level=getattr(logging, loglevel.upper()))
128
129         self.messages = {}
130         self.id_names = []
131         self.id_msgdef = []
132         self.connected = False
133         self.header = struct.Struct('>HI')
134         self.apifiles = []
135         self.event_callback = None
136         self.message_queue = queue.Queue()
137         self.read_timeout = 0
138         self.vpp_api = vpp_api
139         if async_thread:
140             self.event_thread = threading.Thread(
141                 target=self.thread_msg_handler)
142             self.event_thread.daemon = True
143             self.event_thread.start()
144
145         if not apifiles:
146             # Pick up API definitions from default directory
147             apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
148
149         for file in apifiles:
150             with open(file) as apidef_file:
151                 api = json.load(apidef_file)
152                 for t in api['types']:
153                     self.add_type(t[0], t[1:])
154
155                 for m in api['messages']:
156                     self.add_message(m[0], m[1:])
157         self.apifiles = apifiles
158
159         # Basic sanity check
160         if len(self.messages) == 0 and not testmode:
161             raise ValueError(1, 'Missing JSON message definitions')
162
163         # Make sure we allow VPP to clean up the message rings.
164         atexit.register(vpp_atexit, self)
165
166         # Register error handler
167         vpp_api.vac_set_error_handler(vac_error_handler)
168
169     class ContextId(object):
170         """Thread-safe provider of unique context IDs."""
171         def __init__(self):
172             self.context = 0
173             self.lock = threading.Lock()
174
175         def __call__(self):
176             """Get a new unique (or, at least, not recently used) context."""
177             with self.lock:
178                 self.context += 1
179                 return self.context
180     get_context = ContextId()
181
182     def status(self):
183         """Debug function: report current VPP API status to stdout."""
184         print('Connected') if self.connected else print('Not Connected')
185         print('Read API definitions from', ', '.join(self.apifiles))
186
187     def __struct(self, t, n=None, e=-1, vl=None):
188         """Create a packing structure for a message."""
189         base_types = {'u8': 'B',
190                       'u16': 'H',
191                       'u32': 'I',
192                       'i32': 'i',
193                       'u64': 'Q',
194                       'f64': 'd', }
195         pack = None
196         if t in base_types:
197             pack = base_types[t]
198             if not vl:
199                 if e > 0 and t == 'u8':
200                     # Fixed byte array
201                     s = struct.Struct('>' + str(e) + 's')
202                     return s.size, s
203                 if e > 0:
204                     # Fixed array of base type
205                     s = struct.Struct('>' + base_types[t])
206                     return s.size, [e, s]
207                 elif e == 0:
208                     # Old style variable array
209                     s = struct.Struct('>' + base_types[t])
210                     return s.size, [-1, s]
211             else:
212                 # Variable length array
213                 if t == 'u8':
214                     s = struct.Struct('>s')
215                     return s.size, [vl, s]
216                 else:
217                     s = struct.Struct('>' + base_types[t])
218                 return s.size, [vl, s]
219
220             s = struct.Struct('>' + base_types[t])
221             return s.size, s
222
223         if t in self.messages:
224             size = self.messages[t]['sizes'][0]
225
226             # Return a list in case of array
227             if e > 0 and not vl:
228                 return size, [e, lambda self, encode, buf, offset, args: (
229                     self.__struct_type(encode, self.messages[t], buf, offset,
230                                        args))]
231             if vl:
232                 return size, [vl, lambda self, encode, buf, offset, args: (
233                     self.__struct_type(encode, self.messages[t], buf, offset,
234                                        args))]
235             elif e == 0:
236                 # Old style VLA
237                 raise NotImplementedError(1,
238                                           'No support for compound types ' + t)
239             return size, lambda self, encode, buf, offset, args: (
240                 self.__struct_type(encode, self.messages[t], buf, offset, args)
241             )
242
243         raise ValueError(1, 'Invalid message type: ' + t)
244
245     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
246         """Get a message packer or unpacker."""
247         if encode:
248             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
249         else:
250             return self.__struct_type_decode(msgdef, buf, offset)
251
252     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
253         off = offset
254         size = 0
255
256         for k in kwargs:
257             if k not in msgdef['args']:
258                 raise ValueError(1,'Non existing argument [' + k + ']' + \
259                                  ' used in call to: ' + \
260                                  self.id_names[kwargs['_vl_msg_id']] + '()' )
261
262         for k, v in vpp_iterator(msgdef['args']):
263             off += size
264             if k in kwargs:
265                 if type(v) is list:
266                     if callable(v[1]):
267                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
268                         if e != len(kwargs[k]):
269                             raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' %  (k, e, len(kwargs[k]))))
270                         size = 0
271                         for i in range(e):
272                             size += v[1](self, True, buf, off + size,
273                                          kwargs[k][i])
274                     else:
275                         if v[0] in kwargs:
276                             l = kwargs[v[0]]
277                             if l != len(kwargs[k]):
278                                 raise ValueError(1, 'Input list length mistmatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
279                         else:
280                             l = len(kwargs[k])
281                         if v[1].size == 1:
282                             buf[off:off + l] = bytearray(kwargs[k])
283                             size = l
284                         else:
285                             size = 0
286                             for i in kwargs[k]:
287                                 v[1].pack_into(buf, off + size, i)
288                                 size += v[1].size
289                 else:
290                     if callable(v):
291                         size = v(self, True, buf, off, kwargs[k])
292                     else:
293                         if type(kwargs[k]) is str and v.size < len(kwargs[k]):
294                             raise ValueError(1, 'Input list length mistmatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
295                         v.pack_into(buf, off, kwargs[k])
296                         size = v.size
297             else:
298                 size = v.size if not type(v) is list else 0
299
300         return off + size - offset
301
302     def __getitem__(self, name):
303         if name in self.messages:
304             return self.messages[name]
305         return None
306
307     def get_size(self, sizes, kwargs):
308         total_size = sizes[0]
309         for e in sizes[1]:
310             if e in kwargs and type(kwargs[e]) is list:
311                 total_size += len(kwargs[e]) * sizes[1][e]
312         return total_size
313
314     def encode(self, msgdef, kwargs):
315         # Make suitably large buffer
316         size = self.get_size(msgdef['sizes'], kwargs)
317         buf = bytearray(size)
318         offset = 0
319         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
320         return buf[:offset + size]
321
322     def decode(self, msgdef, buf):
323         return self.__struct_type(False, msgdef, buf, 0, None)[1]
324
325     def __struct_type_decode(self, msgdef, buf, offset):
326         res = []
327         off = offset
328         size = 0
329         for k, v in vpp_iterator(msgdef['args']):
330             off += size
331             if type(v) is list:
332                 lst = []
333                 if callable(v[1]):  # compound type
334                     size = 0
335                     if v[0] in msgdef['args']:  # vla
336                         e = res[v[2]]
337                     else:  # fixed array
338                         e = v[0]
339                     res.append(lst)
340                     for i in range(e):
341                         (s, l) = v[1](self, False, buf, off + size, None)
342                         lst.append(l)
343                         size += s
344                     continue
345                 if v[1].size == 1:
346                     if type(v[0]) is int:
347                         size = len(buf) - off
348                     else:
349                         size = res[v[2]]
350                     res.append(buf[off:off + size])
351                 else:
352                     e = v[0] if type(v[0]) is int else res[v[2]]
353                     if e == -1:
354                         e = (len(buf) - off) / v[1].size
355                     lst = []
356                     res.append(lst)
357                     size = 0
358                     for i in range(e):
359                         lst.append(v[1].unpack_from(buf, off + size)[0])
360                         size += v[1].size
361             else:
362                 if callable(v):
363                     (s, l) = v(self, False, buf, off, None)
364                     res.append(l)
365                     size += s
366                 else:
367                     res.append(v.unpack_from(buf, off)[0])
368                     size = v.size
369
370         return off + size - offset, msgdef['return_tuple']._make(res)
371
372     def ret_tup(self, name):
373         if name in self.messages and 'return_tuple' in self.messages[name]:
374             return self.messages[name]['return_tuple']
375         return None
376
377     def add_message(self, name, msgdef, typeonly=False):
378         if name in self.messages:
379             raise ValueError('Duplicate message name: ' + name)
380
381         args = collections.OrderedDict()
382         argtypes = collections.OrderedDict()
383         fields = []
384         msg = {}
385         total_size = 0
386         sizes = {}
387         for i, f in enumerate(msgdef):
388             if type(f) is dict and 'crc' in f:
389                 msg['crc'] = f['crc']
390                 continue
391             field_type = f[0]
392             field_name = f[1]
393             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
394                 raise ValueError('Variable Length Array must be last: ' + name)
395             size, s = self.__struct(*f)
396             args[field_name] = s
397             if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
398                 if s[0] < 0:
399                     sizes[field_name] = size
400                 else:
401                     sizes[field_name] = size
402                     total_size += s[0] * size
403             else:
404                 sizes[field_name] = size
405                 total_size += size
406
407             argtypes[field_name] = field_type
408             if len(f) == 4:  # Find offset to # elements field
409                 idx = list(args.keys()).index(f[3]) - i
410                 args[field_name].append(idx)
411             fields.append(field_name)
412         msg['return_tuple'] = collections.namedtuple(name, fields,
413                                                      rename=True)
414         self.messages[name] = msg
415         self.messages[name]['args'] = args
416         self.messages[name]['argtypes'] = argtypes
417         self.messages[name]['typeonly'] = typeonly
418         self.messages[name]['sizes'] = [total_size, sizes]
419         return self.messages[name]
420
421     def add_type(self, name, typedef):
422         return self.add_message('vl_api_' + name + '_t', typedef,
423                                 typeonly=True)
424
425     def make_function(self, name, i, msgdef, multipart, async):
426         if (async):
427             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
428         else:
429             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
430                                                  **kwargs))
431         args = self.messages[name]['args']
432         argtypes = self.messages[name]['argtypes']
433         f.__name__ = str(name)
434         f.__doc__ = ", ".join(["%s %s" %
435                                (argtypes[k], k) for k in args.keys()])
436         return f
437
438     @property
439     def api(self):
440         if not hasattr(self, "_api"):
441             raise Exception("Not connected, api definitions not available")
442         return self._api
443
444     def _register_functions(self, async=False):
445         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
446         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
447         self._api = Empty()
448         for name, msgdef in vpp_iterator(self.messages):
449             if self.messages[name]['typeonly']:
450                 continue
451             crc = self.messages[name]['crc']
452             n = name + '_' + crc[2:]
453             i = vpp_api.vac_get_msg_index(n.encode())
454             if i > 0:
455                 self.id_msgdef[i] = msgdef
456                 self.id_names[i] = name
457                 multipart = True if name.find('_dump') > 0 else False
458                 f = self.make_function(name, i, msgdef, multipart, async)
459                 setattr(self._api, name, FuncWrapper(f))
460
461                 # old API stuff starts here - will be removed in 17.07
462                 if hasattr(self, name):
463                     raise NameError(
464                         3, "Conflicting name in JSON definition: `%s'" % name)
465                 setattr(self, name, f)
466                 # old API stuff ends here
467             else:
468                 self.logger.debug(
469                     'No such message type or failed CRC checksum: %s', n)
470
471     def _write(self, buf):
472         """Send a binary-packed message to VPP."""
473         if not self.connected:
474             raise IOError(1, 'Not connected')
475         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
476
477     def _read(self):
478         if not self.connected:
479             raise IOError(1, 'Not connected')
480         mem = ffi.new("char **")
481         size = ffi.new("int *")
482         rv = vpp_api.vac_read(mem, size, self.read_timeout)
483         if rv:
484             raise IOError(rv, 'vac_read failed')
485         msg = bytes(ffi.buffer(mem[0], size[0]))
486         vpp_api.vac_free(mem[0])
487         return msg
488
489     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
490                          async):
491         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
492         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
493         if rv != 0:
494             raise IOError(2, 'Connect failed')
495         self.connected = True
496
497         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
498         self._register_functions(async=async)
499
500         # Initialise control ping
501         crc = self.messages['control_ping']['crc']
502         self.control_ping_index = vpp_api.vac_get_msg_index(
503             ('control_ping' + '_' + crc[2:]).encode())
504         self.control_ping_msgdef = self.messages['control_ping']
505         return rv
506
507     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
508         """Attach to VPP.
509
510         name - the name of the client.
511         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
512         async - if true, messages are sent without waiting for a reply
513         rx_qlen - the length of the VPP message receive queue between
514         client and server.
515         """
516         msg_handler = vac_callback_sync if not async else vac_callback_async
517         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
518                                      async)
519
520     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
521         """Attach to VPP in synchronous mode. Application must poll for events.
522
523         name - the name of the client.
524         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
525         rx_qlen - the length of the VPP message receive queue between
526         client and server.
527         """
528
529         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
530                                      async=False)
531
532     def disconnect(self):
533         """Detach from VPP."""
534         rv = vpp_api.vac_disconnect()
535         self.connected = False
536         return rv
537
538     def msg_handler_sync(self, msg):
539         """Process an incoming message from VPP in sync mode.
540
541         The message may be a reply or it may be an async notification.
542         """
543         r = self.decode_incoming_msg(msg)
544         if r is None:
545             return
546
547         # If we have a context, then use the context to find any
548         # request waiting for a reply
549         context = 0
550         if hasattr(r, 'context') and r.context > 0:
551             context = r.context
552
553         msgname = type(r).__name__
554
555         if context == 0:
556             # No context -> async notification that we feed to the callback
557             self.message_queue.put_nowait(r)
558         else:
559             raise IOError(2, 'RPC reply message received in event handler')
560
561     def decode_incoming_msg(self, msg):
562         if not msg:
563             self.logger.warning('vpp_api.read failed')
564             return
565
566         i, ci = self.header.unpack_from(msg, 0)
567         if self.id_names[i] == 'rx_thread_exit':
568             return
569
570         #
571         # Decode message and returns a tuple.
572         #
573         msgdef = self.id_msgdef[i]
574         if not msgdef:
575             raise IOError(2, 'Reply message undefined')
576
577         r = self.decode(msgdef, msg)
578
579         return r
580
581     def msg_handler_async(self, msg):
582         """Process a message from VPP in async mode.
583
584         In async mode, all messages are returned to the callback.
585         """
586         r = self.decode_incoming_msg(msg)
587         if r is None:
588             return
589
590         msgname = type(r).__name__
591
592         if self.event_callback:
593             self.event_callback(msgname, r)
594
595     def _control_ping(self, context):
596         """Send a ping command."""
597         self._call_vpp_async(self.control_ping_index,
598                              self.control_ping_msgdef,
599                              context=context)
600
601     def _call_vpp(self, i, msgdef, multipart, **kwargs):
602         """Given a message, send the message and await a reply.
603
604         msgdef - the message packing definition
605         i - the message type index
606         multipart - True if the message returns multiple
607         messages in return.
608         context - context number - chosen at random if not
609         supplied.
610         The remainder of the kwargs are the arguments to the API call.
611
612         The return value is the message or message array containing
613         the response.  It will raise an IOError exception if there was
614         no response within the timeout window.
615         """
616
617         if 'context' not in kwargs:
618             context = self.get_context()
619             kwargs['context'] = context
620         else:
621             context = kwargs['context']
622         kwargs['_vl_msg_id'] = i
623         b = self.encode(msgdef, kwargs)
624
625         vpp_api.vac_rx_suspend()
626         self._write(b)
627
628         if multipart:
629             # Send a ping after the request - we use its response
630             # to detect that we have seen all results.
631             self._control_ping(context)
632
633         # Block until we get a reply.
634         rl = []
635         while (True):
636             msg = self._read()
637             if not msg:
638                 raise IOError(2, 'VPP API client: read failed')
639
640             r = self.decode_incoming_msg(msg)
641             msgname = type(r).__name__
642             if context not in r or r.context == 0 or context != r.context:
643                 self.message_queue.put_nowait(r)
644                 continue
645
646             if not multipart:
647                 rl = r
648                 break
649             if msgname == 'control_ping_reply':
650                 break
651
652             rl.append(r)
653
654         vpp_api.vac_rx_resume()
655
656         return rl
657
658     def _call_vpp_async(self, i, msgdef, **kwargs):
659         """Given a message, send the message and await a reply.
660
661         msgdef - the message packing definition
662         i - the message type index
663         context - context number - chosen at random if not
664         supplied.
665         The remainder of the kwargs are the arguments to the API call.
666         """
667         if 'context' not in kwargs:
668             context = self.get_context()
669             kwargs['context'] = context
670         else:
671             context = kwargs['context']
672         kwargs['_vl_msg_id'] = i
673         b = self.encode(msgdef, kwargs)
674
675         self._write(b)
676
677     def register_event_callback(self, callback):
678         """Register a callback for async messages.
679
680         This will be called for async notifications in sync mode,
681         and all messages in async mode.  In sync mode, replies to
682         requests will not come here.
683
684         callback is a fn(msg_type_name, msg_type) that will be
685         called when a message comes in.  While this function is
686         executing, note that (a) you are in a background thread and
687         may wish to use threading.Lock to protect your datastructures,
688         and (b) message processing from VPP will stop (so if you take
689         a long while about it you may provoke reply timeouts or cause
690         VPP to fill the RX buffer).  Passing None will disable the
691         callback.
692         """
693         self.event_callback = callback
694
695     def thread_msg_handler(self):
696         """Python thread calling the user registerd message handler.
697
698         This is to emulate the old style event callback scheme. Modern
699         clients should provide their own thread to poll the event
700         queue.
701         """
702         while True:
703             r = self.message_queue.get()
704             msgname = type(r).__name__
705             if self.event_callback:
706                 self.event_callback(msgname, r)