VPP PAPI: Error in unserializer for non-array compound types.
[vpp.git] / src / vpp-api / python / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import glob
26 import atexit
27 from cffi import FFI
28
29 if sys.version[0] == '2':
30     import Queue as queue
31 else:
32     import queue as queue
33
34 ffi = FFI()
35 ffi.cdef("""
36 typedef void (*vac_callback_t)(unsigned char * data, int len);
37 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
38 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
39     int rx_qlen);
40 int vac_disconnect(void);
41 int vac_read(char **data, int *l, unsigned short timeout);
42 int vac_write(char *data, int len);
43 void vac_free(void * msg);
44
45 int vac_get_msg_index(unsigned char * name);
46 int vac_msg_table_size(void);
47 int vac_msg_table_max_index(void);
48
49 void vac_rx_suspend (void);
50 void vac_rx_resume (void);
51 void vac_set_error_handler(vac_error_callback_t);
52  """)
53
54 # Barfs on failure, no need to check success.
55 vpp_api = ffi.dlopen('libvppapiclient.so')
56
57
58 def vpp_atexit(self):
59     """Clean up VPP connection on shutdown."""
60     if self.connected:
61         self.logger.debug('Cleaning up VPP on exit')
62         self.disconnect()
63
64 vpp_object = None
65
66
67 def vpp_iterator(d):
68     if sys.version[0] == '2':
69         return d.iteritems()
70     else:
71         return d.items()
72
73
74 @ffi.callback("void(unsigned char *, int)")
75 def vac_callback_sync(data, len):
76     vpp_object.msg_handler_sync(ffi.buffer(data, len))
77
78
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_async(data, len):
81     vpp_object.msg_handler_async(ffi.buffer(data, len))
82
83
84 @ffi.callback("void(void *, unsigned char *, int)")
85 def vac_error_handler(arg, msg, msg_len):
86     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
87
88
89 class Empty(object):
90     pass
91
92
93 class FuncWrapper(object):
94     def __init__(self, func):
95         self._func = func
96         self.__name__ = func.__name__
97
98     def __call__(self, **kwargs):
99         return self._func(**kwargs)
100
101
102 class VPP():
103     """VPP interface.
104
105     This class provides the APIs to VPP.  The APIs are loaded
106     from provided .api.json files and makes functions accordingly.
107     These functions are documented in the VPP .api files, as they
108     are dynamically created.
109
110     Additionally, VPP can send callback messages; this class
111     provides a means to register a callback function to receive
112     these messages in a background thread.
113     """
114     def __init__(self, apifiles=None, testmode=False, async_thread=True,
115                  logger=logging.getLogger('vpp_papi'), loglevel='debug'):
116         """Create a VPP API object.
117
118         apifiles is a list of files containing API
119         descriptions that will be loaded - methods will be
120         dynamically created reflecting these APIs.  If not
121         provided this will load the API files from VPP's
122         default install location.
123         """
124         global vpp_object
125         vpp_object = self
126         self.logger = logger
127         logging.basicConfig(level=getattr(logging, loglevel.upper()))
128
129         self.messages = {}
130         self.id_names = []
131         self.id_msgdef = []
132         self.connected = False
133         self.header = struct.Struct('>HI')
134         self.apifiles = []
135         self.event_callback = None
136         self.message_queue = queue.Queue()
137         self.read_timeout = 0
138         self.vpp_api = vpp_api
139         if async_thread:
140             self.event_thread = threading.Thread(
141                 target=self.thread_msg_handler)
142             self.event_thread.daemon = True
143             self.event_thread.start()
144
145         if not apifiles:
146             # Pick up API definitions from default directory
147             apifiles = glob.glob('/usr/share/vpp/api/*.api.json')
148
149         for file in apifiles:
150             with open(file) as apidef_file:
151                 api = json.load(apidef_file)
152                 for t in api['types']:
153                     self.add_type(t[0], t[1:])
154
155                 for m in api['messages']:
156                     self.add_message(m[0], m[1:])
157         self.apifiles = apifiles
158
159         # Basic sanity check
160         if len(self.messages) == 0 and not testmode:
161             raise ValueError(1, 'Missing JSON message definitions')
162
163         # Make sure we allow VPP to clean up the message rings.
164         atexit.register(vpp_atexit, self)
165
166         # Register error handler
167         vpp_api.vac_set_error_handler(vac_error_handler)
168
169     class ContextId(object):
170         """Thread-safe provider of unique context IDs."""
171         def __init__(self):
172             self.context = 0
173             self.lock = threading.Lock()
174
175         def __call__(self):
176             """Get a new unique (or, at least, not recently used) context."""
177             with self.lock:
178                 self.context += 1
179                 return self.context
180     get_context = ContextId()
181
182     def status(self):
183         """Debug function: report current VPP API status to stdout."""
184         print('Connected') if self.connected else print('Not Connected')
185         print('Read API definitions from', ', '.join(self.apifiles))
186
187     def __struct(self, t, n=None, e=-1, vl=None):
188         """Create a packing structure for a message."""
189         base_types = {'u8': 'B',
190                       'u16': 'H',
191                       'u32': 'I',
192                       'i32': 'i',
193                       'u64': 'Q',
194                       'f64': 'd', }
195         pack = None
196         if t in base_types:
197             pack = base_types[t]
198             if not vl:
199                 if e > 0 and t == 'u8':
200                     # Fixed byte array
201                     s = struct.Struct('>' + str(e) + 's')
202                     return s.size, s
203                 if e > 0:
204                     # Fixed array of base type
205                     s = struct.Struct('>' + base_types[t])
206                     return s.size, [e, s]
207                 elif e == 0:
208                     # Old style variable array
209                     s = struct.Struct('>' + base_types[t])
210                     return s.size, [-1, s]
211             else:
212                 # Variable length array
213                 if t == 'u8':
214                     s = struct.Struct('>s')
215                     return s.size, [vl, s]
216                 else:
217                     s = struct.Struct('>' + base_types[t])
218                 return s.size, [vl, s]
219
220             s = struct.Struct('>' + base_types[t])
221             return s.size, s
222
223         if t in self.messages:
224             size = self.messages[t]['sizes'][0]
225
226             # Return a list in case of array
227             if e > 0 and not vl:
228                 return size, [e, lambda self, encode, buf, offset, args: (
229                     self.__struct_type(encode, self.messages[t], buf, offset,
230                                        args))]
231             if vl:
232                 return size, [vl, lambda self, encode, buf, offset, args: (
233                     self.__struct_type(encode, self.messages[t], buf, offset,
234                                        args))]
235             elif e == 0:
236                 # Old style VLA
237                 raise NotImplementedError(1,
238                                           'No support for compound types ' + t)
239             return size, lambda self, encode, buf, offset, args: (
240                 self.__struct_type(encode, self.messages[t], buf, offset, args)
241             )
242
243         raise ValueError(1, 'Invalid message type: ' + t)
244
245     def __struct_type(self, encode, msgdef, buf, offset, kwargs):
246         """Get a message packer or unpacker."""
247         if encode:
248             return self.__struct_type_encode(msgdef, buf, offset, kwargs)
249         else:
250             return self.__struct_type_decode(msgdef, buf, offset)
251
252     def __struct_type_encode(self, msgdef, buf, offset, kwargs):
253         off = offset
254         size = 0
255
256         for k in kwargs:
257             if k not in msgdef['args']:
258                 raise ValueError(1,'Non existing argument [' + k + ']' + \
259                                  ' used in call to: ' + \
260                                  self.id_names[kwargs['_vl_msg_id']] + '()' )
261
262         for k, v in vpp_iterator(msgdef['args']):
263             off += size
264             if k in kwargs:
265                 if type(v) is list:
266                     if callable(v[1]):
267                         e = kwargs[v[0]] if v[0] in kwargs else v[0]
268                         if e != len(kwargs[k]):
269                             raise (ValueError(1, 'Input list length mismatch: %s (%s != %s)' %  (k, e, len(kwargs[k]))))
270                         size = 0
271                         for i in range(e):
272                             size += v[1](self, True, buf, off + size,
273                                          kwargs[k][i])
274                     else:
275                         if v[0] in kwargs:
276                             l = kwargs[v[0]]
277                             if l != len(kwargs[k]):
278                                 raise ValueError(1, 'Input list length mistmatch: %s (%s != %s)' % (k, l, len(kwargs[k])))
279                         else:
280                             l = len(kwargs[k])
281                         if v[1].size == 1:
282                             buf[off:off + l] = bytearray(kwargs[k])
283                             size = l
284                         else:
285                             size = 0
286                             for i in kwargs[k]:
287                                 v[1].pack_into(buf, off + size, i)
288                                 size += v[1].size
289                 else:
290                     if callable(v):
291                         size = v(self, True, buf, off, kwargs[k])
292                     else:
293                         if type(kwargs[k]) is str and v.size < len(kwargs[k]):
294                             raise ValueError(1, 'Input list length mistmatch: %s (%s < %s)' % (k, v.size, len(kwargs[k])))
295                         v.pack_into(buf, off, kwargs[k])
296                         size = v.size
297             else:
298                 size = v.size if not type(v) is list else 0
299
300         return off + size - offset
301
302     def __getitem__(self, name):
303         if name in self.messages:
304             return self.messages[name]
305         return None
306
307     def get_size(self, sizes, kwargs):
308         total_size = sizes[0]
309         for e in sizes[1]:
310             if e in kwargs and type(kwargs[e]) is list:
311                 total_size += len(kwargs[e]) * sizes[1][e]
312         return total_size
313
314     def encode(self, msgdef, kwargs):
315         # Make suitably large buffer
316         size = self.get_size(msgdef['sizes'], kwargs)
317         buf = bytearray(size)
318         offset = 0
319         size = self.__struct_type(True, msgdef, buf, offset, kwargs)
320         return buf[:offset + size]
321
322     def decode(self, msgdef, buf):
323         return self.__struct_type(False, msgdef, buf, 0, None)[1]
324
325     def __struct_type_decode(self, msgdef, buf, offset):
326         res = []
327         off = offset
328         size = 0
329         for k, v in vpp_iterator(msgdef['args']):
330             off += size
331             if type(v) is list:
332                 lst = []
333                 if callable(v[1]):  # compound type
334                     size = 0
335                     if v[0] in msgdef['args']:  # vla
336                         e = res[v[2]]
337                     else:  # fixed array
338                         e = v[0]
339                     res.append(lst)
340                     for i in range(e):
341                         (s, l) = v[1](self, False, buf, off + size, None)
342                         lst.append(l)
343                         size += s
344                     continue
345                 if v[1].size == 1:
346                     if type(v[0]) is int:
347                         size = len(buf) - off
348                     else:
349                         size = res[v[2]]
350                     res.append(buf[off:off + size])
351                 else:
352                     e = v[0] if type(v[0]) is int else res[v[2]]
353                     if e == -1:
354                         e = (len(buf) - off) / v[1].size
355                     lst = []
356                     res.append(lst)
357                     size = 0
358                     for i in range(e):
359                         lst.append(v[1].unpack_from(buf, off + size)[0])
360                         size += v[1].size
361             else:
362                 if callable(v):
363                     size = 0
364                     (s, l) = v(self, False, buf, off, None)
365                     res.append(l)
366                     size += s
367                 else:
368                     res.append(v.unpack_from(buf, off)[0])
369                     size = v.size
370
371         return off + size - offset, msgdef['return_tuple']._make(res)
372
373     def ret_tup(self, name):
374         if name in self.messages and 'return_tuple' in self.messages[name]:
375             return self.messages[name]['return_tuple']
376         return None
377
378     def add_message(self, name, msgdef, typeonly=False):
379         if name in self.messages:
380             raise ValueError('Duplicate message name: ' + name)
381
382         args = collections.OrderedDict()
383         argtypes = collections.OrderedDict()
384         fields = []
385         msg = {}
386         total_size = 0
387         sizes = {}
388         for i, f in enumerate(msgdef):
389             if type(f) is dict and 'crc' in f:
390                 msg['crc'] = f['crc']
391                 continue
392             field_type = f[0]
393             field_name = f[1]
394             if len(f) == 3 and f[2] == 0 and i != len(msgdef) - 2:
395                 raise ValueError('Variable Length Array must be last: ' + name)
396             size, s = self.__struct(*f)
397             args[field_name] = s
398             if type(s) == list and type(s[0]) == int and type(s[1]) == struct.Struct:
399                 if s[0] < 0:
400                     sizes[field_name] = size
401                 else:
402                     sizes[field_name] = size
403                     total_size += s[0] * size
404             else:
405                 sizes[field_name] = size
406                 total_size += size
407
408             argtypes[field_name] = field_type
409             if len(f) == 4:  # Find offset to # elements field
410                 idx = list(args.keys()).index(f[3]) - i
411                 args[field_name].append(idx)
412             fields.append(field_name)
413         msg['return_tuple'] = collections.namedtuple(name, fields,
414                                                      rename=True)
415         self.messages[name] = msg
416         self.messages[name]['args'] = args
417         self.messages[name]['argtypes'] = argtypes
418         self.messages[name]['typeonly'] = typeonly
419         self.messages[name]['sizes'] = [total_size, sizes]
420         return self.messages[name]
421
422     def add_type(self, name, typedef):
423         return self.add_message('vl_api_' + name + '_t', typedef,
424                                 typeonly=True)
425
426     def make_function(self, name, i, msgdef, multipart, async):
427         if (async):
428             f = lambda **kwargs: (self._call_vpp_async(i, msgdef, **kwargs))
429         else:
430             f = lambda **kwargs: (self._call_vpp(i, msgdef, multipart,
431                                                  **kwargs))
432         args = self.messages[name]['args']
433         argtypes = self.messages[name]['argtypes']
434         f.__name__ = str(name)
435         f.__doc__ = ", ".join(["%s %s" %
436                                (argtypes[k], k) for k in args.keys()])
437         return f
438
439     @property
440     def api(self):
441         if not hasattr(self, "_api"):
442             raise Exception("Not connected, api definitions not available")
443         return self._api
444
445     def _register_functions(self, async=False):
446         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
447         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
448         self._api = Empty()
449         for name, msgdef in vpp_iterator(self.messages):
450             if self.messages[name]['typeonly']:
451                 continue
452             crc = self.messages[name]['crc']
453             n = name + '_' + crc[2:]
454             i = vpp_api.vac_get_msg_index(n.encode())
455             if i > 0:
456                 self.id_msgdef[i] = msgdef
457                 self.id_names[i] = name
458                 multipart = True if name.find('_dump') > 0 else False
459                 f = self.make_function(name, i, msgdef, multipart, async)
460                 setattr(self._api, name, FuncWrapper(f))
461
462                 # old API stuff starts here - will be removed in 17.07
463                 if hasattr(self, name):
464                     raise NameError(
465                         3, "Conflicting name in JSON definition: `%s'" % name)
466                 setattr(self, name, f)
467                 # old API stuff ends here
468             else:
469                 self.logger.debug(
470                     'No such message type or failed CRC checksum: %s', n)
471
472     def _write(self, buf):
473         """Send a binary-packed message to VPP."""
474         if not self.connected:
475             raise IOError(1, 'Not connected')
476         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
477
478     def _read(self):
479         if not self.connected:
480             raise IOError(1, 'Not connected')
481         mem = ffi.new("char **")
482         size = ffi.new("int *")
483         rv = vpp_api.vac_read(mem, size, self.read_timeout)
484         if rv:
485             raise IOError(rv, 'vac_read failed')
486         msg = bytes(ffi.buffer(mem[0], size[0]))
487         vpp_api.vac_free(mem[0])
488         return msg
489
490     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
491                          async):
492         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
493         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
494         if rv != 0:
495             raise IOError(2, 'Connect failed')
496         self.connected = True
497
498         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
499         self._register_functions(async=async)
500
501         # Initialise control ping
502         crc = self.messages['control_ping']['crc']
503         self.control_ping_index = vpp_api.vac_get_msg_index(
504             ('control_ping' + '_' + crc[2:]).encode())
505         self.control_ping_msgdef = self.messages['control_ping']
506         return rv
507
508     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
509         """Attach to VPP.
510
511         name - the name of the client.
512         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
513         async - if true, messages are sent without waiting for a reply
514         rx_qlen - the length of the VPP message receive queue between
515         client and server.
516         """
517         msg_handler = vac_callback_sync if not async else vac_callback_async
518         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
519                                      async)
520
521     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
522         """Attach to VPP in synchronous mode. Application must poll for events.
523
524         name - the name of the client.
525         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
526         rx_qlen - the length of the VPP message receive queue between
527         client and server.
528         """
529
530         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
531                                      async=False)
532
533     def disconnect(self):
534         """Detach from VPP."""
535         rv = vpp_api.vac_disconnect()
536         self.connected = False
537         return rv
538
539     def msg_handler_sync(self, msg):
540         """Process an incoming message from VPP in sync mode.
541
542         The message may be a reply or it may be an async notification.
543         """
544         r = self.decode_incoming_msg(msg)
545         if r is None:
546             return
547
548         # If we have a context, then use the context to find any
549         # request waiting for a reply
550         context = 0
551         if hasattr(r, 'context') and r.context > 0:
552             context = r.context
553
554         msgname = type(r).__name__
555
556         if context == 0:
557             # No context -> async notification that we feed to the callback
558             self.message_queue.put_nowait(r)
559         else:
560             raise IOError(2, 'RPC reply message received in event handler')
561
562     def decode_incoming_msg(self, msg):
563         if not msg:
564             self.logger.warning('vpp_api.read failed')
565             return
566
567         i, ci = self.header.unpack_from(msg, 0)
568         if self.id_names[i] == 'rx_thread_exit':
569             return
570
571         #
572         # Decode message and returns a tuple.
573         #
574         msgdef = self.id_msgdef[i]
575         if not msgdef:
576             raise IOError(2, 'Reply message undefined')
577
578         r = self.decode(msgdef, msg)
579
580         return r
581
582     def msg_handler_async(self, msg):
583         """Process a message from VPP in async mode.
584
585         In async mode, all messages are returned to the callback.
586         """
587         r = self.decode_incoming_msg(msg)
588         if r is None:
589             return
590
591         msgname = type(r).__name__
592
593         if self.event_callback:
594             self.event_callback(msgname, r)
595
596     def _control_ping(self, context):
597         """Send a ping command."""
598         self._call_vpp_async(self.control_ping_index,
599                              self.control_ping_msgdef,
600                              context=context)
601
602     def _call_vpp(self, i, msgdef, multipart, **kwargs):
603         """Given a message, send the message and await a reply.
604
605         msgdef - the message packing definition
606         i - the message type index
607         multipart - True if the message returns multiple
608         messages in return.
609         context - context number - chosen at random if not
610         supplied.
611         The remainder of the kwargs are the arguments to the API call.
612
613         The return value is the message or message array containing
614         the response.  It will raise an IOError exception if there was
615         no response within the timeout window.
616         """
617
618         if 'context' not in kwargs:
619             context = self.get_context()
620             kwargs['context'] = context
621         else:
622             context = kwargs['context']
623         kwargs['_vl_msg_id'] = i
624         b = self.encode(msgdef, kwargs)
625
626         vpp_api.vac_rx_suspend()
627         self._write(b)
628
629         if multipart:
630             # Send a ping after the request - we use its response
631             # to detect that we have seen all results.
632             self._control_ping(context)
633
634         # Block until we get a reply.
635         rl = []
636         while (True):
637             msg = self._read()
638             if not msg:
639                 raise IOError(2, 'VPP API client: read failed')
640
641             r = self.decode_incoming_msg(msg)
642             msgname = type(r).__name__
643             if context not in r or r.context == 0 or context != r.context:
644                 self.message_queue.put_nowait(r)
645                 continue
646
647             if not multipart:
648                 rl = r
649                 break
650             if msgname == 'control_ping_reply':
651                 break
652
653             rl.append(r)
654
655         vpp_api.vac_rx_resume()
656
657         return rl
658
659     def _call_vpp_async(self, i, msgdef, **kwargs):
660         """Given a message, send the message and await a reply.
661
662         msgdef - the message packing definition
663         i - the message type index
664         context - context number - chosen at random if not
665         supplied.
666         The remainder of the kwargs are the arguments to the API call.
667         """
668         if 'context' not in kwargs:
669             context = self.get_context()
670             kwargs['context'] = context
671         else:
672             context = kwargs['context']
673         kwargs['_vl_msg_id'] = i
674         b = self.encode(msgdef, kwargs)
675
676         self._write(b)
677
678     def register_event_callback(self, callback):
679         """Register a callback for async messages.
680
681         This will be called for async notifications in sync mode,
682         and all messages in async mode.  In sync mode, replies to
683         requests will not come here.
684
685         callback is a fn(msg_type_name, msg_type) that will be
686         called when a message comes in.  While this function is
687         executing, note that (a) you are in a background thread and
688         may wish to use threading.Lock to protect your datastructures,
689         and (b) message processing from VPP will stop (so if you take
690         a long while about it you may provoke reply timeouts or cause
691         VPP to fill the RX buffer).  Passing None will disable the
692         callback.
693         """
694         self.event_callback = callback
695
696     def thread_msg_handler(self):
697         """Python thread calling the user registerd message handler.
698
699         This is to emulate the old style event callback scheme. Modern
700         clients should provide their own thread to poll the event
701         queue.
702         """
703         while True:
704             r = self.message_queue.get()
705             msgname = type(r).__name__
706             if self.event_callback:
707                 self.event_callback(msgname, r)