misc: vpp-api - add __repr__ to VPPApiClient.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import sys
21 import multiprocessing as mp
22 import os
23 import logging
24 import collections
25 import struct
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
33 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
34 from . macaddress import MACAddress, mac_pton, mac_ntop
35
36 logger = logging.getLogger(__name__)
37
38 if sys.version[0] == '2':
39     import Queue as queue
40 else:
41     import queue as queue
42
43
44 def metaclass(metaclass):
45     @functools.wraps(metaclass)
46     def wrapper(cls):
47         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
48
49     return wrapper
50
51
52 class VppEnumType(type):
53     def __getattr__(cls, name):
54         t = vpp_get_type(name)
55         return t.enum
56
57
58 @metaclass(VppEnumType)
59 class VppEnum(object):
60     pass
61
62
63 def vpp_atexit(vpp_weakref):
64     """Clean up VPP connection on shutdown."""
65     vpp_instance = vpp_weakref()
66     if vpp_instance and vpp_instance.transport.connected:
67         vpp_instance.logger.debug('Cleaning up VPP on exit')
68         vpp_instance.disconnect()
69
70
71 if sys.version[0] == '2':
72     def vpp_iterator(d):
73         return d.iteritems()
74 else:
75     def vpp_iterator(d):
76         return d.items()
77
78
79 def call_logger(msgdef, kwargs):
80     s = 'Calling {}('.format(msgdef.name)
81     for k, v in kwargs.items():
82         s += '{}:{} '.format(k, v)
83     s += ')'
84     return s
85
86
87 def return_logger(r):
88     s = 'Return from {}'.format(r)
89     return s
90
91
92 class VppApiDynamicMethodHolder(object):
93     pass
94
95
96 class FuncWrapper(object):
97     def __init__(self, func):
98         self._func = func
99         self.__name__ = func.__name__
100         self.__doc__ = func.__doc__
101
102     def __call__(self, **kwargs):
103         return self._func(**kwargs)
104
105
106 class VPPApiError(Exception):
107     pass
108
109
110 class VPPNotImplementedError(NotImplementedError):
111     pass
112
113
114 class VPPIOError(IOError):
115     pass
116
117
118 class VPPRuntimeError(RuntimeError):
119     pass
120
121
122 class VPPValueError(ValueError):
123     pass
124
125
126 class VPPApiClient(object):
127     """VPP interface.
128
129     This class provides the APIs to VPP.  The APIs are loaded
130     from provided .api.json files and makes functions accordingly.
131     These functions are documented in the VPP .api files, as they
132     are dynamically created.
133
134     Additionally, VPP can send callback messages; this class
135     provides a means to register a callback function to receive
136     these messages in a background thread.
137     """
138     apidir = None
139     VPPApiError = VPPApiError
140     VPPRuntimeError = VPPRuntimeError
141     VPPValueError = VPPValueError
142     VPPNotImplementedError = VPPNotImplementedError
143     VPPIOError = VPPIOError
144
145     def process_json_file(self, apidef_file):
146         api = json.load(apidef_file)
147         types = {}
148         for t in api['enums']:
149             t[0] = 'vl_api_' + t[0] + '_t'
150             types[t[0]] = {'type': 'enum', 'data': t}
151         for t in api['unions']:
152             t[0] = 'vl_api_' + t[0] + '_t'
153             types[t[0]] = {'type': 'union', 'data': t}
154         for t in api['types']:
155             t[0] = 'vl_api_' + t[0] + '_t'
156             types[t[0]] = {'type': 'type', 'data': t}
157         for t, v in api['aliases'].items():
158             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
159         self.services.update(api['services'])
160
161         i = 0
162         while True:
163             unresolved = {}
164             for k, v in types.items():
165                 t = v['data']
166                 if not vpp_get_type(k):
167                     if v['type'] == 'enum':
168                         try:
169                             VPPEnumType(t[0], t[1:])
170                         except ValueError:
171                             unresolved[k] = v
172                     elif v['type'] == 'union':
173                         try:
174                             VPPUnionType(t[0], t[1:])
175                         except ValueError:
176                             unresolved[k] = v
177                     elif v['type'] == 'type':
178                         try:
179                             VPPType(t[0], t[1:])
180                         except ValueError:
181                             unresolved[k] = v
182                     elif v['type'] == 'alias':
183                         try:
184                             VPPTypeAlias(k, t)
185                         except ValueError:
186                             unresolved[k] = v
187             if len(unresolved) == 0:
188                 break
189             if i > 3:
190                 raise VPPValueError('Unresolved type definitions {}'
191                                     .format(unresolved))
192             types = unresolved
193             i += 1
194
195         for m in api['messages']:
196             try:
197                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
198             except VPPNotImplementedError:
199                 self.logger.error('Not implemented error for {}'.format(m[0]))
200
201     def __init__(self, apifiles=None, testmode=False, async_thread=True,
202                  logger=None, loglevel=None,
203                  read_timeout=5, use_socket=False,
204                  server_address='/run/vpp-api.sock'):
205         """Create a VPP API object.
206
207         apifiles is a list of files containing API
208         descriptions that will be loaded - methods will be
209         dynamically created reflecting these APIs.  If not
210         provided this will load the API files from VPP's
211         default install location.
212
213         logger, if supplied, is the logging logger object to log to.
214         loglevel, if supplied, is the log level this logger is set
215         to report at (from the loglevels in the logging module).
216         """
217         if logger is None:
218             logger = logging.getLogger(__name__)
219             if loglevel is not None:
220                 logger.setLevel(loglevel)
221         self.logger = logger
222
223         self.messages = {}
224         self.services = {}
225         self.id_names = []
226         self.id_msgdef = []
227         self.header = VPPType('header', [['u16', 'msgid'],
228                                          ['u32', 'client_index']])
229         self.apifiles = []
230         self.event_callback = None
231         self.message_queue = queue.Queue()
232         self.read_timeout = read_timeout
233         self.async_thread = async_thread
234         self.event_thread = None
235         self.testmode = testmode
236         self.use_socket = use_socket
237         self.server_address = server_address
238         self._apifiles = apifiles
239
240         if use_socket:
241             from . vpp_transport_socket import VppTransport
242         else:
243             from . vpp_transport_shmem import VppTransport
244
245         if not apifiles:
246             # Pick up API definitions from default directory
247             try:
248                 apifiles = self.find_api_files()
249             except RuntimeError:
250                 # In test mode we don't care that we can't find the API files
251                 if testmode:
252                     apifiles = []
253                 else:
254                     raise VPPRuntimeError
255
256         for file in apifiles:
257             with open(file) as apidef_file:
258                 self.process_json_file(apidef_file)
259
260         self.apifiles = apifiles
261
262         # Basic sanity check
263         if len(self.messages) == 0 and not testmode:
264             raise VPPValueError(1, 'Missing JSON message definitions')
265
266         self.transport = VppTransport(self, read_timeout=read_timeout,
267                                       server_address=server_address)
268         # Make sure we allow VPP to clean up the message rings.
269         atexit.register(vpp_atexit, weakref.ref(self))
270
271     class ContextId(object):
272         """Multiprocessing-safe provider of unique context IDs."""
273         def __init__(self):
274             self.context = mp.Value(ctypes.c_uint, 0)
275             self.lock = mp.Lock()
276
277         def __call__(self):
278             """Get a new unique (or, at least, not recently used) context."""
279             with self.lock:
280                 self.context.value += 1
281                 return self.context.value
282     get_context = ContextId()
283
284     def get_type(self, name):
285         return vpp_get_type(name)
286
287     @classmethod
288     def find_api_dir(cls):
289         """Attempt to find the best directory in which API definition
290         files may reside. If the value VPP_API_DIR exists in the environment
291         then it is first on the search list. If we're inside a recognized
292         location in a VPP source tree (src/scripts and src/vpp-api/python)
293         then entries from there to the likely locations in build-root are
294         added. Finally the location used by system packages is added.
295
296         :returns: A single directory name, or None if no such directory
297             could be found.
298         """
299         dirs = [cls.apidir] if cls.apidir else []
300
301         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
302         # in which case, plot a course to likely places in the src tree
303         import __main__ as main
304         if hasattr(main, '__file__'):
305             # get the path of the calling script
306             localdir = os.path.dirname(os.path.realpath(main.__file__))
307         else:
308             # use cwd if there is no calling script
309             localdir = os.getcwd()
310         localdir_s = localdir.split(os.path.sep)
311
312         def dmatch(dir):
313             """Match dir against right-hand components of the script dir"""
314             d = dir.split('/')  # param 'dir' assumes a / separator
315             length = len(d)
316             return len(localdir_s) > length and localdir_s[-length:] == d
317
318         def sdir(srcdir, variant):
319             """Build a path from srcdir to the staged API files of
320             'variant'  (typically '' or '_debug')"""
321             # Since 'core' and 'plugin' files are staged
322             # in separate directories, we target the parent dir.
323             return os.path.sep.join((
324                 srcdir,
325                 'build-root',
326                 'install-vpp%s-native' % variant,
327                 'vpp',
328                 'share',
329                 'vpp',
330                 'api',
331             ))
332
333         srcdir = None
334         if dmatch('src/scripts'):
335             srcdir = os.path.sep.join(localdir_s[:-2])
336         elif dmatch('src/vpp-api/python'):
337             srcdir = os.path.sep.join(localdir_s[:-3])
338         elif dmatch('test'):
339             # we're apparently running tests
340             srcdir = os.path.sep.join(localdir_s[:-1])
341
342         if srcdir:
343             # we're in the source tree, try both the debug and release
344             # variants.
345             dirs.append(sdir(srcdir, '_debug'))
346             dirs.append(sdir(srcdir, ''))
347
348         # Test for staged copies of the scripts
349         # For these, since we explicitly know if we're running a debug versus
350         # release variant, target only the relevant directory
351         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
352             srcdir = os.path.sep.join(localdir_s[:-4])
353             dirs.append(sdir(srcdir, '_debug'))
354         if dmatch('build-root/install-vpp-native/vpp/bin'):
355             srcdir = os.path.sep.join(localdir_s[:-4])
356             dirs.append(sdir(srcdir, ''))
357
358         # finally, try the location system packages typically install into
359         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
360
361         # check the directories for existence; first one wins
362         for dir in dirs:
363             if os.path.isdir(dir):
364                 return dir
365
366         return None
367
368     @classmethod
369     def find_api_files(cls, api_dir=None, patterns='*'):
370         """Find API definition files from the given directory tree with the
371         given pattern. If no directory is given then find_api_dir() is used
372         to locate one. If no pattern is given then all definition files found
373         in the directory tree are used.
374
375         :param api_dir: A directory tree in which to locate API definition
376             files; subdirectories are descended into.
377             If this is None then find_api_dir() is called to discover it.
378         :param patterns: A list of patterns to use in each visited directory
379             when looking for files.
380             This can be a list/tuple object or a comma-separated string of
381             patterns. Each value in the list will have leading/trialing
382             whitespace stripped.
383             The pattern specifies the first part of the filename, '.api.json'
384             is appended.
385             The results are de-duplicated, thus overlapping patterns are fine.
386             If this is None it defaults to '*' meaning "all API files".
387         :returns: A list of file paths for the API files found.
388         """
389         if api_dir is None:
390             api_dir = cls.find_api_dir()
391             if api_dir is None:
392                 raise VPPApiError("api_dir cannot be located")
393
394         if isinstance(patterns, list) or isinstance(patterns, tuple):
395             patterns = [p.strip() + '.api.json' for p in patterns]
396         else:
397             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
398
399         api_files = []
400         for root, dirnames, files in os.walk(api_dir):
401             # iterate all given patterns and de-dup the result
402             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
403             for filename in files:
404                 api_files.append(os.path.join(root, filename))
405
406         return api_files
407
408     @property
409     def api(self):
410         if not hasattr(self, "_api"):
411             raise VPPApiError("Not connected, api definitions not available")
412         return self._api
413
414     def make_function(self, msg, i, multipart, do_async):
415         if (do_async):
416             def f(**kwargs):
417                 return self._call_vpp_async(i, msg, **kwargs)
418         else:
419             def f(**kwargs):
420                 return self._call_vpp(i, msg, multipart, **kwargs)
421
422         f.__name__ = str(msg.name)
423         f.__doc__ = ", ".join(["%s %s" %
424                                (msg.fieldtypes[j], k)
425                                for j, k in enumerate(msg.fields)])
426         f.msg = msg
427
428         return f
429
430     def _register_functions(self, do_async=False):
431         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
432         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
433         self._api = VppApiDynamicMethodHolder()
434         for name, msg in vpp_iterator(self.messages):
435             n = name + '_' + msg.crc[2:]
436             i = self.transport.get_msg_index(n.encode('utf-8'))
437             if i > 0:
438                 self.id_msgdef[i] = msg
439                 self.id_names[i] = name
440
441                 # Create function for client side messages.
442                 if name in self.services:
443                     if 'stream' in self.services[name] and \
444                        self.services[name]['stream']:
445                         multipart = True
446                     else:
447                         multipart = False
448                     f = self.make_function(msg, i, multipart, do_async)
449                     setattr(self._api, name, FuncWrapper(f))
450             else:
451                 self.logger.debug(
452                     'No such message type or failed CRC checksum: %s', n)
453
454     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
455                          do_async):
456         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
457
458         rv = self.transport.connect(name.encode('utf-8'), pfx,
459                                     msg_handler, rx_qlen)
460         if rv != 0:
461             raise VPPIOError(2, 'Connect failed')
462         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
463         self._register_functions(do_async=do_async)
464
465         # Initialise control ping
466         crc = self.messages['control_ping'].crc
467         self.control_ping_index = self.transport.get_msg_index(
468             ('control_ping' + '_' + crc[2:]).encode('utf-8'))
469         self.control_ping_msgdef = self.messages['control_ping']
470         if self.async_thread:
471             self.event_thread = threading.Thread(
472                 target=self.thread_msg_handler)
473             self.event_thread.daemon = True
474             self.event_thread.start()
475         else:
476             self.event_thread = None
477         return rv
478
479     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
480         """Attach to VPP.
481
482         name - the name of the client.
483         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
484         do_async - if true, messages are sent without waiting for a reply
485         rx_qlen - the length of the VPP message receive queue between
486         client and server.
487         """
488         msg_handler = self.transport.get_callback(do_async)
489         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
490                                      do_async)
491
492     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
493         """Attach to VPP in synchronous mode. Application must poll for events.
494
495         name - the name of the client.
496         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
497         rx_qlen - the length of the VPP message receive queue between
498         client and server.
499         """
500
501         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
502                                      do_async=False)
503
504     def disconnect(self):
505         """Detach from VPP."""
506         rv = self.transport.disconnect()
507         if self.event_thread is not None:
508             self.message_queue.put("terminate event thread")
509         return rv
510
511     def msg_handler_sync(self, msg):
512         """Process an incoming message from VPP in sync mode.
513
514         The message may be a reply or it may be an async notification.
515         """
516         r = self.decode_incoming_msg(msg)
517         if r is None:
518             return
519
520         # If we have a context, then use the context to find any
521         # request waiting for a reply
522         context = 0
523         if hasattr(r, 'context') and r.context > 0:
524             context = r.context
525
526         if context == 0:
527             # No context -> async notification that we feed to the callback
528             self.message_queue.put_nowait(r)
529         else:
530             raise VPPIOError(2, 'RPC reply message received in event handler')
531
532     def has_context(self, msg):
533         if len(msg) < 10:
534             return False
535
536         header = VPPType('header_with_context', [['u16', 'msgid'],
537                                                  ['u32', 'client_index'],
538                                                  ['u32', 'context']])
539
540         (i, ci, context), size = header.unpack(msg, 0)
541         if self.id_names[i] == 'rx_thread_exit':
542             return
543
544         #
545         # Decode message and returns a tuple.
546         #
547         msgobj = self.id_msgdef[i]
548         if 'context' in msgobj.field_by_name and context >= 0:
549             return True
550         return False
551
552     def decode_incoming_msg(self, msg, no_type_conversion=False):
553         if not msg:
554             self.logger.warning('vpp_api.read failed')
555             return
556
557         (i, ci), size = self.header.unpack(msg, 0)
558         if self.id_names[i] == 'rx_thread_exit':
559             return
560
561         #
562         # Decode message and returns a tuple.
563         #
564         msgobj = self.id_msgdef[i]
565         if not msgobj:
566             raise VPPIOError(2, 'Reply message undefined')
567
568         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
569         return r
570
571     def msg_handler_async(self, msg):
572         """Process a message from VPP in async mode.
573
574         In async mode, all messages are returned to the callback.
575         """
576         r = self.decode_incoming_msg(msg)
577         if r is None:
578             return
579
580         msgname = type(r).__name__
581
582         if self.event_callback:
583             self.event_callback(msgname, r)
584
585     def _control_ping(self, context):
586         """Send a ping command."""
587         self._call_vpp_async(self.control_ping_index,
588                              self.control_ping_msgdef,
589                              context=context)
590
591     def validate_args(self, msg, kwargs):
592         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
593         if d:
594             raise VPPValueError('Invalid argument {} to {}'
595                                 .format(list(d), msg.name))
596
597     def _call_vpp(self, i, msgdef, multipart, **kwargs):
598         """Given a message, send the message and await a reply.
599
600         msgdef - the message packing definition
601         i - the message type index
602         multipart - True if the message returns multiple
603         messages in return.
604         context - context number - chosen at random if not
605         supplied.
606         The remainder of the kwargs are the arguments to the API call.
607
608         The return value is the message or message array containing
609         the response.  It will raise an IOError exception if there was
610         no response within the timeout window.
611         """
612
613         if 'context' not in kwargs:
614             context = self.get_context()
615             kwargs['context'] = context
616         else:
617             context = kwargs['context']
618         kwargs['_vl_msg_id'] = i
619
620         no_type_conversion = kwargs.pop('_no_type_conversion', False)
621
622         try:
623             if self.transport.socket_index:
624                 kwargs['client_index'] = self.transport.socket_index
625         except AttributeError:
626             pass
627         self.validate_args(msgdef, kwargs)
628
629         logging.debug(call_logger(msgdef, kwargs))
630
631         b = msgdef.pack(kwargs)
632         self.transport.suspend()
633
634         self.transport.write(b)
635
636         if multipart:
637             # Send a ping after the request - we use its response
638             # to detect that we have seen all results.
639             self._control_ping(context)
640
641         # Block until we get a reply.
642         rl = []
643         while (True):
644             msg = self.transport.read()
645             if not msg:
646                 raise VPPIOError(2, 'VPP API client: read failed')
647             r = self.decode_incoming_msg(msg, no_type_conversion)
648             msgname = type(r).__name__
649             if context not in r or r.context == 0 or context != r.context:
650                 # Message being queued
651                 self.message_queue.put_nowait(r)
652                 continue
653
654             if not multipart:
655                 rl = r
656                 break
657             if msgname == 'control_ping_reply':
658                 break
659
660             rl.append(r)
661
662         self.transport.resume()
663
664         logger.debug(return_logger(rl))
665         return rl
666
667     def _call_vpp_async(self, i, msg, **kwargs):
668         """Given a message, send the message and await a reply.
669
670         msgdef - the message packing definition
671         i - the message type index
672         context - context number - chosen at random if not
673         supplied.
674         The remainder of the kwargs are the arguments to the API call.
675         """
676         if 'context' not in kwargs:
677             context = self.get_context()
678             kwargs['context'] = context
679         else:
680             context = kwargs['context']
681         try:
682             if self.transport.socket_index:
683                 kwargs['client_index'] = self.transport.socket_index
684         except AttributeError:
685             kwargs['client_index'] = 0
686         kwargs['_vl_msg_id'] = i
687         b = msg.pack(kwargs)
688
689         self.transport.write(b)
690
691     def register_event_callback(self, callback):
692         """Register a callback for async messages.
693
694         This will be called for async notifications in sync mode,
695         and all messages in async mode.  In sync mode, replies to
696         requests will not come here.
697
698         callback is a fn(msg_type_name, msg_type) that will be
699         called when a message comes in.  While this function is
700         executing, note that (a) you are in a background thread and
701         may wish to use threading.Lock to protect your datastructures,
702         and (b) message processing from VPP will stop (so if you take
703         a long while about it you may provoke reply timeouts or cause
704         VPP to fill the RX buffer).  Passing None will disable the
705         callback.
706         """
707         self.event_callback = callback
708
709     def thread_msg_handler(self):
710         """Python thread calling the user registered message handler.
711
712         This is to emulate the old style event callback scheme. Modern
713         clients should provide their own thread to poll the event
714         queue.
715         """
716         while True:
717             r = self.message_queue.get()
718             if r == "terminate event thread":
719                 break
720             msgname = type(r).__name__
721             if self.event_callback:
722                 self.event_callback(msgname, r)
723
724     def __repr__(self):
725         return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
726                "logger=%s, read_timeout=%s, use_socket=%s, " \
727                "server_address='%s'>" % (
728                    self._apifiles, self.testmode, self.async_thread,
729                    self.logger, self.read_timeout, self.use_socket,
730                    self.server_address)
731
732
733 # Provide the old name for backward compatibility.
734 VPP = VPPApiClient
735
736 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4