PAPI: Use UNIX domain sockets instead of shared memory
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import sys
20 import os
21 import logging
22 import collections
23 import struct
24 import json
25 import threading
26 import fnmatch
27 import weakref
28 import atexit
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage
31
32 if sys.version[0] == '2':
33     import Queue as queue
34 else:
35     import queue as queue
36
37
38 def vpp_atexit(vpp_weakref):
39     """Clean up VPP connection on shutdown."""
40     vpp_instance = vpp_weakref()
41     if vpp_instance and vpp_instance.transport.connected:
42         vpp_instance.logger.debug('Cleaning up VPP on exit')
43         vpp_instance.disconnect()
44
45
46 def vpp_iterator(d):
47     if sys.version[0] == '2':
48         return d.iteritems()
49     else:
50         return d.items()
51
52
53 class VppApiDynamicMethodHolder(object):
54     pass
55
56
57 class FuncWrapper(object):
58     def __init__(self, func):
59         self._func = func
60         self.__name__ = func.__name__
61
62     def __call__(self, **kwargs):
63         return self._func(**kwargs)
64
65
66 class VPP():
67     """VPP interface.
68
69     This class provides the APIs to VPP.  The APIs are loaded
70     from provided .api.json files and makes functions accordingly.
71     These functions are documented in the VPP .api files, as they
72     are dynamically created.
73
74     Additionally, VPP can send callback messages; this class
75     provides a means to register a callback function to receive
76     these messages in a background thread.
77     """
78
79     def process_json_file(self, apidef_file):
80         api = json.load(apidef_file)
81         types = {}
82         for t in api['enums']:
83             t[0] = 'vl_api_' + t[0] + '_t'
84             types[t[0]] = {'type': 'enum', 'data': t}
85         for t in api['unions']:
86             t[0] = 'vl_api_' + t[0] + '_t'
87             types[t[0]] = {'type': 'union', 'data': t}
88         for t in api['types']:
89             t[0] = 'vl_api_' + t[0] + '_t'
90             types[t[0]] = {'type': 'type', 'data': t}
91
92         i = 0
93         while True:
94             unresolved = {}
95             for k, v in types.items():
96                 t = v['data']
97                 if v['type'] == 'enum':
98                     try:
99                         VPPEnumType(t[0], t[1:])
100                     except ValueError:
101                         unresolved[k] = v
102                 elif v['type'] == 'union':
103                     try:
104                         VPPUnionType(t[0], t[1:])
105                     except ValueError:
106                         unresolved[k] = v
107                 elif v['type'] == 'type':
108                     try:
109                         VPPType(t[0], t[1:])
110                     except ValueError:
111                         unresolved[k] = v
112             if len(unresolved) == 0:
113                 break
114             if i > 3:
115                 raise ValueError('Unresolved type definitions {}'
116                                  .format(unresolved))
117             types = unresolved
118             i += 1
119
120         for m in api['messages']:
121             try:
122                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
123             except NotImplementedError:
124                 self.logger.error('Not implemented error for {}'.format(m[0]))
125
126     def __init__(self, apifiles=None, testmode=False, async_thread=True,
127                  logger=logging.getLogger('vpp_papi'), loglevel='debug',
128                  read_timeout=5, use_socket=False,
129                  server_address='/run/vpp-api.sock'):
130         """Create a VPP API object.
131
132         apifiles is a list of files containing API
133         descriptions that will be loaded - methods will be
134         dynamically created reflecting these APIs.  If not
135         provided this will load the API files from VPP's
136         default install location.
137
138         logger, if supplied, is the logging logger object to log to.
139         loglevel, if supplied, is the log level this logger is set
140         to report at (from the loglevels in the logging module).
141         """
142         if logger is None:
143             logger = logging.getLogger(__name__)
144             if loglevel is not None:
145                 logger.setLevel(loglevel)
146         self.logger = logger
147
148         self.messages = {}
149         self.id_names = []
150         self.id_msgdef = []
151         self.header = VPPType('header', [['u16', 'msgid'],
152                                          ['u32', 'client_index']])
153         self.apifiles = []
154         self.event_callback = None
155         self.message_queue = queue.Queue()
156         self.read_timeout = read_timeout
157         self.async_thread = async_thread
158
159         if use_socket:
160             from . vpp_transport_socket import VppTransport
161         else:
162             from . vpp_transport_shmem import VppTransport
163
164         if not apifiles:
165             # Pick up API definitions from default directory
166             try:
167                 apifiles = self.find_api_files()
168             except RuntimeError:
169                 # In test mode we don't care that we can't find the API files
170                 if testmode:
171                     apifiles = []
172                 else:
173                     raise
174
175         for file in apifiles:
176             with open(file) as apidef_file:
177                 self.process_json_file(apidef_file)
178
179         self.apifiles = apifiles
180
181         # Basic sanity check
182         if len(self.messages) == 0 and not testmode:
183             raise ValueError(1, 'Missing JSON message definitions')
184
185         self.transport = VppTransport(self, read_timeout=read_timeout,
186                                       server_address=server_address)
187         # Make sure we allow VPP to clean up the message rings.
188         atexit.register(vpp_atexit, weakref.ref(self))
189
190     class ContextId(object):
191         """Thread-safe provider of unique context IDs."""
192         def __init__(self):
193             self.context = 0
194             self.lock = threading.Lock()
195
196         def __call__(self):
197             """Get a new unique (or, at least, not recently used) context."""
198             with self.lock:
199                 self.context += 1
200                 return self.context
201     get_context = ContextId()
202
203     @classmethod
204     def find_api_dir(cls):
205         """Attempt to find the best directory in which API definition
206         files may reside. If the value VPP_API_DIR exists in the environment
207         then it is first on the search list. If we're inside a recognized
208         location in a VPP source tree (src/scripts and src/vpp-api/python)
209         then entries from there to the likely locations in build-root are
210         added. Finally the location used by system packages is added.
211
212         :returns: A single directory name, or None if no such directory
213             could be found.
214         """
215         dirs = []
216
217         if 'VPP_API_DIR' in os.environ:
218             dirs.append(os.environ['VPP_API_DIR'])
219
220         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
221         # in which case, plot a course to likely places in the src tree
222         import __main__ as main
223         if hasattr(main, '__file__'):
224             # get the path of the calling script
225             localdir = os.path.dirname(os.path.realpath(main.__file__))
226         else:
227             # use cwd if there is no calling script
228             localdir = os.getcwd()
229         localdir_s = localdir.split(os.path.sep)
230
231         def dmatch(dir):
232             """Match dir against right-hand components of the script dir"""
233             d = dir.split('/')  # param 'dir' assumes a / separator
234             length = len(d)
235             return len(localdir_s) > length and localdir_s[-length:] == d
236
237         def sdir(srcdir, variant):
238             """Build a path from srcdir to the staged API files of
239             'variant'  (typically '' or '_debug')"""
240             # Since 'core' and 'plugin' files are staged
241             # in separate directories, we target the parent dir.
242             return os.path.sep.join((
243                 srcdir,
244                 'build-root',
245                 'install-vpp%s-native' % variant,
246                 'vpp',
247                 'share',
248                 'vpp',
249                 'api',
250             ))
251
252         srcdir = None
253         if dmatch('src/scripts'):
254             srcdir = os.path.sep.join(localdir_s[:-2])
255         elif dmatch('src/vpp-api/python'):
256             srcdir = os.path.sep.join(localdir_s[:-3])
257         elif dmatch('test'):
258             # we're apparently running tests
259             srcdir = os.path.sep.join(localdir_s[:-1])
260
261         if srcdir:
262             # we're in the source tree, try both the debug and release
263             # variants.
264             dirs.append(sdir(srcdir, '_debug'))
265             dirs.append(sdir(srcdir, ''))
266
267         # Test for staged copies of the scripts
268         # For these, since we explicitly know if we're running a debug versus
269         # release variant, target only the relevant directory
270         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
271             srcdir = os.path.sep.join(localdir_s[:-4])
272             dirs.append(sdir(srcdir, '_debug'))
273         if dmatch('build-root/install-vpp-native/vpp/bin'):
274             srcdir = os.path.sep.join(localdir_s[:-4])
275             dirs.append(sdir(srcdir, ''))
276
277         # finally, try the location system packages typically install into
278         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
279
280         # check the directories for existance; first one wins
281         for dir in dirs:
282             if os.path.isdir(dir):
283                 return dir
284
285         return None
286
287     @classmethod
288     def find_api_files(cls, api_dir=None, patterns='*'):
289         """Find API definition files from the given directory tree with the
290         given pattern. If no directory is given then find_api_dir() is used
291         to locate one. If no pattern is given then all definition files found
292         in the directory tree are used.
293
294         :param api_dir: A directory tree in which to locate API definition
295             files; subdirectories are descended into.
296             If this is None then find_api_dir() is called to discover it.
297         :param patterns: A list of patterns to use in each visited directory
298             when looking for files.
299             This can be a list/tuple object or a comma-separated string of
300             patterns. Each value in the list will have leading/trialing
301             whitespace stripped.
302             The pattern specifies the first part of the filename, '.api.json'
303             is appended.
304             The results are de-duplicated, thus overlapping patterns are fine.
305             If this is None it defaults to '*' meaning "all API files".
306         :returns: A list of file paths for the API files found.
307         """
308         if api_dir is None:
309             api_dir = cls.find_api_dir()
310             if api_dir is None:
311                 raise RuntimeError("api_dir cannot be located")
312
313         if isinstance(patterns, list) or isinstance(patterns, tuple):
314             patterns = [p.strip() + '.api.json' for p in patterns]
315         else:
316             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
317
318         api_files = []
319         for root, dirnames, files in os.walk(api_dir):
320             # iterate all given patterns and de-dup the result
321             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
322             for filename in files:
323                 api_files.append(os.path.join(root, filename))
324
325         return api_files
326
327     @property
328     def api(self):
329         if not hasattr(self, "_api"):
330             raise Exception("Not connected, api definitions not available")
331         return self._api
332
333     def make_function(self, msg, i, multipart, do_async):
334         if (do_async):
335             def f(**kwargs):
336                 return self._call_vpp_async(i, msg, **kwargs)
337         else:
338             def f(**kwargs):
339                 return self._call_vpp(i, msg, multipart, **kwargs)
340
341         f.__name__ = str(msg.name)
342         f.__doc__ = ", ".join(["%s %s" %
343                                (msg.fieldtypes[j], k)
344                                for j, k in enumerate(msg.fields)])
345         return f
346
347     def _register_functions(self, do_async=False):
348         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
349         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
350         self._api = VppApiDynamicMethodHolder()
351         for name, msg in vpp_iterator(self.messages):
352             n = name + '_' + msg.crc[2:]
353             i = self.transport.get_msg_index(n.encode())
354             if i > 0:
355                 self.id_msgdef[i] = msg
356                 self.id_names[i] = name
357                 # TODO: Fix multipart (use services)
358                 multipart = True if name.find('_dump') > 0 else False
359                 f = self.make_function(msg, i, multipart, do_async)
360                 setattr(self._api, name, FuncWrapper(f))
361             else:
362                 self.logger.debug(
363                     'No such message type or failed CRC checksum: %s', n)
364
365     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
366                          do_async):
367         pfx = chroot_prefix.encode() if chroot_prefix else None
368
369         rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
370         if rv != 0:
371             raise IOError(2, 'Connect failed')
372         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
373         self._register_functions(do_async=do_async)
374
375         # Initialise control ping
376         crc = self.messages['control_ping'].crc
377         self.control_ping_index = self.transport.get_msg_index(
378             ('control_ping' + '_' + crc[2:]).encode())
379         self.control_ping_msgdef = self.messages['control_ping']
380         if self.async_thread:
381             self.event_thread = threading.Thread(
382                 target=self.thread_msg_handler)
383             self.event_thread.daemon = True
384             self.event_thread.start()
385         return rv
386
387     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
388         """Attach to VPP.
389
390         name - the name of the client.
391         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
392         do_async - if true, messages are sent without waiting for a reply
393         rx_qlen - the length of the VPP message receive queue between
394         client and server.
395         """
396         msg_handler = self.transport.get_callback(do_async)
397         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
398                                      do_async)
399
400     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
401         """Attach to VPP in synchronous mode. Application must poll for events.
402
403         name - the name of the client.
404         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
405         rx_qlen - the length of the VPP message receive queue between
406         client and server.
407         """
408
409         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
410                                      do_async=False)
411
412     def disconnect(self):
413         """Detach from VPP."""
414         rv = self.transport.disconnect()
415         self.message_queue.put("terminate event thread")
416         return rv
417
418     def msg_handler_sync(self, msg):
419         """Process an incoming message from VPP in sync mode.
420
421         The message may be a reply or it may be an async notification.
422         """
423         r = self.decode_incoming_msg(msg)
424         if r is None:
425             return
426
427         # If we have a context, then use the context to find any
428         # request waiting for a reply
429         context = 0
430         if hasattr(r, 'context') and r.context > 0:
431             context = r.context
432
433         if context == 0:
434             # No context -> async notification that we feed to the callback
435             self.message_queue.put_nowait(r)
436         else:
437             raise IOError(2, 'RPC reply message received in event handler')
438
439     def decode_incoming_msg(self, msg):
440         if not msg:
441             self.logger.warning('vpp_api.read failed')
442             return
443         (i, ci), size = self.header.unpack(msg, 0)
444         if self.id_names[i] == 'rx_thread_exit':
445             return
446
447         #
448         # Decode message and returns a tuple.
449         #
450         msgobj = self.id_msgdef[i]
451         if not msgobj:
452             raise IOError(2, 'Reply message undefined')
453
454         r, size = msgobj.unpack(msg)
455         return r
456
457     def msg_handler_async(self, msg):
458         """Process a message from VPP in async mode.
459
460         In async mode, all messages are returned to the callback.
461         """
462         r = self.decode_incoming_msg(msg)
463         if r is None:
464             return
465
466         msgname = type(r).__name__
467
468         if self.event_callback:
469             self.event_callback(msgname, r)
470
471     def _control_ping(self, context):
472         """Send a ping command."""
473         self._call_vpp_async(self.control_ping_index,
474                              self.control_ping_msgdef,
475                              context=context)
476
477     def validate_args(self, msg, kwargs):
478         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
479         if d:
480             raise ValueError('Invalid argument {} to {}'
481                              .format(list(d), msg.name))
482
483     def _call_vpp(self, i, msg, multipart, **kwargs):
484         """Given a message, send the message and await a reply.
485
486         msgdef - the message packing definition
487         i - the message type index
488         multipart - True if the message returns multiple
489         messages in return.
490         context - context number - chosen at random if not
491         supplied.
492         The remainder of the kwargs are the arguments to the API call.
493
494         The return value is the message or message array containing
495         the response.  It will raise an IOError exception if there was
496         no response within the timeout window.
497         """
498
499         if 'context' not in kwargs:
500             context = self.get_context()
501             kwargs['context'] = context
502         else:
503             context = kwargs['context']
504         kwargs['_vl_msg_id'] = i
505
506         try:
507             if self.transport.socket_index:
508                 kwargs['client_index'] = self.transport.socket_index
509         except AttributeError:
510             pass
511         self.validate_args(msg, kwargs)
512         b = msg.pack(kwargs)
513         self.transport.suspend()
514
515         self.transport.write(b)
516
517         if multipart:
518             # Send a ping after the request - we use its response
519             # to detect that we have seen all results.
520             self._control_ping(context)
521
522         # Block until we get a reply.
523         rl = []
524         while (True):
525             msg = self.transport.read()
526             if not msg:
527                 raise IOError(2, 'VPP API client: read failed')
528             r = self.decode_incoming_msg(msg)
529             msgname = type(r).__name__
530             if context not in r or r.context == 0 or context != r.context:
531                 # Message being queued
532                 self.message_queue.put_nowait(r)
533                 continue
534
535             if not multipart:
536                 rl = r
537                 break
538             if msgname == 'control_ping_reply':
539                 break
540
541             rl.append(r)
542
543         self.transport.resume()
544
545         return rl
546
547     def _call_vpp_async(self, i, msg, **kwargs):
548         """Given a message, send the message and await a reply.
549
550         msgdef - the message packing definition
551         i - the message type index
552         context - context number - chosen at random if not
553         supplied.
554         The remainder of the kwargs are the arguments to the API call.
555         """
556         if 'context' not in kwargs:
557             context = self.get_context()
558             kwargs['context'] = context
559         else:
560             context = kwargs['context']
561         try:
562             if self.transport.socket_index:
563                 kwargs['client_index'] = self.transport.socket_index
564         except AttributeError:
565             kwargs['client_index'] = 0
566         kwargs['_vl_msg_id'] = i
567         b = msg.pack(kwargs)
568
569         self.transport.write(b)
570
571     def register_event_callback(self, callback):
572         """Register a callback for async messages.
573
574         This will be called for async notifications in sync mode,
575         and all messages in async mode.  In sync mode, replies to
576         requests will not come here.
577
578         callback is a fn(msg_type_name, msg_type) that will be
579         called when a message comes in.  While this function is
580         executing, note that (a) you are in a background thread and
581         may wish to use threading.Lock to protect your datastructures,
582         and (b) message processing from VPP will stop (so if you take
583         a long while about it you may provoke reply timeouts or cause
584         VPP to fill the RX buffer).  Passing None will disable the
585         callback.
586         """
587         self.event_callback = callback
588
589     def thread_msg_handler(self):
590         """Python thread calling the user registered message handler.
591
592         This is to emulate the old style event callback scheme. Modern
593         clients should provide their own thread to poll the event
594         queue.
595         """
596         while True:
597             r = self.message_queue.get()
598             if r == "terminate event thread":
599                 break
600             msgname = type(r).__name__
601             if self.event_callback:
602                 self.event_callback(msgname, r)
603
604
605 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4