Python API: Add enum and union support.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 import sys
19 import os
20 import logging
21 import collections
22 import struct
23 import json
24 import threading
25 import fnmatch
26 import weakref
27 import atexit
28 from cffi import FFI
29 import cffi
30 from vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
31
32 if sys.version[0] == '2':
33     import Queue as queue
34 else:
35     import queue as queue
36
37 ffi = FFI()
38 ffi.cdef("""
39 typedef void (*vac_callback_t)(unsigned char * data, int len);
40 typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
41 int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
42     int rx_qlen);
43 int vac_disconnect(void);
44 int vac_read(char **data, int *l, unsigned short timeout);
45 int vac_write(char *data, int len);
46 void vac_free(void * msg);
47
48 int vac_get_msg_index(unsigned char * name);
49 int vac_msg_table_size(void);
50 int vac_msg_table_max_index(void);
51
52 void vac_rx_suspend (void);
53 void vac_rx_resume (void);
54 void vac_set_error_handler(vac_error_callback_t);
55  """)
56
57 # Barfs on failure, no need to check success.
58 vpp_api = ffi.dlopen('libvppapiclient.so')
59
60
61 def vpp_atexit(vpp_weakref):
62     """Clean up VPP connection on shutdown."""
63     vpp_instance = vpp_weakref()
64     if vpp_instance.connected:
65         vpp_instance.logger.debug('Cleaning up VPP on exit')
66         vpp_instance.disconnect()
67
68
69 vpp_object = None
70
71
72 def vpp_iterator(d):
73     if sys.version[0] == '2':
74         return d.iteritems()
75     else:
76         return d.items()
77
78
79 @ffi.callback("void(unsigned char *, int)")
80 def vac_callback_sync(data, len):
81     vpp_object.msg_handler_sync(ffi.buffer(data, len))
82
83
84 @ffi.callback("void(unsigned char *, int)")
85 def vac_callback_async(data, len):
86     vpp_object.msg_handler_async(ffi.buffer(data, len))
87
88
89 @ffi.callback("void(void *, unsigned char *, int)")
90 def vac_error_handler(arg, msg, msg_len):
91     vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
92
93
94 class Empty(object):
95     pass
96
97
98 class FuncWrapper(object):
99     def __init__(self, func):
100         self._func = func
101         self.__name__ = func.__name__
102
103     def __call__(self, **kwargs):
104         return self._func(**kwargs)
105
106
107 class VPPMessage(VPPType):
108     pass
109
110 class VPP():
111     """VPP interface.
112
113     This class provides the APIs to VPP.  The APIs are loaded
114     from provided .api.json files and makes functions accordingly.
115     These functions are documented in the VPP .api files, as they
116     are dynamically created.
117
118     Additionally, VPP can send callback messages; this class
119     provides a means to register a callback function to receive
120     these messages in a background thread.
121     """
122
123     def process_json_file(self, apidef_file):
124         api = json.load(apidef_file)
125         types = {}
126         for t in api['enums']:
127             t[0] = 'vl_api_' + t[0] + '_t'
128             types[t[0]] = {'type': 'enum', 'data': t}
129         for t in api['unions']:
130             t[0] = 'vl_api_' + t[0] + '_t'
131             types[t[0]] = {'type': 'union', 'data': t}
132         for t in api['types']:
133             t[0] = 'vl_api_' + t[0] + '_t'
134             types[t[0]] = {'type': 'type', 'data': t}
135
136         i = 0
137         while True:
138             unresolved = {}
139             for k, v in types.items():
140                 t = v['data']
141                 if v['type'] == 'enum':
142                     try:
143                         VPPEnumType(t[0], t[1:])
144                     except ValueError:
145                         unresolved[k] = v
146                 elif v['type'] == 'union':
147                     try:
148                         VPPUnionType(t[0], t[1:])
149                     except ValueError:
150                         unresolved[k] = v
151                 elif v['type'] == 'type':
152                     try:
153                         VPPType(t[0], t[1:])
154                     except ValueError:
155                         unresolved[k] = v
156             if len(unresolved) == 0:
157                 break
158             if i > 3:
159                 raise ValueError('Unresolved type definitions {}'
160                                  .format(unresolved))
161             types = unresolved
162             i += 1
163
164         for m in api['messages']:
165             try:
166                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
167             except NotImplementedError:
168                 self.logger.error('Not implemented error for {}'.format(m[0]))
169
170     def __init__(self, apifiles=None, testmode=False, async_thread=True,
171                  logger=logging.getLogger('vpp_papi'), loglevel='debug',
172                  read_timeout=0):
173         """Create a VPP API object.
174
175         apifiles is a list of files containing API
176         descriptions that will be loaded - methods will be
177         dynamically created reflecting these APIs.  If not
178         provided this will load the API files from VPP's
179         default install location.
180
181         logger, if supplied, is the logging logger object to log to.
182         loglevel, if supplied, is the log level this logger is set
183         to report at (from the loglevels in the logging module).
184         """
185         global vpp_object
186         vpp_object = self
187
188         if logger is None:
189             logger = logging.getLogger(__name__)
190             if loglevel is not None:
191                 logger.setLevel(loglevel)
192         self.logger = logger
193
194         self.messages = {}
195         self.id_names = []
196         self.id_msgdef = []
197         self.connected = False
198         self.header = VPPType('header', [['u16', 'msgid'],
199                                          ['u32', 'client_index']])
200         self.apifiles = []
201         self.event_callback = None
202         self.message_queue = queue.Queue()
203         self.read_timeout = read_timeout
204         self.vpp_api = vpp_api
205         self.async_thread = async_thread
206
207         if not apifiles:
208             # Pick up API definitions from default directory
209             try:
210                 apifiles = self.find_api_files()
211             except RuntimeError:
212                 # In test mode we don't care that we can't find the API files
213                 if testmode:
214                     apifiles = []
215                 else:
216                     raise
217
218         for file in apifiles:
219             with open(file) as apidef_file:
220                 self.process_json_file(apidef_file)
221
222         self.apifiles = apifiles
223
224         # Basic sanity check
225         if len(self.messages) == 0 and not testmode:
226             raise ValueError(1, 'Missing JSON message definitions')
227
228         # Make sure we allow VPP to clean up the message rings.
229         atexit.register(vpp_atexit, weakref.ref(self))
230
231         # Register error handler
232         if not testmode:
233             vpp_api.vac_set_error_handler(vac_error_handler)
234
235         # Support legacy CFFI
236         # from_buffer supported from 1.8.0
237         (major, minor, patch) = [int(s) for s in
238                                  cffi.__version__.split('.', 3)]
239         if major >= 1 and minor >= 8:
240             self._write = self._write_new_cffi
241         else:
242             self._write = self._write_legacy_cffi
243
244     class ContextId(object):
245         """Thread-safe provider of unique context IDs."""
246         def __init__(self):
247             self.context = 0
248             self.lock = threading.Lock()
249
250         def __call__(self):
251             """Get a new unique (or, at least, not recently used) context."""
252             with self.lock:
253                 self.context += 1
254                 return self.context
255     get_context = ContextId()
256
257     @classmethod
258     def find_api_dir(cls):
259         """Attempt to find the best directory in which API definition
260         files may reside. If the value VPP_API_DIR exists in the environment
261         then it is first on the search list. If we're inside a recognized
262         location in a VPP source tree (src/scripts and src/vpp-api/python)
263         then entries from there to the likely locations in build-root are
264         added. Finally the location used by system packages is added.
265
266         :returns: A single directory name, or None if no such directory
267             could be found.
268         """
269         dirs = []
270
271         if 'VPP_API_DIR' in os.environ:
272             dirs.append(os.environ['VPP_API_DIR'])
273
274         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
275         # in which case, plot a course to likely places in the src tree
276         import __main__ as main
277         if hasattr(main, '__file__'):
278             # get the path of the calling script
279             localdir = os.path.dirname(os.path.realpath(main.__file__))
280         else:
281             # use cwd if there is no calling script
282             localdir = os.getcwd()
283         localdir_s = localdir.split(os.path.sep)
284
285         def dmatch(dir):
286             """Match dir against right-hand components of the script dir"""
287             d = dir.split('/')  # param 'dir' assumes a / separator
288             length = len(d)
289             return len(localdir_s) > length and localdir_s[-length:] == d
290
291         def sdir(srcdir, variant):
292             """Build a path from srcdir to the staged API files of
293             'variant'  (typically '' or '_debug')"""
294             # Since 'core' and 'plugin' files are staged
295             # in separate directories, we target the parent dir.
296             return os.path.sep.join((
297                 srcdir,
298                 'build-root',
299                 'install-vpp%s-native' % variant,
300                 'vpp',
301                 'share',
302                 'vpp',
303                 'api',
304             ))
305
306         srcdir = None
307         if dmatch('src/scripts'):
308             srcdir = os.path.sep.join(localdir_s[:-2])
309         elif dmatch('src/vpp-api/python'):
310             srcdir = os.path.sep.join(localdir_s[:-3])
311         elif dmatch('test'):
312             # we're apparently running tests
313             srcdir = os.path.sep.join(localdir_s[:-1])
314
315         if srcdir:
316             # we're in the source tree, try both the debug and release
317             # variants.
318             dirs.append(sdir(srcdir, '_debug'))
319             dirs.append(sdir(srcdir, ''))
320
321         # Test for staged copies of the scripts
322         # For these, since we explicitly know if we're running a debug versus
323         # release variant, target only the relevant directory
324         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
325             srcdir = os.path.sep.join(localdir_s[:-4])
326             dirs.append(sdir(srcdir, '_debug'))
327         if dmatch('build-root/install-vpp-native/vpp/bin'):
328             srcdir = os.path.sep.join(localdir_s[:-4])
329             dirs.append(sdir(srcdir, ''))
330
331         # finally, try the location system packages typically install into
332         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
333
334         # check the directories for existance; first one wins
335         for dir in dirs:
336             if os.path.isdir(dir):
337                 return dir
338
339         return None
340
341     @classmethod
342     def find_api_files(cls, api_dir=None, patterns='*'):
343         """Find API definition files from the given directory tree with the
344         given pattern. If no directory is given then find_api_dir() is used
345         to locate one. If no pattern is given then all definition files found
346         in the directory tree are used.
347
348         :param api_dir: A directory tree in which to locate API definition
349             files; subdirectories are descended into.
350             If this is None then find_api_dir() is called to discover it.
351         :param patterns: A list of patterns to use in each visited directory
352             when looking for files.
353             This can be a list/tuple object or a comma-separated string of
354             patterns. Each value in the list will have leading/trialing
355             whitespace stripped.
356             The pattern specifies the first part of the filename, '.api.json'
357             is appended.
358             The results are de-duplicated, thus overlapping patterns are fine.
359             If this is None it defaults to '*' meaning "all API files".
360         :returns: A list of file paths for the API files found.
361         """
362         if api_dir is None:
363             api_dir = cls.find_api_dir()
364             if api_dir is None:
365                 raise RuntimeError("api_dir cannot be located")
366
367         if isinstance(patterns, list) or isinstance(patterns, tuple):
368             patterns = [p.strip() + '.api.json' for p in patterns]
369         else:
370             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
371
372         api_files = []
373         for root, dirnames, files in os.walk(api_dir):
374             # iterate all given patterns and de-dup the result
375             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
376             for filename in files:
377                 api_files.append(os.path.join(root, filename))
378
379         return api_files
380
381     def status(self):
382         """Debug function: report current VPP API status to stdout."""
383         print('Connected') if self.connected else print('Not Connected')
384         print('Read API definitions from', ', '.join(self.apifiles))
385
386     @property
387     def api(self):
388         if not hasattr(self, "_api"):
389             raise Exception("Not connected, api definitions not available")
390         return self._api
391
392     def make_function(self, msg, i, multipart, async):
393         if (async):
394             def f(**kwargs):
395                 return self._call_vpp_async(i, msg, **kwargs)
396         else:
397             def f(**kwargs):
398                 return self._call_vpp(i, msg, multipart, **kwargs)
399
400         f.__name__ = str(msg.name)
401         f.__doc__ = ", ".join(["%s %s" %
402                                (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)])
403         return f
404
405     def _register_functions(self, async=False):
406         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
407         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
408         self._api = Empty()
409         for name, msg in vpp_iterator(self.messages):
410             n = name + '_' + msg.crc[2:]
411             i = vpp_api.vac_get_msg_index(n.encode())
412             if i > 0:
413                 self.id_msgdef[i] = msg
414                 self.id_names[i] = name
415                 # TODO: Fix multipart (use services)
416                 multipart = True if name.find('_dump') > 0 else False
417                 f = self.make_function(msg, i, multipart, async)
418                 setattr(self._api, name, FuncWrapper(f))
419             else:
420                 self.logger.debug(
421                     'No such message type or failed CRC checksum: %s', n)
422
423     def _write_new_cffi(self, buf):
424         """Send a binary-packed message to VPP."""
425         if not self.connected:
426             raise IOError(1, 'Not connected')
427         return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
428
429     def _write_legacy_cffi(self, buf):
430         """Send a binary-packed message to VPP."""
431         if not self.connected:
432             raise IOError(1, 'Not connected')
433         return vpp_api.vac_write(bytes(buf), len(buf))
434
435     def _read(self):
436         if not self.connected:
437             raise IOError(1, 'Not connected')
438         mem = ffi.new("char **")
439         size = ffi.new("int *")
440         rv = vpp_api.vac_read(mem, size, self.read_timeout)
441         if rv:
442             raise IOError(rv, 'vac_read failed')
443         msg = bytes(ffi.buffer(mem[0], size[0]))
444         vpp_api.vac_free(mem[0])
445         return msg
446
447     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
448                          async):
449         pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
450         rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
451         if rv != 0:
452             raise IOError(2, 'Connect failed')
453         self.connected = True
454         self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
455         self._register_functions(async=async)
456
457         # Initialise control ping
458         crc = self.messages['control_ping'].crc
459         self.control_ping_index = vpp_api.vac_get_msg_index(
460             ('control_ping' + '_' + crc[2:]).encode())
461         self.control_ping_msgdef = self.messages['control_ping']
462         if self.async_thread:
463             self.event_thread = threading.Thread(
464                 target=self.thread_msg_handler)
465             self.event_thread.daemon = True
466             self.event_thread.start()
467         return rv
468
469     def connect(self, name, chroot_prefix=None, async=False, rx_qlen=32):
470         """Attach to VPP.
471
472         name - the name of the client.
473         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
474         async - if true, messages are sent without waiting for a reply
475         rx_qlen - the length of the VPP message receive queue between
476         client and server.
477         """
478         msg_handler = vac_callback_sync if not async else vac_callback_async
479         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
480                                      async)
481
482     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
483         """Attach to VPP in synchronous mode. Application must poll for events.
484
485         name - the name of the client.
486         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
487         rx_qlen - the length of the VPP message receive queue between
488         client and server.
489         """
490
491         return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
492                                      async=False)
493
494     def disconnect(self):
495         """Detach from VPP."""
496         rv = vpp_api.vac_disconnect()
497         self.connected = False
498         self.message_queue.put("terminate event thread")
499         return rv
500
501     def msg_handler_sync(self, msg):
502         """Process an incoming message from VPP in sync mode.
503
504         The message may be a reply or it may be an async notification.
505         """
506         r = self.decode_incoming_msg(msg)
507         if r is None:
508             return
509
510         # If we have a context, then use the context to find any
511         # request waiting for a reply
512         context = 0
513         if hasattr(r, 'context') and r.context > 0:
514             context = r.context
515
516         if context == 0:
517             # No context -> async notification that we feed to the callback
518             self.message_queue.put_nowait(r)
519         else:
520             raise IOError(2, 'RPC reply message received in event handler')
521
522     def decode_incoming_msg(self, msg):
523         if not msg:
524             self.logger.warning('vpp_api.read failed')
525             return
526
527         i, ci = self.header.unpack(msg, 0)
528         if self.id_names[i] == 'rx_thread_exit':
529             return
530
531         #
532         # Decode message and returns a tuple.
533         #
534         msgobj = self.id_msgdef[i]
535         if not msgobj:
536             raise IOError(2, 'Reply message undefined')
537
538         r = msgobj.unpack(msg)
539
540         return r
541
542     def msg_handler_async(self, msg):
543         """Process a message from VPP in async mode.
544
545         In async mode, all messages are returned to the callback.
546         """
547         r = self.decode_incoming_msg(msg)
548         if r is None:
549             return
550
551         msgname = type(r).__name__
552
553         if self.event_callback:
554             self.event_callback(msgname, r)
555
556     def _control_ping(self, context):
557         """Send a ping command."""
558         self._call_vpp_async(self.control_ping_index,
559                              self.control_ping_msgdef,
560                              context=context)
561
562     def validate_args(self, msg, kwargs):
563         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
564         if d:
565             raise ValueError('Invalid argument {} to {}'.format(list(d), msg.name))
566
567     def _call_vpp(self, i, msg, multipart, **kwargs):
568         """Given a message, send the message and await a reply.
569
570         msgdef - the message packing definition
571         i - the message type index
572         multipart - True if the message returns multiple
573         messages in return.
574         context - context number - chosen at random if not
575         supplied.
576         The remainder of the kwargs are the arguments to the API call.
577
578         The return value is the message or message array containing
579         the response.  It will raise an IOError exception if there was
580         no response within the timeout window.
581         """
582
583         if 'context' not in kwargs:
584             context = self.get_context()
585             kwargs['context'] = context
586         else:
587             context = kwargs['context']
588         kwargs['_vl_msg_id'] = i
589
590         self.validate_args(msg, kwargs)
591         b = msg.pack(kwargs)
592         vpp_api.vac_rx_suspend()
593         self._write(b)
594
595         if multipart:
596             # Send a ping after the request - we use its response
597             # to detect that we have seen all results.
598             self._control_ping(context)
599
600         # Block until we get a reply.
601         rl = []
602         while (True):
603             msg = self._read()
604             if not msg:
605                 raise IOError(2, 'VPP API client: read failed')
606             r = self.decode_incoming_msg(msg)
607             msgname = type(r).__name__
608             if context not in r or r.context == 0 or context != r.context:
609                 self.message_queue.put_nowait(r)
610                 continue
611
612             if not multipart:
613                 rl = r
614                 break
615             if msgname == 'control_ping_reply':
616                 break
617
618             rl.append(r)
619
620         vpp_api.vac_rx_resume()
621
622         return rl
623
624     def _call_vpp_async(self, i, msg, **kwargs):
625         """Given a message, send the message and await a reply.
626
627         msgdef - the message packing definition
628         i - the message type index
629         context - context number - chosen at random if not
630         supplied.
631         The remainder of the kwargs are the arguments to the API call.
632         """
633         if 'context' not in kwargs:
634             context = self.get_context()
635             kwargs['context'] = context
636         else:
637             context = kwargs['context']
638         kwargs['client_index'] = 0
639         kwargs['_vl_msg_id'] = i
640         b = msg.pack(kwargs)
641
642         self._write(b)
643
644     def register_event_callback(self, callback):
645         """Register a callback for async messages.
646
647         This will be called for async notifications in sync mode,
648         and all messages in async mode.  In sync mode, replies to
649         requests will not come here.
650
651         callback is a fn(msg_type_name, msg_type) that will be
652         called when a message comes in.  While this function is
653         executing, note that (a) you are in a background thread and
654         may wish to use threading.Lock to protect your datastructures,
655         and (b) message processing from VPP will stop (so if you take
656         a long while about it you may provoke reply timeouts or cause
657         VPP to fill the RX buffer).  Passing None will disable the
658         callback.
659         """
660         self.event_callback = callback
661
662     def thread_msg_handler(self):
663         """Python thread calling the user registerd message handler.
664
665         This is to emulate the old style event callback scheme. Modern
666         clients should provide their own thread to poll the event
667         queue.
668         """
669         while True:
670             r = self.message_queue.get()
671             if r == "terminate event thread":
672                 break
673             msgname = type(r).__name__
674             if self.event_callback:
675                 self.event_callback(msgname, r)
676
677
678 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4