vpp_papi: Add custom exceptions.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import sys
20 import os
21 import logging
22 import collections
23 import struct
24 import json
25 import threading
26 import fnmatch
27 import weakref
28 import atexit
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
31 from . vpp_format import VPPFormat
32
33 if sys.version[0] == '2':
34     import Queue as queue
35 else:
36     import queue as queue
37
38
39 class VppEnumType(type):
40     def __getattr__(cls, name):
41         t = vpp_get_type(name)
42         return t.enum
43
44
45 # Python3
46 # class VppEnum(metaclass=VppEnumType):
47 #    pass
48 class VppEnum(object):
49     __metaclass__ = VppEnumType
50
51
52 def vpp_atexit(vpp_weakref):
53     """Clean up VPP connection on shutdown."""
54     vpp_instance = vpp_weakref()
55     if vpp_instance and vpp_instance.transport.connected:
56         vpp_instance.logger.debug('Cleaning up VPP on exit')
57         vpp_instance.disconnect()
58
59
60 def vpp_iterator(d):
61     if sys.version[0] == '2':
62         return d.iteritems()
63     else:
64         return d.items()
65
66
67 class VppApiDynamicMethodHolder(object):
68     pass
69
70
71 class FuncWrapper(object):
72     def __init__(self, func):
73         self._func = func
74         self.__name__ = func.__name__
75
76     def __call__(self, **kwargs):
77         return self._func(**kwargs)
78
79
80 class VPPApiError(Exception):
81     pass
82
83
84 class VPPNotImplementedError(NotImplementedError):
85     pass
86
87
88 class VPPIOError(IOError):
89     pass
90
91
92 class VPPRuntimeError(RuntimeError):
93     pass
94
95
96 class VPPValueError(ValueError):
97     pass
98
99
100 class VPP(object):
101     """VPP interface.
102
103     This class provides the APIs to VPP.  The APIs are loaded
104     from provided .api.json files and makes functions accordingly.
105     These functions are documented in the VPP .api files, as they
106     are dynamically created.
107
108     Additionally, VPP can send callback messages; this class
109     provides a means to register a callback function to receive
110     these messages in a background thread.
111     """
112     VPPApiError = VPPApiError
113     VPPRuntimeError = VPPRuntimeError
114     VPPValueError = VPPValueError
115     VPPNotImplementedError = VPPNotImplementedError
116     VPPIOError = VPPIOError
117
118     def process_json_file(self, apidef_file):
119         api = json.load(apidef_file)
120         types = {}
121         for t in api['enums']:
122             t[0] = 'vl_api_' + t[0] + '_t'
123             types[t[0]] = {'type': 'enum', 'data': t}
124         for t in api['unions']:
125             t[0] = 'vl_api_' + t[0] + '_t'
126             types[t[0]] = {'type': 'union', 'data': t}
127         for t in api['types']:
128             t[0] = 'vl_api_' + t[0] + '_t'
129             types[t[0]] = {'type': 'type', 'data': t}
130         for t, v in api['aliases'].items():
131             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
132
133         i = 0
134         while True:
135             unresolved = {}
136             for k, v in types.items():
137                 t = v['data']
138                 if not vpp_get_type(k):
139                     if v['type'] == 'enum':
140                         try:
141                             VPPEnumType(t[0], t[1:])
142                         except ValueError:
143                             unresolved[k] = v
144                     elif v['type'] == 'union':
145                         try:
146                             VPPUnionType(t[0], t[1:])
147                         except ValueError:
148                             unresolved[k] = v
149                     elif v['type'] == 'type':
150                         try:
151                             VPPType(t[0], t[1:])
152                         except ValueError:
153                             unresolved[k] = v
154                     elif v['type'] == 'alias':
155                         try:
156                             VPPTypeAlias(k, t)
157                         except ValueError:
158                             unresolved[k] = v
159             if len(unresolved) == 0:
160                 break
161             if i > 3:
162                 raise VPPValueError('Unresolved type definitions {}'
163                                     .format(unresolved))
164             types = unresolved
165             i += 1
166
167         for m in api['messages']:
168             try:
169                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
170             except VPPNotImplementedError:
171                 self.logger.error('Not implemented error for {}'.format(m[0]))
172
173     def __init__(self, apifiles=None, testmode=False, async_thread=True,
174                  logger=None, loglevel=None,
175                  read_timeout=5, use_socket=False,
176                  server_address='/run/vpp-api.sock'):
177         """Create a VPP API object.
178
179         apifiles is a list of files containing API
180         descriptions that will be loaded - methods will be
181         dynamically created reflecting these APIs.  If not
182         provided this will load the API files from VPP's
183         default install location.
184
185         logger, if supplied, is the logging logger object to log to.
186         loglevel, if supplied, is the log level this logger is set
187         to report at (from the loglevels in the logging module).
188         """
189         if logger is None:
190             logger = logging.getLogger(__name__)
191             if loglevel is not None:
192                 logger.setLevel(loglevel)
193         self.logger = logger
194
195         self.messages = {}
196         self.id_names = []
197         self.id_msgdef = []
198         self.header = VPPType('header', [['u16', 'msgid'],
199                                          ['u32', 'client_index']])
200         self.apifiles = []
201         self.event_callback = None
202         self.message_queue = queue.Queue()
203         self.read_timeout = read_timeout
204         self.async_thread = async_thread
205
206         if use_socket:
207             from . vpp_transport_socket import VppTransport
208         else:
209             from . vpp_transport_shmem import VppTransport
210
211         if not apifiles:
212             # Pick up API definitions from default directory
213             try:
214                 apifiles = self.find_api_files()
215             except RuntimeError:
216                 # In test mode we don't care that we can't find the API files
217                 if testmode:
218                     apifiles = []
219                 else:
220                     raise VPPRuntimeError
221
222         for file in apifiles:
223             with open(file) as apidef_file:
224                 self.process_json_file(apidef_file)
225
226         self.apifiles = apifiles
227
228         # Basic sanity check
229         if len(self.messages) == 0 and not testmode:
230             raise VPPValueError(1, 'Missing JSON message definitions')
231
232         self.transport = VppTransport(self, read_timeout=read_timeout,
233                                       server_address=server_address)
234         # Make sure we allow VPP to clean up the message rings.
235         atexit.register(vpp_atexit, weakref.ref(self))
236
237     class ContextId(object):
238         """Thread-safe provider of unique context IDs."""
239         def __init__(self):
240             self.context = 0
241             self.lock = threading.Lock()
242
243         def __call__(self):
244             """Get a new unique (or, at least, not recently used) context."""
245             with self.lock:
246                 self.context += 1
247                 return self.context
248     get_context = ContextId()
249
250     def get_type(self, name):
251         return vpp_get_type(name)
252
253     @classmethod
254     def find_api_dir(cls):
255         """Attempt to find the best directory in which API definition
256         files may reside. If the value VPP_API_DIR exists in the environment
257         then it is first on the search list. If we're inside a recognized
258         location in a VPP source tree (src/scripts and src/vpp-api/python)
259         then entries from there to the likely locations in build-root are
260         added. Finally the location used by system packages is added.
261
262         :returns: A single directory name, or None if no such directory
263             could be found.
264         """
265         dirs = []
266
267         if 'VPP_API_DIR' in os.environ:
268             dirs.append(os.environ['VPP_API_DIR'])
269
270         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
271         # in which case, plot a course to likely places in the src tree
272         import __main__ as main
273         if hasattr(main, '__file__'):
274             # get the path of the calling script
275             localdir = os.path.dirname(os.path.realpath(main.__file__))
276         else:
277             # use cwd if there is no calling script
278             localdir = os.getcwd()
279         localdir_s = localdir.split(os.path.sep)
280
281         def dmatch(dir):
282             """Match dir against right-hand components of the script dir"""
283             d = dir.split('/')  # param 'dir' assumes a / separator
284             length = len(d)
285             return len(localdir_s) > length and localdir_s[-length:] == d
286
287         def sdir(srcdir, variant):
288             """Build a path from srcdir to the staged API files of
289             'variant'  (typically '' or '_debug')"""
290             # Since 'core' and 'plugin' files are staged
291             # in separate directories, we target the parent dir.
292             return os.path.sep.join((
293                 srcdir,
294                 'build-root',
295                 'install-vpp%s-native' % variant,
296                 'vpp',
297                 'share',
298                 'vpp',
299                 'api',
300             ))
301
302         srcdir = None
303         if dmatch('src/scripts'):
304             srcdir = os.path.sep.join(localdir_s[:-2])
305         elif dmatch('src/vpp-api/python'):
306             srcdir = os.path.sep.join(localdir_s[:-3])
307         elif dmatch('test'):
308             # we're apparently running tests
309             srcdir = os.path.sep.join(localdir_s[:-1])
310
311         if srcdir:
312             # we're in the source tree, try both the debug and release
313             # variants.
314             dirs.append(sdir(srcdir, '_debug'))
315             dirs.append(sdir(srcdir, ''))
316
317         # Test for staged copies of the scripts
318         # For these, since we explicitly know if we're running a debug versus
319         # release variant, target only the relevant directory
320         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
321             srcdir = os.path.sep.join(localdir_s[:-4])
322             dirs.append(sdir(srcdir, '_debug'))
323         if dmatch('build-root/install-vpp-native/vpp/bin'):
324             srcdir = os.path.sep.join(localdir_s[:-4])
325             dirs.append(sdir(srcdir, ''))
326
327         # finally, try the location system packages typically install into
328         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
329
330         # check the directories for existance; first one wins
331         for dir in dirs:
332             if os.path.isdir(dir):
333                 return dir
334
335         return None
336
337     @classmethod
338     def find_api_files(cls, api_dir=None, patterns='*'):
339         """Find API definition files from the given directory tree with the
340         given pattern. If no directory is given then find_api_dir() is used
341         to locate one. If no pattern is given then all definition files found
342         in the directory tree are used.
343
344         :param api_dir: A directory tree in which to locate API definition
345             files; subdirectories are descended into.
346             If this is None then find_api_dir() is called to discover it.
347         :param patterns: A list of patterns to use in each visited directory
348             when looking for files.
349             This can be a list/tuple object or a comma-separated string of
350             patterns. Each value in the list will have leading/trialing
351             whitespace stripped.
352             The pattern specifies the first part of the filename, '.api.json'
353             is appended.
354             The results are de-duplicated, thus overlapping patterns are fine.
355             If this is None it defaults to '*' meaning "all API files".
356         :returns: A list of file paths for the API files found.
357         """
358         if api_dir is None:
359             api_dir = cls.find_api_dir()
360             if api_dir is None:
361                 raise VPPApiError("api_dir cannot be located")
362
363         if isinstance(patterns, list) or isinstance(patterns, tuple):
364             patterns = [p.strip() + '.api.json' for p in patterns]
365         else:
366             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
367
368         api_files = []
369         for root, dirnames, files in os.walk(api_dir):
370             # iterate all given patterns and de-dup the result
371             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
372             for filename in files:
373                 api_files.append(os.path.join(root, filename))
374
375         return api_files
376
377     @property
378     def api(self):
379         if not hasattr(self, "_api"):
380             raise VPPApiError("Not connected, api definitions not available")
381         return self._api
382
383     def make_function(self, msg, i, multipart, do_async):
384         if (do_async):
385             def f(**kwargs):
386                 return self._call_vpp_async(i, msg, **kwargs)
387         else:
388             def f(**kwargs):
389                 return self._call_vpp(i, msg, multipart, **kwargs)
390
391         f.__name__ = str(msg.name)
392         f.__doc__ = ", ".join(["%s %s" %
393                                (msg.fieldtypes[j], k)
394                                for j, k in enumerate(msg.fields)])
395         return f
396
397     def _register_functions(self, do_async=False):
398         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
399         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
400         self._api = VppApiDynamicMethodHolder()
401         for name, msg in vpp_iterator(self.messages):
402             n = name + '_' + msg.crc[2:]
403             i = self.transport.get_msg_index(n.encode())
404             if i > 0:
405                 self.id_msgdef[i] = msg
406                 self.id_names[i] = name
407                 # TODO: Fix multipart (use services)
408                 multipart = True if name.find('_dump') > 0 else False
409                 f = self.make_function(msg, i, multipart, do_async)
410                 setattr(self._api, name, FuncWrapper(f))
411             else:
412                 self.logger.debug(
413                     'No such message type or failed CRC checksum: %s', n)
414
415     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
416                          do_async):
417         pfx = chroot_prefix.encode() if chroot_prefix else None
418
419         rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
420         if rv != 0:
421             raise VPPIOError(2, 'Connect failed')
422         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
423         self._register_functions(do_async=do_async)
424
425         # Initialise control ping
426         crc = self.messages['control_ping'].crc
427         self.control_ping_index = self.transport.get_msg_index(
428             ('control_ping' + '_' + crc[2:]).encode())
429         self.control_ping_msgdef = self.messages['control_ping']
430         if self.async_thread:
431             self.event_thread = threading.Thread(
432                 target=self.thread_msg_handler)
433             self.event_thread.daemon = True
434             self.event_thread.start()
435         return rv
436
437     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
438         """Attach to VPP.
439
440         name - the name of the client.
441         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
442         do_async - if true, messages are sent without waiting for a reply
443         rx_qlen - the length of the VPP message receive queue between
444         client and server.
445         """
446         msg_handler = self.transport.get_callback(do_async)
447         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
448                                      do_async)
449
450     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
451         """Attach to VPP in synchronous mode. Application must poll for events.
452
453         name - the name of the client.
454         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
455         rx_qlen - the length of the VPP message receive queue between
456         client and server.
457         """
458
459         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
460                                      do_async=False)
461
462     def disconnect(self):
463         """Detach from VPP."""
464         rv = self.transport.disconnect()
465         self.message_queue.put("terminate event thread")
466         return rv
467
468     def msg_handler_sync(self, msg):
469         """Process an incoming message from VPP in sync mode.
470
471         The message may be a reply or it may be an async notification.
472         """
473         r = self.decode_incoming_msg(msg)
474         if r is None:
475             return
476
477         # If we have a context, then use the context to find any
478         # request waiting for a reply
479         context = 0
480         if hasattr(r, 'context') and r.context > 0:
481             context = r.context
482
483         if context == 0:
484             # No context -> async notification that we feed to the callback
485             self.message_queue.put_nowait(r)
486         else:
487             raise VPPIOError(2, 'RPC reply message received in event handler')
488
489     def decode_incoming_msg(self, msg):
490         if not msg:
491             self.logger.warning('vpp_api.read failed')
492             return
493         (i, ci), size = self.header.unpack(msg, 0)
494         if self.id_names[i] == 'rx_thread_exit':
495             return
496
497         #
498         # Decode message and returns a tuple.
499         #
500         msgobj = self.id_msgdef[i]
501         if not msgobj:
502             raise VPPIOError(2, 'Reply message undefined')
503
504         r, size = msgobj.unpack(msg)
505         return r
506
507     def msg_handler_async(self, msg):
508         """Process a message from VPP in async mode.
509
510         In async mode, all messages are returned to the callback.
511         """
512         r = self.decode_incoming_msg(msg)
513         if r is None:
514             return
515
516         msgname = type(r).__name__
517
518         if self.event_callback:
519             self.event_callback(msgname, r)
520
521     def _control_ping(self, context):
522         """Send a ping command."""
523         self._call_vpp_async(self.control_ping_index,
524                              self.control_ping_msgdef,
525                              context=context)
526
527     def validate_args(self, msg, kwargs):
528         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
529         if d:
530             raise VPPValueError('Invalid argument {} to {}'
531                              .format(list(d), msg.name))
532
533     def _call_vpp(self, i, msg, multipart, **kwargs):
534         """Given a message, send the message and await a reply.
535
536         msgdef - the message packing definition
537         i - the message type index
538         multipart - True if the message returns multiple
539         messages in return.
540         context - context number - chosen at random if not
541         supplied.
542         The remainder of the kwargs are the arguments to the API call.
543
544         The return value is the message or message array containing
545         the response.  It will raise an IOError exception if there was
546         no response within the timeout window.
547         """
548
549         if 'context' not in kwargs:
550             context = self.get_context()
551             kwargs['context'] = context
552         else:
553             context = kwargs['context']
554         kwargs['_vl_msg_id'] = i
555
556         try:
557             if self.transport.socket_index:
558                 kwargs['client_index'] = self.transport.socket_index
559         except AttributeError:
560             pass
561         self.validate_args(msg, kwargs)
562         b = msg.pack(kwargs)
563         self.transport.suspend()
564
565         self.transport.write(b)
566
567         if multipart:
568             # Send a ping after the request - we use its response
569             # to detect that we have seen all results.
570             self._control_ping(context)
571
572         # Block until we get a reply.
573         rl = []
574         while (True):
575             msg = self.transport.read()
576             if not msg:
577                 raise VPPIOError(2, 'VPP API client: read failed')
578             r = self.decode_incoming_msg(msg)
579             msgname = type(r).__name__
580             if context not in r or r.context == 0 or context != r.context:
581                 # Message being queued
582                 self.message_queue.put_nowait(r)
583                 continue
584
585             if not multipart:
586                 rl = r
587                 break
588             if msgname == 'control_ping_reply':
589                 break
590
591             rl.append(r)
592
593         self.transport.resume()
594
595         return rl
596
597     def _call_vpp_async(self, i, msg, **kwargs):
598         """Given a message, send the message and await a reply.
599
600         msgdef - the message packing definition
601         i - the message type index
602         context - context number - chosen at random if not
603         supplied.
604         The remainder of the kwargs are the arguments to the API call.
605         """
606         if 'context' not in kwargs:
607             context = self.get_context()
608             kwargs['context'] = context
609         else:
610             context = kwargs['context']
611         try:
612             if self.transport.socket_index:
613                 kwargs['client_index'] = self.transport.socket_index
614         except AttributeError:
615             kwargs['client_index'] = 0
616         kwargs['_vl_msg_id'] = i
617         b = msg.pack(kwargs)
618
619         self.transport.write(b)
620
621     def register_event_callback(self, callback):
622         """Register a callback for async messages.
623
624         This will be called for async notifications in sync mode,
625         and all messages in async mode.  In sync mode, replies to
626         requests will not come here.
627
628         callback is a fn(msg_type_name, msg_type) that will be
629         called when a message comes in.  While this function is
630         executing, note that (a) you are in a background thread and
631         may wish to use threading.Lock to protect your datastructures,
632         and (b) message processing from VPP will stop (so if you take
633         a long while about it you may provoke reply timeouts or cause
634         VPP to fill the RX buffer).  Passing None will disable the
635         callback.
636         """
637         self.event_callback = callback
638
639     def thread_msg_handler(self):
640         """Python thread calling the user registered message handler.
641
642         This is to emulate the old style event callback scheme. Modern
643         clients should provide their own thread to poll the event
644         queue.
645         """
646         while True:
647             r = self.message_queue.get()
648             if r == "terminate event thread":
649                 break
650             msgname = type(r).__name__
651             if self.event_callback:
652                 self.event_callback(msgname, r)
653
654
655 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4