PAPI: Unpack embedded types with variable length arrays.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import sys
20 import os
21 import logging
22 import collections
23 import struct
24 import json
25 import threading
26 import fnmatch
27 import weakref
28 import atexit
29 from cffi import FFI
30 import cffi
31 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
32 from . vpp_serializer import VPPMessage
33
34 if sys.version[0] == '2':
35     import Queue as queue
36 else:
37     import queue as queue
38
39 ffi = FFI()
40 ffi.cdef("""
41 typedef void (*vac_callback_t)(unsigned char * data, int len);
42 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
43 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
44     int rx_qlen);
45 int vac_disconnect(void);
46 int vac_read(char **data, int *l, unsigned short timeout);
47 int vac_write(char *data, int len);
48 void vac_free(void * msg);
49
50 int vac_get_msg_index(unsigned char * name);
51 int vac_msg_table_size(void);
52 int vac_msg_table_max_index(void);
53
54 void vac_rx_suspend (void);
55 void vac_rx_resume (void);
56 void vac_set_error_handler(vac_error_callback_t);
57  """)
58
59 # Barfs on failure, no need to check success.
60 vpp_api = ffi.dlopen('libvppapiclient.so')
61
62
63 def vpp_atexit(vpp_weakref):
64     """Clean up VPP connection on shutdown."""
65     vpp_instance = vpp_weakref()
66     if vpp_instance and vpp_instance.connected:
67         vpp_instance.logger.debug('Cleaning up VPP on exit')
68         vpp_instance.disconnect()
69
70
71 vpp_object = None
72
73
74 def vpp_iterator(d):
75     if sys.version[0] == '2':
76         return d.iteritems()
77     else:
78         return d.items()
79
80
81 @ffi.callback("void(unsigned char *, int)")
82 def vac_callback_sync(data, len):
83     vpp_object.msg_handler_sync(ffi.buffer(data, len))
84
85
86 @ffi.callback("void(unsigned char *, int)")
87 def vac_callback_async(data, len):
88     vpp_object.msg_handler_async(ffi.buffer(data, len))
89
90
91 @ffi.callback("void(void *, unsigned char *, int)")
92 def vac_error_handler(arg, msg, msg_len):
93     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
94
95
96 class VppApiDynamicMethodHolder(object):
97     pass
98
99
100 class FuncWrapper(object):
101     def __init__(self, func):
102         self._func = func
103         self.__name__ = func.__name__
104
105     def __call__(self, **kwargs):
106         return self._func(**kwargs)
107
108
109 class VPP():
110     """VPP interface.
111
112     This class provides the APIs to VPP.  The APIs are loaded
113     from provided .api.json files and makes functions accordingly.
114     These functions are documented in the VPP .api files, as they
115     are dynamically created.
116
117     Additionally, VPP can send callback messages; this class
118     provides a means to register a callback function to receive
119     these messages in a background thread.
120     """
121
122     def process_json_file(self, apidef_file):
123         api = json.load(apidef_file)
124         types = {}
125         for t in api['enums']:
126             t[0] = 'vl_api_' + t[0] + '_t'
127             types[t[0]] = {'type': 'enum', 'data': t}
128         for t in api['unions']:
129             t[0] = 'vl_api_' + t[0] + '_t'
130             types[t[0]] = {'type': 'union', 'data': t}
131         for t in api['types']:
132             t[0] = 'vl_api_' + t[0] + '_t'
133             types[t[0]] = {'type': 'type', 'data': t}
134
135         i = 0
136         while True:
137             unresolved = {}
138             for k, v in types.items():
139                 t = v['data']
140                 if v['type'] == 'enum':
141                     try:
142                         VPPEnumType(t[0], t[1:])
143                     except ValueError:
144                         unresolved[k] = v
145                 elif v['type'] == 'union':
146                     try:
147                         VPPUnionType(t[0], t[1:])
148                     except ValueError:
149                         unresolved[k] = v
150                 elif v['type'] == 'type':
151                     try:
152                         VPPType(t[0], t[1:])
153                     except ValueError:
154                         unresolved[k] = v
155             if len(unresolved) == 0:
156                 break
157             if i > 3:
158                 raise ValueError('Unresolved type definitions {}'
159                                  .format(unresolved))
160             types = unresolved
161             i += 1
162
163         for m in api['messages']:
164             try:
165                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
166             except NotImplementedError:
167                 self.logger.error('Not implemented error for {}'.format(m[0]))
168
169     def __init__(self, apifiles=None, testmode=False, async_thread=True,
170                  logger=logging.getLogger('vpp_papi'), loglevel='debug',
171                  read_timeout=0):
172         """Create a VPP API object.
173
174         apifiles is a list of files containing API
175         descriptions that will be loaded - methods will be
176         dynamically created reflecting these APIs.  If not
177         provided this will load the API files from VPP's
178         default install location.
179
180         logger, if supplied, is the logging logger object to log to.
181         loglevel, if supplied, is the log level this logger is set
182         to report at (from the loglevels in the logging module).
183         """
184         global vpp_object
185         vpp_object = self
186
187         if logger is None:
188             logger = logging.getLogger(__name__)
189             if loglevel is not None:
190                 logger.setLevel(loglevel)
191         self.logger = logger
192
193         self.messages = {}
194         self.id_names = []
195         self.id_msgdef = []
196         self.connected = False
197         self.header = VPPType('header', [['u16', 'msgid'],
198                                          ['u32', 'client_index']])
199         self.apifiles = []
200         self.event_callback = None
201         self.message_queue = queue.Queue()
202         self.read_timeout = read_timeout
203         self.vpp_api = vpp_api
204         self.async_thread = async_thread
205
206         if not apifiles:
207             # Pick up API definitions from default directory
208             try:
209                 apifiles = self.find_api_files()
210             except RuntimeError:
211                 # In test mode we don't care that we can't find the API files
212                 if testmode:
213                     apifiles = []
214                 else:
215                     raise
216
217         for file in apifiles:
218             with open(file) as apidef_file:
219                 self.process_json_file(apidef_file)
220
221         self.apifiles = apifiles
222
223         # Basic sanity check
224         if len(self.messages) == 0 and not testmode:
225             raise ValueError(1, 'Missing JSON message definitions')
226
227         # Make sure we allow VPP to clean up the message rings.
228         atexit.register(vpp_atexit, weakref.ref(self))
229
230         # Register error handler
231         if not testmode:
232             vpp_api.vac_set_error_handler(vac_error_handler)
233
234         # Support legacy CFFI
235         # from_buffer supported from 1.8.0
236         (major, minor, patch) = [int(s) for s in
237                                  cffi.__version__.split('.', 3)]
238         if major >= 1 and minor >= 8:
239             self._write = self._write_new_cffi
240         else:
241             self._write = self._write_legacy_cffi
242
243     class ContextId(object):
244         """Thread-safe provider of unique context IDs."""
245         def __init__(self):
246             self.context = 0
247             self.lock = threading.Lock()
248
249         def __call__(self):
250             """Get a new unique (or, at least, not recently used) context."""
251             with self.lock:
252                 self.context += 1
253                 return self.context
254     get_context = ContextId()
255
256     @classmethod
257     def find_api_dir(cls):
258         """Attempt to find the best directory in which API definition
259         files may reside. If the value VPP_API_DIR exists in the environment
260         then it is first on the search list. If we're inside a recognized
261         location in a VPP source tree (src/scripts and src/vpp-api/python)
262         then entries from there to the likely locations in build-root are
263         added. Finally the location used by system packages is added.
264
265         :returns: A single directory name, or None if no such directory
266             could be found.
267         """
268         dirs = []
269
270         if 'VPP_API_DIR' in os.environ:
271             dirs.append(os.environ['VPP_API_DIR'])
272
273         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
274         # in which case, plot a course to likely places in the src tree
275         import __main__ as main
276         if hasattr(main, '__file__'):
277             # get the path of the calling script
278             localdir = os.path.dirname(os.path.realpath(main.__file__))
279         else:
280             # use cwd if there is no calling script
281             localdir = os.getcwd()
282         localdir_s = localdir.split(os.path.sep)
283
284         def dmatch(dir):
285             """Match dir against right-hand components of the script dir"""
286             d = dir.split('/')  # param 'dir' assumes a / separator
287             length = len(d)
288             return len(localdir_s) > length and localdir_s[-length:] == d
289
290         def sdir(srcdir, variant):
291             """Build a path from srcdir to the staged API files of
292             'variant'  (typically '' or '_debug')"""
293             # Since 'core' and 'plugin' files are staged
294             # in separate directories, we target the parent dir.
295             return os.path.sep.join((
296                 srcdir,
297                 'build-root',
298                 'install-vpp%s-native' % variant,
299                 'vpp',
300                 'share',
301                 'vpp',
302                 'api',
303             ))
304
305         srcdir = None
306         if dmatch('src/scripts'):
307             srcdir = os.path.sep.join(localdir_s[:-2])
308         elif dmatch('src/vpp-api/python'):
309             srcdir = os.path.sep.join(localdir_s[:-3])
310         elif dmatch('test'):
311             # we're apparently running tests
312             srcdir = os.path.sep.join(localdir_s[:-1])
313
314         if srcdir:
315             # we're in the source tree, try both the debug and release
316             # variants.
317             dirs.append(sdir(srcdir, '_debug'))
318             dirs.append(sdir(srcdir, ''))
319
320         # Test for staged copies of the scripts
321         # For these, since we explicitly know if we're running a debug versus
322         # release variant, target only the relevant directory
323         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
324             srcdir = os.path.sep.join(localdir_s[:-4])
325             dirs.append(sdir(srcdir, '_debug'))
326         if dmatch('build-root/install-vpp-native/vpp/bin'):
327             srcdir = os.path.sep.join(localdir_s[:-4])
328             dirs.append(sdir(srcdir, ''))
329
330         # finally, try the location system packages typically install into
331         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
332
333         # check the directories for existance; first one wins
334         for dir in dirs:
335             if os.path.isdir(dir):
336                 return dir
337
338         return None
339
340     @classmethod
341     def find_api_files(cls, api_dir=None, patterns='*'):
342         """Find API definition files from the given directory tree with the
343         given pattern. If no directory is given then find_api_dir() is used
344         to locate one. If no pattern is given then all definition files found
345         in the directory tree are used.
346
347         :param api_dir: A directory tree in which to locate API definition
348             files; subdirectories are descended into.
349             If this is None then find_api_dir() is called to discover it.
350         :param patterns: A list of patterns to use in each visited directory
351             when looking for files.
352             This can be a list/tuple object or a comma-separated string of
353             patterns. Each value in the list will have leading/trialing
354             whitespace stripped.
355             The pattern specifies the first part of the filename, '.api.json'
356             is appended.
357             The results are de-duplicated, thus overlapping patterns are fine.
358             If this is None it defaults to '*' meaning "all API files".
359         :returns: A list of file paths for the API files found.
360         """
361         if api_dir is None:
362             api_dir = cls.find_api_dir()
363             if api_dir is None:
364                 raise RuntimeError("api_dir cannot be located")
365
366         if isinstance(patterns, list) or isinstance(patterns, tuple):
367             patterns = [p.strip() + '.api.json' for p in patterns]
368         else:
369             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
370
371         api_files = []
372         for root, dirnames, files in os.walk(api_dir):
373             # iterate all given patterns and de-dup the result
374             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
375             for filename in files:
376                 api_files.append(os.path.join(root, filename))
377
378         return api_files
379
380     def status(self):
381         """Debug function: report current VPP API status to stdout."""
382         print('Connected') if self.connected else print('Not Connected')
383         print('Read API definitions from', ', '.join(self.apifiles))
384
385     @property
386     def api(self):
387         if not hasattr(self, "_api"):
388             raise Exception("Not connected, api definitions not available")
389         return self._api
390
391     def make_function(self, msg, i, multipart, do_async):
392         if (do_async):
393             def f(**kwargs):
394                 return self._call_vpp_async(i, msg, **kwargs)
395         else:
396             def f(**kwargs):
397                 return self._call_vpp(i, msg, multipart, **kwargs)
398
399         f.__name__ = str(msg.name)
400         f.__doc__ = ", ".join(["%s %s" %
401                                (msg.fieldtypes[j], k)
402                                for j, k in enumerate(msg.fields)])
403         return f
404
405     def _register_functions(self, do_async=False):
406         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
407         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
408         self._api = VppApiDynamicMethodHolder()
409         for name, msg in vpp_iterator(self.messages):
410             n = name + '_' + msg.crc[2:]
411             i = vpp_api.vac_get_msg_index(n.encode())
412             if i > 0:
413                 self.id_msgdef[i] = msg
414                 self.id_names[i] = name
415                 # TODO: Fix multipart (use services)
416                 multipart = True if name.find('_dump') > 0 else False
417                 f = self.make_function(msg, i, multipart, do_async)
418                 setattr(self._api, name, FuncWrapper(f))
419             else:
420                 self.logger.debug(
421                     'No such message type or failed CRC checksum: %s', n)
422
423     def _write_new_cffi(self, buf):
424         """Send a binary-packed message to VPP."""
425         if not self.connected:
426             raise IOError(1, 'Not connected')
427         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
428
429     def _write_legacy_cffi(self, buf):
430         """Send a binary-packed message to VPP."""
431         if not self.connected:
432             raise IOError(1, 'Not connected')
433         return vpp_api.vac_write(bytes(buf), len(buf))
434
435     def _read(self):
436         if not self.connected:
437             raise IOError(1, 'Not connected')
438         mem = ffi.new("char **")
439         size = ffi.new("int *")
440         rv = vpp_api.vac_read(mem, size, self.read_timeout)
441         if rv:
442             raise IOError(rv, 'vac_read failed')
443         msg = bytes(ffi.buffer(mem[0], size[0]))
444         vpp_api.vac_free(mem[0])
445         return msg
446
447     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
448                          do_async):
449         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
450         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
451         if rv != 0:
452             raise IOError(2, 'Connect failed')
453         self.connected = True
454         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
455         self._register_functions(do_async=do_async)
456
457         # Initialise control ping
458         crc = self.messages['control_ping'].crc
459         self.control_ping_index = vpp_api.vac_get_msg_index(
460             ('control_ping' + '_' + crc[2:]).encode())
461         self.control_ping_msgdef = self.messages['control_ping']
462         if self.async_thread:
463             self.event_thread = threading.Thread(
464                 target=self.thread_msg_handler)
465             self.event_thread.daemon = True
466             self.event_thread.start()
467         return rv
468
469     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
470         """Attach to VPP.
471
472         name - the name of the client.
473         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
474         do_async - if true, messages are sent without waiting for a reply
475         rx_qlen - the length of the VPP message receive queue between
476         client and server.
477         """
478         msg_handler = vac_callback_sync if not do_async else vac_callback_async
479         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
480                                      do_async)
481
482     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
483         """Attach to VPP in synchronous mode. Application must poll for events.
484
485         name - the name of the client.
486         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
487         rx_qlen - the length of the VPP message receive queue between
488         client and server.
489         """
490
491         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
492                                      do_async=False)
493
494     def disconnect(self):
495         """Detach from VPP."""
496         rv = vpp_api.vac_disconnect()
497         self.connected = False
498         self.message_queue.put("terminate event thread")
499         return rv
500
501     def msg_handler_sync(self, msg):
502         """Process an incoming message from VPP in sync mode.
503
504         The message may be a reply or it may be an async notification.
505         """
506         r = self.decode_incoming_msg(msg)
507         if r is None:
508             return
509
510         # If we have a context, then use the context to find any
511         # request waiting for a reply
512         context = 0
513         if hasattr(r, 'context') and r.context > 0:
514             context = r.context
515
516         if context == 0:
517             # No context -> async notification that we feed to the callback
518             self.message_queue.put_nowait(r)
519         else:
520             raise IOError(2, 'RPC reply message received in event handler')
521
522     def decode_incoming_msg(self, msg):
523         if not msg:
524             self.logger.warning('vpp_api.read failed')
525             return
526         (i, ci), size = self.header.unpack(msg, 0)
527         if self.id_names[i] == 'rx_thread_exit':
528             return
529
530         #
531         # Decode message and returns a tuple.
532         #
533         msgobj = self.id_msgdef[i]
534         if not msgobj:
535             raise IOError(2, 'Reply message undefined')
536
537         r, size = msgobj.unpack(msg)
538         return r
539
540     def msg_handler_async(self, msg):
541         """Process a message from VPP in async mode.
542
543         In async mode, all messages are returned to the callback.
544         """
545         r = self.decode_incoming_msg(msg)
546         if r is None:
547             return
548
549         msgname = type(r).__name__
550
551         if self.event_callback:
552             self.event_callback(msgname, r)
553
554     def _control_ping(self, context):
555         """Send a ping command."""
556         self._call_vpp_async(self.control_ping_index,
557                              self.control_ping_msgdef,
558                              context=context)
559
560     def validate_args(self, msg, kwargs):
561         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
562         if d:
563             raise ValueError('Invalid argument {} to {}'
564                              .format(list(d), msg.name))
565
566     def _call_vpp(self, i, msg, multipart, **kwargs):
567         """Given a message, send the message and await a reply.
568
569         msgdef - the message packing definition
570         i - the message type index
571         multipart - True if the message returns multiple
572         messages in return.
573         context - context number - chosen at random if not
574         supplied.
575         The remainder of the kwargs are the arguments to the API call.
576
577         The return value is the message or message array containing
578         the response.  It will raise an IOError exception if there was
579         no response within the timeout window.
580         """
581
582         if 'context' not in kwargs:
583             context = self.get_context()
584             kwargs['context'] = context
585         else:
586             context = kwargs['context']
587         kwargs['_vl_msg_id'] = i
588
589         self.validate_args(msg, kwargs)
590         b = msg.pack(kwargs)
591         vpp_api.vac_rx_suspend()
592         self._write(b)
593
594         if multipart:
595             # Send a ping after the request - we use its response
596             # to detect that we have seen all results.
597             self._control_ping(context)
598
599         # Block until we get a reply.
600         rl = []
601         while (True):
602             msg = self._read()
603             if not msg:
604                 raise IOError(2, 'VPP API client: read failed')
605             r = self.decode_incoming_msg(msg)
606             msgname = type(r).__name__
607             if context not in r or r.context == 0 or context != r.context:
608                 self.message_queue.put_nowait(r)
609                 continue
610
611             if not multipart:
612                 rl = r
613                 break
614             if msgname == 'control_ping_reply':
615                 break
616
617             rl.append(r)
618
619         vpp_api.vac_rx_resume()
620
621         return rl
622
623     def _call_vpp_async(self, i, msg, **kwargs):
624         """Given a message, send the message and await a reply.
625
626         msgdef - the message packing definition
627         i - the message type index
628         context - context number - chosen at random if not
629         supplied.
630         The remainder of the kwargs are the arguments to the API call.
631         """
632         if 'context' not in kwargs:
633             context = self.get_context()
634             kwargs['context'] = context
635         else:
636             context = kwargs['context']
637         kwargs['client_index'] = 0
638         kwargs['_vl_msg_id'] = i
639         b = msg.pack(kwargs)
640
641         self._write(b)
642
643     def register_event_callback(self, callback):
644         """Register a callback for async messages.
645
646         This will be called for async notifications in sync mode,
647         and all messages in async mode.  In sync mode, replies to
648         requests will not come here.
649
650         callback is a fn(msg_type_name, msg_type) that will be
651         called when a message comes in.  While this function is
652         executing, note that (a) you are in a background thread and
653         may wish to use threading.Lock to protect your datastructures,
654         and (b) message processing from VPP will stop (so if you take
655         a long while about it you may provoke reply timeouts or cause
656         VPP to fill the RX buffer).  Passing None will disable the
657         callback.
658         """
659         self.event_callback = callback
660
661     def thread_msg_handler(self):
662         """Python thread calling the user registerd message handler.
663
664         This is to emulate the old style event callback scheme. Modern
665         clients should provide their own thread to poll the event
666         queue.
667         """
668         while True:
669             r = self.message_queue.get()
670             if r == "terminate event thread":
671                 break
672             msgname = type(r).__name__
673             if self.event_callback:
674                 self.event_callback(msgname, r)
675
676
677 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4