misc: papi - add __repr__ to FuncWrapper
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import sys
21 import multiprocessing as mp
22 import os
23 import logging
24 import functools
25 import json
26 import threading
27 import fnmatch
28 import weakref
29 import atexit
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32
33 logger = logging.getLogger(__name__)
34
35 if sys.version[0] == '2':
36     import Queue as queue
37 else:
38     import queue as queue
39
40 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
41            'VppEnum', 'VppEnumType',
42            'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
43            'VPPApiClient', )
44
45
46 def metaclass(metaclass):
47     @functools.wraps(metaclass)
48     def wrapper(cls):
49         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
50
51     return wrapper
52
53
54 class VppEnumType(type):
55     def __getattr__(cls, name):
56         t = vpp_get_type(name)
57         return t.enum
58
59
60 @metaclass(VppEnumType)
61 class VppEnum(object):
62     pass
63
64
65 def vpp_atexit(vpp_weakref):
66     """Clean up VPP connection on shutdown."""
67     vpp_instance = vpp_weakref()
68     if vpp_instance and vpp_instance.transport.connected:
69         vpp_instance.logger.debug('Cleaning up VPP on exit')
70         vpp_instance.disconnect()
71
72
73 if sys.version[0] == '2':
74     def vpp_iterator(d):
75         return d.iteritems()
76 else:
77     def vpp_iterator(d):
78         return d.items()
79
80
81 def call_logger(msgdef, kwargs):
82     s = 'Calling {}('.format(msgdef.name)
83     for k, v in kwargs.items():
84         s += '{}:{} '.format(k, v)
85     s += ')'
86     return s
87
88
89 def return_logger(r):
90     s = 'Return from {}'.format(r)
91     return s
92
93
94 class VppApiDynamicMethodHolder(object):
95     pass
96
97
98 class FuncWrapper(object):
99     def __init__(self, func):
100         self._func = func
101         self.__name__ = func.__name__
102         self.__doc__ = func.__doc__
103
104     def __call__(self, **kwargs):
105         return self._func(**kwargs)
106
107     def __repr__(self):
108         return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
109
110
111 class VPPApiError(Exception):
112     pass
113
114
115 class VPPNotImplementedError(NotImplementedError):
116     pass
117
118
119 class VPPIOError(IOError):
120     pass
121
122
123 class VPPRuntimeError(RuntimeError):
124     pass
125
126
127 class VPPValueError(ValueError):
128     pass
129
130
131 class VPPApiClient(object):
132     """VPP interface.
133
134     This class provides the APIs to VPP.  The APIs are loaded
135     from provided .api.json files and makes functions accordingly.
136     These functions are documented in the VPP .api files, as they
137     are dynamically created.
138
139     Additionally, VPP can send callback messages; this class
140     provides a means to register a callback function to receive
141     these messages in a background thread.
142     """
143     apidir = None
144     VPPApiError = VPPApiError
145     VPPRuntimeError = VPPRuntimeError
146     VPPValueError = VPPValueError
147     VPPNotImplementedError = VPPNotImplementedError
148     VPPIOError = VPPIOError
149
150     def process_json_file(self, apidef_file):
151         api = json.load(apidef_file)
152         types = {}
153         for t in api['enums']:
154             t[0] = 'vl_api_' + t[0] + '_t'
155             types[t[0]] = {'type': 'enum', 'data': t}
156         for t in api['unions']:
157             t[0] = 'vl_api_' + t[0] + '_t'
158             types[t[0]] = {'type': 'union', 'data': t}
159         for t in api['types']:
160             t[0] = 'vl_api_' + t[0] + '_t'
161             types[t[0]] = {'type': 'type', 'data': t}
162         for t, v in api['aliases'].items():
163             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
164         self.services.update(api['services'])
165
166         i = 0
167         while True:
168             unresolved = {}
169             for k, v in types.items():
170                 t = v['data']
171                 if not vpp_get_type(k):
172                     if v['type'] == 'enum':
173                         try:
174                             VPPEnumType(t[0], t[1:])
175                         except ValueError:
176                             unresolved[k] = v
177                     elif v['type'] == 'union':
178                         try:
179                             VPPUnionType(t[0], t[1:])
180                         except ValueError:
181                             unresolved[k] = v
182                     elif v['type'] == 'type':
183                         try:
184                             VPPType(t[0], t[1:])
185                         except ValueError:
186                             unresolved[k] = v
187                     elif v['type'] == 'alias':
188                         try:
189                             VPPTypeAlias(k, t)
190                         except ValueError:
191                             unresolved[k] = v
192             if len(unresolved) == 0:
193                 break
194             if i > 3:
195                 raise VPPValueError('Unresolved type definitions {}'
196                                     .format(unresolved))
197             types = unresolved
198             i += 1
199
200         for m in api['messages']:
201             try:
202                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
203             except VPPNotImplementedError:
204                 self.logger.error('Not implemented error for {}'.format(m[0]))
205
206     def __init__(self, apifiles=None, testmode=False, async_thread=True,
207                  logger=None, loglevel=None,
208                  read_timeout=5, use_socket=False,
209                  server_address='/run/vpp-api.sock'):
210         """Create a VPP API object.
211
212         apifiles is a list of files containing API
213         descriptions that will be loaded - methods will be
214         dynamically created reflecting these APIs.  If not
215         provided this will load the API files from VPP's
216         default install location.
217
218         logger, if supplied, is the logging logger object to log to.
219         loglevel, if supplied, is the log level this logger is set
220         to report at (from the loglevels in the logging module).
221         """
222         if logger is None:
223             logger = logging.getLogger(__name__)
224             if loglevel is not None:
225                 logger.setLevel(loglevel)
226         self.logger = logger
227
228         self.messages = {}
229         self.services = {}
230         self.id_names = []
231         self.id_msgdef = []
232         self.header = VPPType('header', [['u16', 'msgid'],
233                                          ['u32', 'client_index']])
234         self.apifiles = []
235         self.event_callback = None
236         self.message_queue = queue.Queue()
237         self.read_timeout = read_timeout
238         self.async_thread = async_thread
239         self.event_thread = None
240         self.testmode = testmode
241         self.use_socket = use_socket
242         self.server_address = server_address
243         self._apifiles = apifiles
244
245         if use_socket:
246             from . vpp_transport_socket import VppTransport
247         else:
248             from . vpp_transport_shmem import VppTransport
249
250         if not apifiles:
251             # Pick up API definitions from default directory
252             try:
253                 apifiles = self.find_api_files()
254             except RuntimeError:
255                 # In test mode we don't care that we can't find the API files
256                 if testmode:
257                     apifiles = []
258                 else:
259                     raise VPPRuntimeError
260
261         for file in apifiles:
262             with open(file) as apidef_file:
263                 self.process_json_file(apidef_file)
264
265         self.apifiles = apifiles
266
267         # Basic sanity check
268         if len(self.messages) == 0 and not testmode:
269             raise VPPValueError(1, 'Missing JSON message definitions')
270
271         self.transport = VppTransport(self, read_timeout=read_timeout,
272                                       server_address=server_address)
273         # Make sure we allow VPP to clean up the message rings.
274         atexit.register(vpp_atexit, weakref.ref(self))
275
276     class ContextId(object):
277         """Multiprocessing-safe provider of unique context IDs."""
278         def __init__(self):
279             self.context = mp.Value(ctypes.c_uint, 0)
280             self.lock = mp.Lock()
281
282         def __call__(self):
283             """Get a new unique (or, at least, not recently used) context."""
284             with self.lock:
285                 self.context.value += 1
286                 return self.context.value
287     get_context = ContextId()
288
289     def get_type(self, name):
290         return vpp_get_type(name)
291
292     @classmethod
293     def find_api_dir(cls):
294         """Attempt to find the best directory in which API definition
295         files may reside. If the value VPP_API_DIR exists in the environment
296         then it is first on the search list. If we're inside a recognized
297         location in a VPP source tree (src/scripts and src/vpp-api/python)
298         then entries from there to the likely locations in build-root are
299         added. Finally the location used by system packages is added.
300
301         :returns: A single directory name, or None if no such directory
302             could be found.
303         """
304         dirs = [cls.apidir] if cls.apidir else []
305
306         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
307         # in which case, plot a course to likely places in the src tree
308         import __main__ as main
309         if hasattr(main, '__file__'):
310             # get the path of the calling script
311             localdir = os.path.dirname(os.path.realpath(main.__file__))
312         else:
313             # use cwd if there is no calling script
314             localdir = os.getcwd()
315         localdir_s = localdir.split(os.path.sep)
316
317         def dmatch(dir):
318             """Match dir against right-hand components of the script dir"""
319             d = dir.split('/')  # param 'dir' assumes a / separator
320             length = len(d)
321             return len(localdir_s) > length and localdir_s[-length:] == d
322
323         def sdir(srcdir, variant):
324             """Build a path from srcdir to the staged API files of
325             'variant'  (typically '' or '_debug')"""
326             # Since 'core' and 'plugin' files are staged
327             # in separate directories, we target the parent dir.
328             return os.path.sep.join((
329                 srcdir,
330                 'build-root',
331                 'install-vpp%s-native' % variant,
332                 'vpp',
333                 'share',
334                 'vpp',
335                 'api',
336             ))
337
338         srcdir = None
339         if dmatch('src/scripts'):
340             srcdir = os.path.sep.join(localdir_s[:-2])
341         elif dmatch('src/vpp-api/python'):
342             srcdir = os.path.sep.join(localdir_s[:-3])
343         elif dmatch('test'):
344             # we're apparently running tests
345             srcdir = os.path.sep.join(localdir_s[:-1])
346
347         if srcdir:
348             # we're in the source tree, try both the debug and release
349             # variants.
350             dirs.append(sdir(srcdir, '_debug'))
351             dirs.append(sdir(srcdir, ''))
352
353         # Test for staged copies of the scripts
354         # For these, since we explicitly know if we're running a debug versus
355         # release variant, target only the relevant directory
356         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
357             srcdir = os.path.sep.join(localdir_s[:-4])
358             dirs.append(sdir(srcdir, '_debug'))
359         if dmatch('build-root/install-vpp-native/vpp/bin'):
360             srcdir = os.path.sep.join(localdir_s[:-4])
361             dirs.append(sdir(srcdir, ''))
362
363         # finally, try the location system packages typically install into
364         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
365
366         # check the directories for existence; first one wins
367         for dir in dirs:
368             if os.path.isdir(dir):
369                 return dir
370
371         return None
372
373     @classmethod
374     def find_api_files(cls, api_dir=None, patterns='*'):
375         """Find API definition files from the given directory tree with the
376         given pattern. If no directory is given then find_api_dir() is used
377         to locate one. If no pattern is given then all definition files found
378         in the directory tree are used.
379
380         :param api_dir: A directory tree in which to locate API definition
381             files; subdirectories are descended into.
382             If this is None then find_api_dir() is called to discover it.
383         :param patterns: A list of patterns to use in each visited directory
384             when looking for files.
385             This can be a list/tuple object or a comma-separated string of
386             patterns. Each value in the list will have leading/trialing
387             whitespace stripped.
388             The pattern specifies the first part of the filename, '.api.json'
389             is appended.
390             The results are de-duplicated, thus overlapping patterns are fine.
391             If this is None it defaults to '*' meaning "all API files".
392         :returns: A list of file paths for the API files found.
393         """
394         if api_dir is None:
395             api_dir = cls.find_api_dir()
396             if api_dir is None:
397                 raise VPPApiError("api_dir cannot be located")
398
399         if isinstance(patterns, list) or isinstance(patterns, tuple):
400             patterns = [p.strip() + '.api.json' for p in patterns]
401         else:
402             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
403
404         api_files = []
405         for root, dirnames, files in os.walk(api_dir):
406             # iterate all given patterns and de-dup the result
407             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
408             for filename in files:
409                 api_files.append(os.path.join(root, filename))
410
411         return api_files
412
413     @property
414     def api(self):
415         if not hasattr(self, "_api"):
416             raise VPPApiError("Not connected, api definitions not available")
417         return self._api
418
419     def make_function(self, msg, i, multipart, do_async):
420         if (do_async):
421             def f(**kwargs):
422                 return self._call_vpp_async(i, msg, **kwargs)
423         else:
424             def f(**kwargs):
425                 return self._call_vpp(i, msg, multipart, **kwargs)
426
427         f.__name__ = str(msg.name)
428         f.__doc__ = ", ".join(["%s %s" %
429                                (msg.fieldtypes[j], k)
430                                for j, k in enumerate(msg.fields)])
431         f.msg = msg
432
433         return f
434
435     def _register_functions(self, do_async=False):
436         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
437         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
438         self._api = VppApiDynamicMethodHolder()
439         for name, msg in vpp_iterator(self.messages):
440             n = name + '_' + msg.crc[2:]
441             i = self.transport.get_msg_index(n.encode('utf-8'))
442             if i > 0:
443                 self.id_msgdef[i] = msg
444                 self.id_names[i] = name
445
446                 # Create function for client side messages.
447                 if name in self.services:
448                     if 'stream' in self.services[name] and \
449                        self.services[name]['stream']:
450                         multipart = True
451                     else:
452                         multipart = False
453                     f = self.make_function(msg, i, multipart, do_async)
454                     setattr(self._api, name, FuncWrapper(f))
455             else:
456                 self.logger.debug(
457                     'No such message type or failed CRC checksum: %s', n)
458
459     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
460                          do_async):
461         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
462
463         rv = self.transport.connect(name.encode('utf-8'), pfx,
464                                     msg_handler, rx_qlen)
465         if rv != 0:
466             raise VPPIOError(2, 'Connect failed')
467         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
468         self._register_functions(do_async=do_async)
469
470         # Initialise control ping
471         crc = self.messages['control_ping'].crc
472         self.control_ping_index = self.transport.get_msg_index(
473             ('control_ping' + '_' + crc[2:]).encode('utf-8'))
474         self.control_ping_msgdef = self.messages['control_ping']
475         if self.async_thread:
476             self.event_thread = threading.Thread(
477                 target=self.thread_msg_handler)
478             self.event_thread.daemon = True
479             self.event_thread.start()
480         else:
481             self.event_thread = None
482         return rv
483
484     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
485         """Attach to VPP.
486
487         name - the name of the client.
488         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
489         do_async - if true, messages are sent without waiting for a reply
490         rx_qlen - the length of the VPP message receive queue between
491         client and server.
492         """
493         msg_handler = self.transport.get_callback(do_async)
494         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
495                                      do_async)
496
497     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
498         """Attach to VPP in synchronous mode. Application must poll for events.
499
500         name - the name of the client.
501         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
502         rx_qlen - the length of the VPP message receive queue between
503         client and server.
504         """
505
506         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
507                                      do_async=False)
508
509     def disconnect(self):
510         """Detach from VPP."""
511         rv = self.transport.disconnect()
512         if self.event_thread is not None:
513             self.message_queue.put("terminate event thread")
514         return rv
515
516     def msg_handler_sync(self, msg):
517         """Process an incoming message from VPP in sync mode.
518
519         The message may be a reply or it may be an async notification.
520         """
521         r = self.decode_incoming_msg(msg)
522         if r is None:
523             return
524
525         # If we have a context, then use the context to find any
526         # request waiting for a reply
527         context = 0
528         if hasattr(r, 'context') and r.context > 0:
529             context = r.context
530
531         if context == 0:
532             # No context -> async notification that we feed to the callback
533             self.message_queue.put_nowait(r)
534         else:
535             raise VPPIOError(2, 'RPC reply message received in event handler')
536
537     def has_context(self, msg):
538         if len(msg) < 10:
539             return False
540
541         header = VPPType('header_with_context', [['u16', 'msgid'],
542                                                  ['u32', 'client_index'],
543                                                  ['u32', 'context']])
544
545         (i, ci, context), size = header.unpack(msg, 0)
546         if self.id_names[i] == 'rx_thread_exit':
547             return
548
549         #
550         # Decode message and returns a tuple.
551         #
552         msgobj = self.id_msgdef[i]
553         if 'context' in msgobj.field_by_name and context >= 0:
554             return True
555         return False
556
557     def decode_incoming_msg(self, msg, no_type_conversion=False):
558         if not msg:
559             self.logger.warning('vpp_api.read failed')
560             return
561
562         (i, ci), size = self.header.unpack(msg, 0)
563         if self.id_names[i] == 'rx_thread_exit':
564             return
565
566         #
567         # Decode message and returns a tuple.
568         #
569         msgobj = self.id_msgdef[i]
570         if not msgobj:
571             raise VPPIOError(2, 'Reply message undefined')
572
573         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
574         return r
575
576     def msg_handler_async(self, msg):
577         """Process a message from VPP in async mode.
578
579         In async mode, all messages are returned to the callback.
580         """
581         r = self.decode_incoming_msg(msg)
582         if r is None:
583             return
584
585         msgname = type(r).__name__
586
587         if self.event_callback:
588             self.event_callback(msgname, r)
589
590     def _control_ping(self, context):
591         """Send a ping command."""
592         self._call_vpp_async(self.control_ping_index,
593                              self.control_ping_msgdef,
594                              context=context)
595
596     def validate_args(self, msg, kwargs):
597         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
598         if d:
599             raise VPPValueError('Invalid argument {} to {}'
600                                 .format(list(d), msg.name))
601
602     def _call_vpp(self, i, msgdef, multipart, **kwargs):
603         """Given a message, send the message and await a reply.
604
605         msgdef - the message packing definition
606         i - the message type index
607         multipart - True if the message returns multiple
608         messages in return.
609         context - context number - chosen at random if not
610         supplied.
611         The remainder of the kwargs are the arguments to the API call.
612
613         The return value is the message or message array containing
614         the response.  It will raise an IOError exception if there was
615         no response within the timeout window.
616         """
617
618         if 'context' not in kwargs:
619             context = self.get_context()
620             kwargs['context'] = context
621         else:
622             context = kwargs['context']
623         kwargs['_vl_msg_id'] = i
624
625         no_type_conversion = kwargs.pop('_no_type_conversion', False)
626
627         try:
628             if self.transport.socket_index:
629                 kwargs['client_index'] = self.transport.socket_index
630         except AttributeError:
631             pass
632         self.validate_args(msgdef, kwargs)
633
634         logging.debug(call_logger(msgdef, kwargs))
635
636         b = msgdef.pack(kwargs)
637         self.transport.suspend()
638
639         self.transport.write(b)
640
641         if multipart:
642             # Send a ping after the request - we use its response
643             # to detect that we have seen all results.
644             self._control_ping(context)
645
646         # Block until we get a reply.
647         rl = []
648         while (True):
649             msg = self.transport.read()
650             if not msg:
651                 raise VPPIOError(2, 'VPP API client: read failed')
652             r = self.decode_incoming_msg(msg, no_type_conversion)
653             msgname = type(r).__name__
654             if context not in r or r.context == 0 or context != r.context:
655                 # Message being queued
656                 self.message_queue.put_nowait(r)
657                 continue
658
659             if not multipart:
660                 rl = r
661                 break
662             if msgname == 'control_ping_reply':
663                 break
664
665             rl.append(r)
666
667         self.transport.resume()
668
669         logger.debug(return_logger(rl))
670         return rl
671
672     def _call_vpp_async(self, i, msg, **kwargs):
673         """Given a message, send the message and await a reply.
674
675         msgdef - the message packing definition
676         i - the message type index
677         context - context number - chosen at random if not
678         supplied.
679         The remainder of the kwargs are the arguments to the API call.
680         """
681         if 'context' not in kwargs:
682             context = self.get_context()
683             kwargs['context'] = context
684         else:
685             context = kwargs['context']
686         try:
687             if self.transport.socket_index:
688                 kwargs['client_index'] = self.transport.socket_index
689         except AttributeError:
690             kwargs['client_index'] = 0
691         kwargs['_vl_msg_id'] = i
692         b = msg.pack(kwargs)
693
694         self.transport.write(b)
695
696     def register_event_callback(self, callback):
697         """Register a callback for async messages.
698
699         This will be called for async notifications in sync mode,
700         and all messages in async mode.  In sync mode, replies to
701         requests will not come here.
702
703         callback is a fn(msg_type_name, msg_type) that will be
704         called when a message comes in.  While this function is
705         executing, note that (a) you are in a background thread and
706         may wish to use threading.Lock to protect your datastructures,
707         and (b) message processing from VPP will stop (so if you take
708         a long while about it you may provoke reply timeouts or cause
709         VPP to fill the RX buffer).  Passing None will disable the
710         callback.
711         """
712         self.event_callback = callback
713
714     def thread_msg_handler(self):
715         """Python thread calling the user registered message handler.
716
717         This is to emulate the old style event callback scheme. Modern
718         clients should provide their own thread to poll the event
719         queue.
720         """
721         while True:
722             r = self.message_queue.get()
723             if r == "terminate event thread":
724                 break
725             msgname = type(r).__name__
726             if self.event_callback:
727                 self.event_callback(msgname, r)
728
729     def __repr__(self):
730         return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
731                "logger=%s, read_timeout=%s, use_socket=%s, " \
732                "server_address='%s'>" % (
733                    self._apifiles, self.testmode, self.async_thread,
734                    self.logger, self.read_timeout, self.use_socket,
735                    self.server_address)
736
737
738 # Provide the old name for backward compatibility.
739 VPP = VPPApiClient
740
741 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4