api: api socket respect unix runtime directory
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import sys
21 import multiprocessing as mp
22 import os
23 import logging
24 import functools
25 import json
26 import threading
27 import fnmatch
28 import weakref
29 import atexit
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32
33 if sys.version[0] == '2':
34     import Queue as queue
35 else:
36     import queue as queue
37
38 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
39            'VppEnum', 'VppEnumType',
40            'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
41            'VPPApiClient', )
42
43
44 def metaclass(metaclass):
45     @functools.wraps(metaclass)
46     def wrapper(cls):
47         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
48
49     return wrapper
50
51
52 class VppEnumType(type):
53     def __getattr__(cls, name):
54         t = vpp_get_type(name)
55         return t.enum
56
57
58 @metaclass(VppEnumType)
59 class VppEnum(object):
60     pass
61
62
63 def vpp_atexit(vpp_weakref):
64     """Clean up VPP connection on shutdown."""
65     vpp_instance = vpp_weakref()
66     if vpp_instance and vpp_instance.transport.connected:
67         vpp_instance.logger.debug('Cleaning up VPP on exit')
68         vpp_instance.disconnect()
69
70
71 if sys.version[0] == '2':
72     def vpp_iterator(d):
73         return d.iteritems()
74 else:
75     def vpp_iterator(d):
76         return d.items()
77
78
79 class VppApiDynamicMethodHolder(object):
80     pass
81
82
83 class FuncWrapper(object):
84     def __init__(self, func):
85         self._func = func
86         self.__name__ = func.__name__
87         self.__doc__ = func.__doc__
88
89     def __call__(self, **kwargs):
90         return self._func(**kwargs)
91
92     def __repr__(self):
93         return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
94
95
96 class VPPApiError(Exception):
97     pass
98
99
100 class VPPNotImplementedError(NotImplementedError):
101     pass
102
103
104 class VPPIOError(IOError):
105     pass
106
107
108 class VPPRuntimeError(RuntimeError):
109     pass
110
111
112 class VPPValueError(ValueError):
113     pass
114
115
116 class VPPApiClient(object):
117     """VPP interface.
118
119     This class provides the APIs to VPP.  The APIs are loaded
120     from provided .api.json files and makes functions accordingly.
121     These functions are documented in the VPP .api files, as they
122     are dynamically created.
123
124     Additionally, VPP can send callback messages; this class
125     provides a means to register a callback function to receive
126     these messages in a background thread.
127     """
128     apidir = None
129     VPPApiError = VPPApiError
130     VPPRuntimeError = VPPRuntimeError
131     VPPValueError = VPPValueError
132     VPPNotImplementedError = VPPNotImplementedError
133     VPPIOError = VPPIOError
134
135     def process_json_file(self, apidef_file):
136         api = json.load(apidef_file)
137         types = {}
138         for t in api['enums']:
139             t[0] = 'vl_api_' + t[0] + '_t'
140             types[t[0]] = {'type': 'enum', 'data': t}
141         for t in api['unions']:
142             t[0] = 'vl_api_' + t[0] + '_t'
143             types[t[0]] = {'type': 'union', 'data': t}
144         for t in api['types']:
145             t[0] = 'vl_api_' + t[0] + '_t'
146             types[t[0]] = {'type': 'type', 'data': t}
147         for t, v in api['aliases'].items():
148             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
149         self.services.update(api['services'])
150
151         i = 0
152         while True:
153             unresolved = {}
154             for k, v in types.items():
155                 t = v['data']
156                 if not vpp_get_type(k):
157                     if v['type'] == 'enum':
158                         try:
159                             VPPEnumType(t[0], t[1:])
160                         except ValueError:
161                             unresolved[k] = v
162                     elif v['type'] == 'union':
163                         try:
164                             VPPUnionType(t[0], t[1:])
165                         except ValueError:
166                             unresolved[k] = v
167                     elif v['type'] == 'type':
168                         try:
169                             VPPType(t[0], t[1:])
170                         except ValueError:
171                             unresolved[k] = v
172                     elif v['type'] == 'alias':
173                         try:
174                             VPPTypeAlias(k, t)
175                         except ValueError:
176                             unresolved[k] = v
177             if len(unresolved) == 0:
178                 break
179             if i > 3:
180                 raise VPPValueError('Unresolved type definitions {}'
181                                     .format(unresolved))
182             types = unresolved
183             i += 1
184
185         for m in api['messages']:
186             try:
187                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
188             except VPPNotImplementedError:
189                 self.logger.error('Not implemented error for {}'.format(m[0]))
190
191     def __init__(self, apifiles=None, testmode=False, async_thread=True,
192                  logger=None, loglevel=None,
193                  read_timeout=5, use_socket=False,
194                  server_address='/run/vpp/api.sock'):
195         """Create a VPP API object.
196
197         apifiles is a list of files containing API
198         descriptions that will be loaded - methods will be
199         dynamically created reflecting these APIs.  If not
200         provided this will load the API files from VPP's
201         default install location.
202
203         logger, if supplied, is the logging logger object to log to.
204         loglevel, if supplied, is the log level this logger is set
205         to report at (from the loglevels in the logging module).
206         """
207         if logger is None:
208             logger = logging.getLogger(
209                 "{}.{}".format(__name__, self.__class__.__name__))
210             if loglevel is not None:
211                 logger.setLevel(loglevel)
212         self.logger = logger
213
214         self.messages = {}
215         self.services = {}
216         self.id_names = []
217         self.id_msgdef = []
218         self.header = VPPType('header', [['u16', 'msgid'],
219                                          ['u32', 'client_index']])
220         self.apifiles = []
221         self.event_callback = None
222         self.message_queue = queue.Queue()
223         self.read_timeout = read_timeout
224         self.async_thread = async_thread
225         self.event_thread = None
226         self.testmode = testmode
227         self.use_socket = use_socket
228         self.server_address = server_address
229         self._apifiles = apifiles
230
231         if use_socket:
232             from . vpp_transport_socket import VppTransport
233         else:
234             from . vpp_transport_shmem import VppTransport
235
236         if not apifiles:
237             # Pick up API definitions from default directory
238             try:
239                 apifiles = self.find_api_files()
240             except RuntimeError:
241                 # In test mode we don't care that we can't find the API files
242                 if testmode:
243                     apifiles = []
244                 else:
245                     raise VPPRuntimeError
246
247         for file in apifiles:
248             with open(file) as apidef_file:
249                 self.process_json_file(apidef_file)
250
251         self.apifiles = apifiles
252
253         # Basic sanity check
254         if len(self.messages) == 0 and not testmode:
255             raise VPPValueError(1, 'Missing JSON message definitions')
256
257         self.transport = VppTransport(self, read_timeout=read_timeout,
258                                       server_address=server_address)
259         # Make sure we allow VPP to clean up the message rings.
260         atexit.register(vpp_atexit, weakref.ref(self))
261
262     class ContextId(object):
263         """Multiprocessing-safe provider of unique context IDs."""
264         def __init__(self):
265             self.context = mp.Value(ctypes.c_uint, 0)
266             self.lock = mp.Lock()
267
268         def __call__(self):
269             """Get a new unique (or, at least, not recently used) context."""
270             with self.lock:
271                 self.context.value += 1
272                 return self.context.value
273     get_context = ContextId()
274
275     def get_type(self, name):
276         return vpp_get_type(name)
277
278     @classmethod
279     def find_api_dir(cls):
280         """Attempt to find the best directory in which API definition
281         files may reside. If the value VPP_API_DIR exists in the environment
282         then it is first on the search list. If we're inside a recognized
283         location in a VPP source tree (src/scripts and src/vpp-api/python)
284         then entries from there to the likely locations in build-root are
285         added. Finally the location used by system packages is added.
286
287         :returns: A single directory name, or None if no such directory
288             could be found.
289         """
290         dirs = [cls.apidir] if cls.apidir else []
291
292         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
293         # in which case, plot a course to likely places in the src tree
294         import __main__ as main
295         if hasattr(main, '__file__'):
296             # get the path of the calling script
297             localdir = os.path.dirname(os.path.realpath(main.__file__))
298         else:
299             # use cwd if there is no calling script
300             localdir = os.getcwd()
301         localdir_s = localdir.split(os.path.sep)
302
303         def dmatch(dir):
304             """Match dir against right-hand components of the script dir"""
305             d = dir.split('/')  # param 'dir' assumes a / separator
306             length = len(d)
307             return len(localdir_s) > length and localdir_s[-length:] == d
308
309         def sdir(srcdir, variant):
310             """Build a path from srcdir to the staged API files of
311             'variant'  (typically '' or '_debug')"""
312             # Since 'core' and 'plugin' files are staged
313             # in separate directories, we target the parent dir.
314             return os.path.sep.join((
315                 srcdir,
316                 'build-root',
317                 'install-vpp%s-native' % variant,
318                 'vpp',
319                 'share',
320                 'vpp',
321                 'api',
322             ))
323
324         srcdir = None
325         if dmatch('src/scripts'):
326             srcdir = os.path.sep.join(localdir_s[:-2])
327         elif dmatch('src/vpp-api/python'):
328             srcdir = os.path.sep.join(localdir_s[:-3])
329         elif dmatch('test'):
330             # we're apparently running tests
331             srcdir = os.path.sep.join(localdir_s[:-1])
332
333         if srcdir:
334             # we're in the source tree, try both the debug and release
335             # variants.
336             dirs.append(sdir(srcdir, '_debug'))
337             dirs.append(sdir(srcdir, ''))
338
339         # Test for staged copies of the scripts
340         # For these, since we explicitly know if we're running a debug versus
341         # release variant, target only the relevant directory
342         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
343             srcdir = os.path.sep.join(localdir_s[:-4])
344             dirs.append(sdir(srcdir, '_debug'))
345         if dmatch('build-root/install-vpp-native/vpp/bin'):
346             srcdir = os.path.sep.join(localdir_s[:-4])
347             dirs.append(sdir(srcdir, ''))
348
349         # finally, try the location system packages typically install into
350         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
351
352         # check the directories for existence; first one wins
353         for dir in dirs:
354             if os.path.isdir(dir):
355                 return dir
356
357         return None
358
359     @classmethod
360     def find_api_files(cls, api_dir=None, patterns='*'):
361         """Find API definition files from the given directory tree with the
362         given pattern. If no directory is given then find_api_dir() is used
363         to locate one. If no pattern is given then all definition files found
364         in the directory tree are used.
365
366         :param api_dir: A directory tree in which to locate API definition
367             files; subdirectories are descended into.
368             If this is None then find_api_dir() is called to discover it.
369         :param patterns: A list of patterns to use in each visited directory
370             when looking for files.
371             This can be a list/tuple object or a comma-separated string of
372             patterns. Each value in the list will have leading/trialing
373             whitespace stripped.
374             The pattern specifies the first part of the filename, '.api.json'
375             is appended.
376             The results are de-duplicated, thus overlapping patterns are fine.
377             If this is None it defaults to '*' meaning "all API files".
378         :returns: A list of file paths for the API files found.
379         """
380         if api_dir is None:
381             api_dir = cls.find_api_dir()
382             if api_dir is None:
383                 raise VPPApiError("api_dir cannot be located")
384
385         if isinstance(patterns, list) or isinstance(patterns, tuple):
386             patterns = [p.strip() + '.api.json' for p in patterns]
387         else:
388             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
389
390         api_files = []
391         for root, dirnames, files in os.walk(api_dir):
392             # iterate all given patterns and de-dup the result
393             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
394             for filename in files:
395                 api_files.append(os.path.join(root, filename))
396
397         return api_files
398
399     @property
400     def api(self):
401         if not hasattr(self, "_api"):
402             raise VPPApiError("Not connected, api definitions not available")
403         return self._api
404
405     def make_function(self, msg, i, multipart, do_async):
406         if (do_async):
407             def f(**kwargs):
408                 return self._call_vpp_async(i, msg, **kwargs)
409         else:
410             def f(**kwargs):
411                 return self._call_vpp(i, msg, multipart, **kwargs)
412
413         f.__name__ = str(msg.name)
414         f.__doc__ = ", ".join(["%s %s" %
415                                (msg.fieldtypes[j], k)
416                                for j, k in enumerate(msg.fields)])
417         f.msg = msg
418
419         return f
420
421     def _register_functions(self, do_async=False):
422         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
423         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
424         self._api = VppApiDynamicMethodHolder()
425         for name, msg in vpp_iterator(self.messages):
426             n = name + '_' + msg.crc[2:]
427             i = self.transport.get_msg_index(n.encode('utf-8'))
428             if i > 0:
429                 self.id_msgdef[i] = msg
430                 self.id_names[i] = name
431
432                 # Create function for client side messages.
433                 if name in self.services:
434                     if 'stream' in self.services[name] and \
435                        self.services[name]['stream']:
436                         multipart = True
437                     else:
438                         multipart = False
439                     f = self.make_function(msg, i, multipart, do_async)
440                     setattr(self._api, name, FuncWrapper(f))
441             else:
442                 self.logger.debug(
443                     'No such message type or failed CRC checksum: %s', n)
444
445     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
446                          do_async):
447         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
448
449         rv = self.transport.connect(name.encode('utf-8'), pfx,
450                                     msg_handler, rx_qlen)
451         if rv != 0:
452             raise VPPIOError(2, 'Connect failed')
453         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
454         self._register_functions(do_async=do_async)
455
456         # Initialise control ping
457         crc = self.messages['control_ping'].crc
458         self.control_ping_index = self.transport.get_msg_index(
459             ('control_ping' + '_' + crc[2:]).encode('utf-8'))
460         self.control_ping_msgdef = self.messages['control_ping']
461         if self.async_thread:
462             self.event_thread = threading.Thread(
463                 target=self.thread_msg_handler)
464             self.event_thread.daemon = True
465             self.event_thread.start()
466         else:
467             self.event_thread = None
468         return rv
469
470     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
471         """Attach to VPP.
472
473         name - the name of the client.
474         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
475         do_async - if true, messages are sent without waiting for a reply
476         rx_qlen - the length of the VPP message receive queue between
477         client and server.
478         """
479         msg_handler = self.transport.get_callback(do_async)
480         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
481                                      do_async)
482
483     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
484         """Attach to VPP in synchronous mode. Application must poll for events.
485
486         name - the name of the client.
487         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
488         rx_qlen - the length of the VPP message receive queue between
489         client and server.
490         """
491
492         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
493                                      do_async=False)
494
495     def disconnect(self):
496         """Detach from VPP."""
497         rv = self.transport.disconnect()
498         if self.event_thread is not None:
499             self.message_queue.put("terminate event thread")
500         return rv
501
502     def msg_handler_sync(self, msg):
503         """Process an incoming message from VPP in sync mode.
504
505         The message may be a reply or it may be an async notification.
506         """
507         r = self.decode_incoming_msg(msg)
508         if r is None:
509             return
510
511         # If we have a context, then use the context to find any
512         # request waiting for a reply
513         context = 0
514         if hasattr(r, 'context') and r.context > 0:
515             context = r.context
516
517         if context == 0:
518             # No context -> async notification that we feed to the callback
519             self.message_queue.put_nowait(r)
520         else:
521             raise VPPIOError(2, 'RPC reply message received in event handler')
522
523     def has_context(self, msg):
524         if len(msg) < 10:
525             return False
526
527         header = VPPType('header_with_context', [['u16', 'msgid'],
528                                                  ['u32', 'client_index'],
529                                                  ['u32', 'context']])
530
531         (i, ci, context), size = header.unpack(msg, 0)
532         if self.id_names[i] == 'rx_thread_exit':
533             return
534
535         #
536         # Decode message and returns a tuple.
537         #
538         msgobj = self.id_msgdef[i]
539         if 'context' in msgobj.field_by_name and context >= 0:
540             return True
541         return False
542
543     def decode_incoming_msg(self, msg, no_type_conversion=False):
544         if not msg:
545             self.logger.warning('vpp_api.read failed')
546             return
547
548         (i, ci), size = self.header.unpack(msg, 0)
549         if self.id_names[i] == 'rx_thread_exit':
550             return
551
552         #
553         # Decode message and returns a tuple.
554         #
555         msgobj = self.id_msgdef[i]
556         if not msgobj:
557             raise VPPIOError(2, 'Reply message undefined')
558
559         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
560         return r
561
562     def msg_handler_async(self, msg):
563         """Process a message from VPP in async mode.
564
565         In async mode, all messages are returned to the callback.
566         """
567         r = self.decode_incoming_msg(msg)
568         if r is None:
569             return
570
571         msgname = type(r).__name__
572
573         if self.event_callback:
574             self.event_callback(msgname, r)
575
576     def _control_ping(self, context):
577         """Send a ping command."""
578         self._call_vpp_async(self.control_ping_index,
579                              self.control_ping_msgdef,
580                              context=context)
581
582     def validate_args(self, msg, kwargs):
583         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
584         if d:
585             raise VPPValueError('Invalid argument {} to {}'
586                                 .format(list(d), msg.name))
587
588     def _call_vpp(self, i, msgdef, multipart, **kwargs):
589         """Given a message, send the message and await a reply.
590
591         msgdef - the message packing definition
592         i - the message type index
593         multipart - True if the message returns multiple
594         messages in return.
595         context - context number - chosen at random if not
596         supplied.
597         The remainder of the kwargs are the arguments to the API call.
598
599         The return value is the message or message array containing
600         the response.  It will raise an IOError exception if there was
601         no response within the timeout window.
602         """
603
604         if 'context' not in kwargs:
605             context = self.get_context()
606             kwargs['context'] = context
607         else:
608             context = kwargs['context']
609         kwargs['_vl_msg_id'] = i
610
611         no_type_conversion = kwargs.pop('_no_type_conversion', False)
612
613         try:
614             if self.transport.socket_index:
615                 kwargs['client_index'] = self.transport.socket_index
616         except AttributeError:
617             pass
618         self.validate_args(msgdef, kwargs)
619
620         s = 'Calling {}({})'.format(msgdef.name,
621             ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
622         self.logger.debug(s)
623
624         b = msgdef.pack(kwargs)
625         self.transport.suspend()
626
627         self.transport.write(b)
628
629         if multipart:
630             # Send a ping after the request - we use its response
631             # to detect that we have seen all results.
632             self._control_ping(context)
633
634         # Block until we get a reply.
635         rl = []
636         while (True):
637             msg = self.transport.read()
638             if not msg:
639                 raise VPPIOError(2, 'VPP API client: read failed')
640             r = self.decode_incoming_msg(msg, no_type_conversion)
641             msgname = type(r).__name__
642             if context not in r or r.context == 0 or context != r.context:
643                 # Message being queued
644                 self.message_queue.put_nowait(r)
645                 continue
646
647             if not multipart:
648                 rl = r
649                 break
650             if msgname == 'control_ping_reply':
651                 break
652
653             rl.append(r)
654
655         self.transport.resume()
656
657         self.logger.debug('Return from {!r}'.format(r))
658         return rl
659
660     def _call_vpp_async(self, i, msg, **kwargs):
661         """Given a message, send the message and await a reply.
662
663         msgdef - the message packing definition
664         i - the message type index
665         context - context number - chosen at random if not
666         supplied.
667         The remainder of the kwargs are the arguments to the API call.
668         """
669         if 'context' not in kwargs:
670             context = self.get_context()
671             kwargs['context'] = context
672         else:
673             context = kwargs['context']
674         try:
675             if self.transport.socket_index:
676                 kwargs['client_index'] = self.transport.socket_index
677         except AttributeError:
678             kwargs['client_index'] = 0
679         kwargs['_vl_msg_id'] = i
680         b = msg.pack(kwargs)
681
682         self.transport.write(b)
683
684     def register_event_callback(self, callback):
685         """Register a callback for async messages.
686
687         This will be called for async notifications in sync mode,
688         and all messages in async mode.  In sync mode, replies to
689         requests will not come here.
690
691         callback is a fn(msg_type_name, msg_type) that will be
692         called when a message comes in.  While this function is
693         executing, note that (a) you are in a background thread and
694         may wish to use threading.Lock to protect your datastructures,
695         and (b) message processing from VPP will stop (so if you take
696         a long while about it you may provoke reply timeouts or cause
697         VPP to fill the RX buffer).  Passing None will disable the
698         callback.
699         """
700         self.event_callback = callback
701
702     def thread_msg_handler(self):
703         """Python thread calling the user registered message handler.
704
705         This is to emulate the old style event callback scheme. Modern
706         clients should provide their own thread to poll the event
707         queue.
708         """
709         while True:
710             r = self.message_queue.get()
711             if r == "terminate event thread":
712                 break
713             msgname = type(r).__name__
714             if self.event_callback:
715                 self.event_callback(msgname, r)
716
717     def __repr__(self):
718         return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
719                "logger=%s, read_timeout=%s, use_socket=%s, " \
720                "server_address='%s'>" % (
721                    self._apifiles, self.testmode, self.async_thread,
722                    self.logger, self.read_timeout, self.use_socket,
723                    self.server_address)
724
725
726 # Provide the old name for backward compatibility.
727 VPP = VPPApiClient
728
729 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4