VPP-960: Python API add more information in exception for invalid arguments to API...
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import glob
26 import atexit
27 from cffi import FFI
28
29 if sys.version[0] == '2':
30     import Queue as queue
31 else:
32     import queue as queue
33
34 ffi = FFI()
35 ffi.cdef("""
36 typedef void (*vac_callback_t)(unsigned char * data, int len);
37 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
38 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
39     int rx_qlen);
40 int vac_disconnect(void);
41 int vac_read(char **data, int *l, unsigned short timeout);
42 int vac_write(char *data, int len);
43 void vac_free(void * msg);
44
45 int vac_get_msg_index(unsigned char * name);
46 int vac_msg_table_size(void);
47 int vac_msg_table_max_index(void);
48
49 void vac_rx_suspend (void);
50 void vac_rx_resume (void);
51 void vac_set_error_handler(vac_error_callback_t);
52  """)
53
54 # Barfs on failure, no need to check success.
55 vpp_api = ffi.dlopen('libvppapiclient.so')
56
57
58 def vpp_atexit(self):
59     """Clean up VPP connection on shutdown."""
60     if self.connected:
61         self.logger.debug('Cleaning up VPP on exit')
62         self.disconnect()
63
64 vpp_object = None
65
66
67 def vpp_iterator(d):
68     if sys.version[0] == '2':
69         return d.iteritems()
70     else:
71         return d.items()
72
73
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76     vpp_object.msg_handler_sync(ffi.buffer(data, len))
77
78
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81     vpp_object.msg_handler_async(ffi.buffer(data, len))
82
83
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
87
88
89 class Empty(object):
90     pass
91
92
93 class FuncWrapper(object):
94     def __init__(self, func):
95         self._func = func
96         self.__name__ = func.__name__
97
98     def __call__(self, **kwargs):
99         return self._func(**kwargs)
100
101
102 class VPP():
103     """VPP interface.
104
105     This class provides the APIs to VPP.  The APIs are loaded
106     from provided .api.json files and makes functions accordingly.
107     These functions are documented in the VPP .api files, as they
108     are dynamically created.
109
110     Additionally, VPP can send callback messages; this class
111     provides a means to register a callback function to receive
112     these messages in a background thread.
113     """
114     def __init__(self, apifiles=None, testmode=False, async_thread=True,
115                  logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116         """Create a VPP API object.
117
118         apifiles is a list of files containing API
119         descriptions that will be loaded - methods will be
120         dynamically created reflecting these APIs.  If not
121         provided this will load the API files from VPP's
122         default install location.
123         """
124         global vpp_object
125         vpp_object = self
126         self.logger = logger
127         logging.basicConfig(level=getattr(logging, loglevel.upper()))
128
129         self.messages = {}
130         self.id_names = []
131         self.id_msgdef = []
132         self.buffersize = 10000
133         self.connected = False
134         self.header = struct.Struct('>HI')
135         self.apifiles = []
136         self.event_callback = None
137         self.message_queue = queue.Queue()
138         self.read_timeout = 0
139         self.vpp_api = vpp_api
140         if async_thread:
141             self.event_thread = threading.Thread(
142                 target=self.thread_msg_handler)
143             self.event_thread.daemon = True
144             self.event_thread.start()
145
146         if not apifiles:
147             # Pick up API definitions from default directory
148             apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
149
150         for file in apifiles:
151             with open(file) as apidef_file:
152                 api = json.load(apidef_file)
153                 for t in api['types']:
154                     self.add_type(t[0], t[1:])
155
156                 for m in api['messages']:
157                     self.add_message(m[0], m[1:])
158         self.apifiles = apifiles
159
160         # Basic sanity check
161         if len(self.messages) == 0 and not testmode:
162             raise ValueError(1, 'Missing JSON message definitions')
163
164         # Make sure we allow VPP to clean up the message rings.
165         atexit.register(vpp_atexit, self)
166
167         # Register error handler
168         vpp_api.vac_set_error_handler(vac_error_handler)
169
170     class ContextId(object):
171         """Thread-safe provider of unique context IDs."""
172         def __init__(self):
173             self.context = 0
174             self.lock = threading.Lock()
175
176         def __call__(self):
177             """Get a new unique (or, at least, not recently used) context."""
178             with self.lock:
179                 self.context += 1
180                 return self.context
181     get_context = ContextId()
182
183     def status(self):
184         """Debug function: report current VPP API status to stdout."""
185         print('Connected') if self.connected else print('Not Connected')
186         print('Read API definitions from', ', '.join(self.apifiles))
187
188     def __struct(self, t, n=None, e=-1, vl=None):
189         """Create a packing structure for a message."""
190         base_types = {'u8': 'B',
191                       'u16': 'H',
192                       'u32': 'I',
193                       'i32': 'i',
194                       'u64': 'Q',
195                       'f64': 'd', }
196         pack = None
197         if t in base_types:
198             pack = base_types[t]
199             if not vl:
200                 if e > 0 and t == 'u8':
201                     # Fixed byte array
202                     return struct.Struct('>' + str(e) + 's')
203                 if e > 0:
204                     # Fixed array of base type
205                     return [e, struct.Struct('>' + base_types[t])]
206                 elif e == 0:
207                     # Old style variable array
208                     return [-1, struct.Struct('>' + base_types[t])]
209             else:
210                 # Variable length array
211                 return [vl, struct.Struct('>s')] if t == 'u8' else \
212                     [vl, struct.Struct('>' + base_types[t])]
213
214             return struct.Struct('>' + base_types[t])
215
216         if t in self.messages:
217             # Return a list in case of array
218             if e > 0 and not vl:
219                 return [e, lambda self, encode, buf, offset, args: (
220                     self.__struct_type(encode, self.messages[t], buf, offset,
221                                        args))]
222             if vl:
223                 return [vl, lambda self, encode, buf, offset, args: (
224                     self.__struct_type(encode, self.messages[t], buf, offset,
225                                        args))]
226             elif e == 0:
227                 # Old style VLA
228                 raise NotImplementedError(1,
229                                           'No support for compound types ' + t)
230             return lambda self, encode, buf, offset, args: (
231                 self.__struct_type(encode, self.messages[t], buf, offset, args)
232             )
233
234         raise ValueError(1, 'Invalid message type: ' + t)
235
236     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
237         """Get a message packer or unpacker."""
238         if encode:
239             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
240         else:
241             return self.__struct_type_decode(msgdef, buf, offset)
242
243     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
244         off = offset
245         size = 0
246
247         for k in kwargs:
248             if k not in msgdef['args']:
249                 raise ValueError(1,'Non existing argument [' + k + ']' + \
250                                  ' used in call to: ' + \
251                                  self.id_names[kwargs['_vl_msg_id']] + '()' )
252
253
254         for k, v in vpp_iterator(msgdef['args']):
255             off += size
256             if k in kwargs:
257                 if type(v) is list:
258                     if callable(v[1]):
259                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
260                         size = 0
261                         for i in range(e):
262                             size += v[1](self, True, buf, off + size,
263                                          kwargs[k][i])
264                     else:
265                         if v[0] in kwargs:
266                             l = kwargs[v[0]]
267                         else:
268                             l = len(kwargs[k])
269                         if v[1].size == 1:
270                             buf[off:off + l] = bytearray(kwargs[k])
271                             size = l
272                         else:
273                             size = 0
274                             for i in kwargs[k]:
275                                 v[1].pack_into(buf, off + size, i)
276                                 size += v[1].size
277                 else:
278                     if callable(v):
279                         size = v(self, True, buf, off, kwargs[k])
280                     else:
281                         v.pack_into(buf, off, kwargs[k])
282                         size = v.size
283             else:
284                 size = v.size if not type(v) is list else 0
285
286         return off + size - offset
287
288     def __getitem__(self, name):
289         if name in self.messages:
290             return self.messages[name]
291         return None
292
293     def encode(self, msgdef, kwargs):
294         # Make suitably large buffer
295         buf = bytearray(self.buffersize)
296         offset = 0
297         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
298         return buf[:offset + size]
299
300     def decode(self, msgdef, buf):
301         return self.__struct_type(False, msgdef, buf, 0, None)[1]
302
303     def __struct_type_decode(self, msgdef, buf, offset):
304         res = []
305         off = offset
306         size = 0
307         for k, v in vpp_iterator(msgdef['args']):
308             off += size
309             if type(v) is list:
310                 lst = []
311                 if callable(v[1]):  # compound type
312                     size = 0
313                     if v[0] in msgdef['args']:  # vla
314                         e = res[v[2]]
315                     else:  # fixed array
316                         e = v[0]
317                     res.append(lst)
318                     for i in range(e):
319                         (s, l) = v[1](self, False, buf, off + size, None)
320                         lst.append(l)
321                         size += s
322                     continue
323                 if v[1].size == 1:
324                     if type(v[0]) is int:
325                         size = len(buf) - off
326                     else:
327                         size = res[v[2]]
328                     res.append(buf[off:off + size])
329                 else:
330                     e = v[0] if type(v[0]) is int else res[v[2]]
331                     if e == -1:
332                         e = (len(buf) - off) / v[1].size
333                     lst = []
334                     res.append(lst)
335                     size = 0
336                     for i in range(e):
337                         lst.append(v[1].unpack_from(buf, off + size)[0])
338                         size += v[1].size
339             else:
340                 if callable(v):
341                     (s, l) = v(self, False, buf, off, None)
342                     res.append(l)
343                     size += s
344                 else:
345                     res.append(v.unpack_from(buf, off)[0])
346                     size = v.size
347
348         return off + size - offset, msgdef['return_tuple']._make(res)
349
350     def ret_tup(self, name):
351         if name in self.messages and 'return_tuple' in self.messages[name]:
352             return self.messages[name]['return_tuple']
353         return None
354
355     def add_message(self, name, msgdef, typeonly=False):
356         if name in self.messages:
357             raise ValueError('Duplicate message name: ' + name)
358
359         args = collections.OrderedDict()
360         argtypes = collections.OrderedDict()
361         fields = []
362         msg = {}
363         for i, f in enumerate(msgdef):
364             if type(f) is dict and 'crc' in f:
365                 msg['crc'] = f['crc']
366                 continue
367             field_type = f[0]
368             field_name = f[1]
369             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
370                 raise ValueError('Variable Length Array must be last: ' + name)
371             args[field_name] = self.__struct(*f)
372             argtypes[field_name] = field_type
373             if len(f) == 4:  # Find offset to # elements field
374                 idx = list(args.keys()).index(f[3]) - i
375                 args[field_name].append(idx)
376             fields.append(field_name)
377         msg['return_tuple'] = collections.namedtuple(name, fields,
378                                                      rename=True)
379         self.messages[name] = msg
380         self.messages[name]['args'] = args
381         self.messages[name]['argtypes'] = argtypes
382         self.messages[name]['typeonly'] = typeonly
383         return self.messages[name]
384
385     def add_type(self, name, typedef):
386         return self.add_message('vl_api_' + name + '_t', typedef,
387                                 typeonly=True)
388
389     def make_function(self, name, i, msgdef, multipart, async):
390         if (async):
391             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
392         else:
393             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
394                                                  **kwargs))
395         args = self.messages[name]['args']
396         argtypes = self.messages[name]['argtypes']
397         f.__name__ = str(name)
398         f.__doc__ = ", ".join(["%s %s" %
399                                (argtypes[k], k) for k in args.keys()])
400         return f
401
402     @property
403     def api(self):
404         if not hasattr(self, "_api"):
405             raise Exception("Not connected, api definitions not available")
406         return self._api
407
408     def _register_functions(self, async=False):
409         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
410         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
411         self._api = Empty()
412         for name, msgdef in vpp_iterator(self.messages):
413             if self.messages[name]['typeonly']:
414                 continue
415             crc = self.messages[name]['crc']
416             n = name + '_' + crc[2:]
417             i = vpp_api.vac_get_msg_index(n.encode())
418             if i > 0:
419                 self.id_msgdef[i] = msgdef
420                 self.id_names[i] = name
421                 multipart = True if name.find('_dump') > 0 else False
422                 f = self.make_function(name, i, msgdef, multipart, async)
423                 setattr(self._api, name, FuncWrapper(f))
424
425                 # old API stuff starts here - will be removed in 17.07
426                 if hasattr(self, name):
427                     raise NameError(
428                         3, "Conflicting name in JSON definition: `%s'" % name)
429                 setattr(self, name, f)
430                 # old API stuff ends here
431             else:
432                 self.logger.debug(
433                     'No such message type or failed CRC checksum: %s', n)
434
435     def _write(self, buf):
436         """Send a binary-packed message to VPP."""
437         if not self.connected:
438             raise IOError(1, 'Not connected')
439         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
440
441     def _read(self):
442         if not self.connected:
443             raise IOError(1, 'Not connected')
444         mem = ffi.new("char **")
445         size = ffi.new("int *")
446         rv = vpp_api.vac_read(mem, size, self.read_timeout)
447         if rv:
448             raise IOError(rv, 'vac_read failed')
449         msg = bytes(ffi.buffer(mem[0], size[0]))
450         vpp_api.vac_free(mem[0])
451         return msg
452
453     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
454                          async):
455         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
456         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
457         if rv != 0:
458             raise IOError(2, 'Connect failed')
459         self.connected = True
460
461         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
462         self._register_functions(async=async)
463
464         # Initialise control ping
465         crc = self.messages['control_ping']['crc']
466         self.control_ping_index = vpp_api.vac_get_msg_index(
467             ('control_ping' + '_' + crc[2:]).encode())
468         self.control_ping_msgdef = self.messages['control_ping']
469         return rv
470
471     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
472         """Attach to VPP.
473
474         name - the name of the client.
475         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
476         async - if true, messages are sent without waiting for a reply
477         rx_qlen - the length of the VPP message receive queue between
478         client and server.
479         """
480         msg_handler = vac_callback_sync if not async else vac_callback_async
481         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
482                                      async)
483
484     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
485         """Attach to VPP in synchronous mode. Application must poll for events.
486
487         name - the name of the client.
488         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
489         rx_qlen - the length of the VPP message receive queue between
490         client and server.
491         """
492
493         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
494                                      async=False)
495
496     def disconnect(self):
497         """Detach from VPP."""
498         rv = vpp_api.vac_disconnect()
499         self.connected = False
500         return rv
501
502     def msg_handler_sync(self, msg):
503         """Process an incoming message from VPP in sync mode.
504
505         The message may be a reply or it may be an async notification.
506         """
507         r = self.decode_incoming_msg(msg)
508         if r is None:
509             return
510
511         # If we have a context, then use the context to find any
512         # request waiting for a reply
513         context = 0
514         if hasattr(r, 'context') and r.context > 0:
515             context = r.context
516
517         msgname = type(r).__name__
518
519         if context == 0:
520             # No context -> async notification that we feed to the callback
521             self.message_queue.put_nowait(r)
522         else:
523             raise IOError(2, 'RPC reply message received in event handler')
524
525     def decode_incoming_msg(self, msg):
526         if not msg:
527             self.logger.warning('vpp_api.read failed')
528             return
529
530         i, ci = self.header.unpack_from(msg, 0)
531         if self.id_names[i] == 'rx_thread_exit':
532             return
533
534         #
535         # Decode message and returns a tuple.
536         #
537         msgdef = self.id_msgdef[i]
538         if not msgdef:
539             raise IOError(2, 'Reply message undefined')
540
541         r = self.decode(msgdef, msg)
542
543         return r
544
545     def msg_handler_async(self, msg):
546         """Process a message from VPP in async mode.
547
548         In async mode, all messages are returned to the callback.
549         """
550         r = self.decode_incoming_msg(msg)
551         if r is None:
552             return
553
554         msgname = type(r).__name__
555
556         if self.event_callback:
557             self.event_callback(msgname, r)
558
559     def _control_ping(self, context):
560         """Send a ping command."""
561         self._call_vpp_async(self.control_ping_index,
562                              self.control_ping_msgdef,
563                              context=context)
564
565     def _call_vpp(self, i, msgdef, multipart, **kwargs):
566         """Given a message, send the message and await a reply.
567
568         msgdef - the message packing definition
569         i - the message type index
570         multipart - True if the message returns multiple
571         messages in return.
572         context - context number - chosen at random if not
573         supplied.
574         The remainder of the kwargs are the arguments to the API call.
575
576         The return value is the message or message array containing
577         the response.  It will raise an IOError exception if there was
578         no response within the timeout window.
579         """
580
581         if 'context' not in kwargs:
582             context = self.get_context()
583             kwargs['context'] = context
584         else:
585             context = kwargs['context']
586         kwargs['_vl_msg_id'] = i
587         b = self.encode(msgdef, kwargs)
588
589         vpp_api.vac_rx_suspend()
590         self._write(b)
591
592         if multipart:
593             # Send a ping after the request - we use its response
594             # to detect that we have seen all results.
595             self._control_ping(context)
596
597         # Block until we get a reply.
598         rl = []
599         while (True):
600             msg = self._read()
601             if not msg:
602                 raise IOError(2, 'VPP API client: read failed')
603
604             r = self.decode_incoming_msg(msg)
605             msgname = type(r).__name__
606             if context not in r or r.context == 0 or context != r.context:
607                 self.message_queue.put_nowait(r)
608                 continue
609
610             if not multipart:
611                 rl = r
612                 break
613             if msgname == 'control_ping_reply':
614                 break
615
616             rl.append(r)
617
618         vpp_api.vac_rx_resume()
619
620         return rl
621
622     def _call_vpp_async(self, i, msgdef, **kwargs):
623         """Given a message, send the message and await a reply.
624
625         msgdef - the message packing definition
626         i - the message type index
627         context - context number - chosen at random if not
628         supplied.
629         The remainder of the kwargs are the arguments to the API call.
630         """
631         if 'context' not in kwargs:
632             context = self.get_context()
633             kwargs['context'] = context
634         else:
635             context = kwargs['context']
636         kwargs['_vl_msg_id'] = i
637         b = self.encode(msgdef, kwargs)
638
639         self._write(b)
640
641     def register_event_callback(self, callback):
642         """Register a callback for async messages.
643
644         This will be called for async notifications in sync mode,
645         and all messages in async mode.  In sync mode, replies to
646         requests will not come here.
647
648         callback is a fn(msg_type_name, msg_type) that will be
649         called when a message comes in.  While this function is
650         executing, note that (a) you are in a background thread and
651         may wish to use threading.Lock to protect your datastructures,
652         and (b) message processing from VPP will stop (so if you take
653         a long while about it you may provoke reply timeouts or cause
654         VPP to fill the RX buffer).  Passing None will disable the
655         callback.
656         """
657         self.event_callback = callback
658
659     def thread_msg_handler(self):
660         """Python thread calling the user registerd message handler.
661
662         This is to emulate the old style event callback scheme. Modern
663         clients should provide their own thread to poll the event
664         queue.
665         """
666         while True:
667             r = self.message_queue.get()
668             msgname = type(r).__name__
669             if self.event_callback:
670                 self.event_callback(msgname, r)