ca4b955fd0725eba94df12171c03a9bddc68e536
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import sys
20 import os
21 import logging
22 import collections
23 import struct
24 import json
25 import threading
26 import fnmatch
27 import weakref
28 import atexit
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type
31 from . vpp_format import VPPFormat
32
33 if sys.version[0] == '2':
34     import Queue as queue
35 else:
36     import queue as queue
37
38
39 class VppEnumType(type):
40     def __getattr__(cls, name):
41         t = vpp_get_type(name)
42         return t.enum
43
44
45 # Python3
46 # class VppEnum(metaclass=VppEnumType):
47 #    pass
48 class VppEnum(object):
49     __metaclass__ = VppEnumType
50
51
52 def vpp_atexit(vpp_weakref):
53     """Clean up VPP connection on shutdown."""
54     vpp_instance = vpp_weakref()
55     if vpp_instance and vpp_instance.transport.connected:
56         vpp_instance.logger.debug('Cleaning up VPP on exit')
57         vpp_instance.disconnect()
58
59
60 def vpp_iterator(d):
61     if sys.version[0] == '2':
62         return d.iteritems()
63     else:
64         return d.items()
65
66
67 class VppApiDynamicMethodHolder(object):
68     pass
69
70
71 class FuncWrapper(object):
72     def __init__(self, func):
73         self._func = func
74         self.__name__ = func.__name__
75
76     def __call__(self, **kwargs):
77         return self._func(**kwargs)
78
79
80 class VPP(object):
81     """VPP interface.
82
83     This class provides the APIs to VPP.  The APIs are loaded
84     from provided .api.json files and makes functions accordingly.
85     These functions are documented in the VPP .api files, as they
86     are dynamically created.
87
88     Additionally, VPP can send callback messages; this class
89     provides a means to register a callback function to receive
90     these messages in a background thread.
91     """
92
93     def process_json_file(self, apidef_file):
94         api = json.load(apidef_file)
95         types = {}
96         for t in api['enums']:
97             t[0] = 'vl_api_' + t[0] + '_t'
98             types[t[0]] = {'type': 'enum', 'data': t}
99         for t in api['unions']:
100             t[0] = 'vl_api_' + t[0] + '_t'
101             types[t[0]] = {'type': 'union', 'data': t}
102         for t in api['types']:
103             t[0] = 'vl_api_' + t[0] + '_t'
104             types[t[0]] = {'type': 'type', 'data': t}
105
106         i = 0
107         while True:
108             unresolved = {}
109             for k, v in types.items():
110                 t = v['data']
111                 if not vpp_get_type(t[0]):
112                     if v['type'] == 'enum':
113                         try:
114                             VPPEnumType(t[0], t[1:])
115                         except ValueError:
116                             unresolved[k] = v
117                     elif v['type'] == 'union':
118                         try:
119                             VPPUnionType(t[0], t[1:])
120                         except ValueError:
121                             unresolved[k] = v
122                     elif v['type'] == 'type':
123                         try:
124                             VPPType(t[0], t[1:])
125                         except ValueError:
126                             unresolved[k] = v
127             if len(unresolved) == 0:
128                 break
129             if i > 3:
130                 raise ValueError('Unresolved type definitions {}'
131                                  .format(unresolved))
132             types = unresolved
133             i += 1
134
135         for m in api['messages']:
136             try:
137                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
138             except NotImplementedError:
139                 self.logger.error('Not implemented error for {}'.format(m[0]))
140
141     def __init__(self, apifiles=None, testmode=False, async_thread=True,
142                  logger=None, loglevel=None,
143                  read_timeout=5, use_socket=False,
144                  server_address='/run/vpp-api.sock'):
145         """Create a VPP API object.
146
147         apifiles is a list of files containing API
148         descriptions that will be loaded - methods will be
149         dynamically created reflecting these APIs.  If not
150         provided this will load the API files from VPP's
151         default install location.
152
153         logger, if supplied, is the logging logger object to log to.
154         loglevel, if supplied, is the log level this logger is set
155         to report at (from the loglevels in the logging module).
156         """
157         if logger is None:
158             logger = logging.getLogger(__name__)
159             if loglevel is not None:
160                 logger.setLevel(loglevel)
161         self.logger = logger
162
163         self.messages = {}
164         self.id_names = []
165         self.id_msgdef = []
166         self.header = VPPType('header', [['u16', 'msgid'],
167                                          ['u32', 'client_index']])
168         self.apifiles = []
169         self.event_callback = None
170         self.message_queue = queue.Queue()
171         self.read_timeout = read_timeout
172         self.async_thread = async_thread
173
174         if use_socket:
175             from . vpp_transport_socket import VppTransport
176         else:
177             from . vpp_transport_shmem import VppTransport
178
179         if not apifiles:
180             # Pick up API definitions from default directory
181             try:
182                 apifiles = self.find_api_files()
183             except RuntimeError:
184                 # In test mode we don't care that we can't find the API files
185                 if testmode:
186                     apifiles = []
187                 else:
188                     raise
189
190         for file in apifiles:
191             with open(file) as apidef_file:
192                 self.process_json_file(apidef_file)
193
194         self.apifiles = apifiles
195
196         # Basic sanity check
197         if len(self.messages) == 0 and not testmode:
198             raise ValueError(1, 'Missing JSON message definitions')
199
200         self.transport = VppTransport(self, read_timeout=read_timeout,
201                                       server_address=server_address)
202         # Make sure we allow VPP to clean up the message rings.
203         atexit.register(vpp_atexit, weakref.ref(self))
204
205     class ContextId(object):
206         """Thread-safe provider of unique context IDs."""
207         def __init__(self):
208             self.context = 0
209             self.lock = threading.Lock()
210
211         def __call__(self):
212             """Get a new unique (or, at least, not recently used) context."""
213             with self.lock:
214                 self.context += 1
215                 return self.context
216     get_context = ContextId()
217
218     def get_type(self, name):
219         return vpp_get_type(name)
220
221     @classmethod
222     def find_api_dir(cls):
223         """Attempt to find the best directory in which API definition
224         files may reside. If the value VPP_API_DIR exists in the environment
225         then it is first on the search list. If we're inside a recognized
226         location in a VPP source tree (src/scripts and src/vpp-api/python)
227         then entries from there to the likely locations in build-root are
228         added. Finally the location used by system packages is added.
229
230         :returns: A single directory name, or None if no such directory
231             could be found.
232         """
233         dirs = []
234
235         if 'VPP_API_DIR' in os.environ:
236             dirs.append(os.environ['VPP_API_DIR'])
237
238         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
239         # in which case, plot a course to likely places in the src tree
240         import __main__ as main
241         if hasattr(main, '__file__'):
242             # get the path of the calling script
243             localdir = os.path.dirname(os.path.realpath(main.__file__))
244         else:
245             # use cwd if there is no calling script
246             localdir = os.getcwd()
247         localdir_s = localdir.split(os.path.sep)
248
249         def dmatch(dir):
250             """Match dir against right-hand components of the script dir"""
251             d = dir.split('/')  # param 'dir' assumes a / separator
252             length = len(d)
253             return len(localdir_s) > length and localdir_s[-length:] == d
254
255         def sdir(srcdir, variant):
256             """Build a path from srcdir to the staged API files of
257             'variant'  (typically '' or '_debug')"""
258             # Since 'core' and 'plugin' files are staged
259             # in separate directories, we target the parent dir.
260             return os.path.sep.join((
261                 srcdir,
262                 'build-root',
263                 'install-vpp%s-native' % variant,
264                 'vpp',
265                 'share',
266                 'vpp',
267                 'api',
268             ))
269
270         srcdir = None
271         if dmatch('src/scripts'):
272             srcdir = os.path.sep.join(localdir_s[:-2])
273         elif dmatch('src/vpp-api/python'):
274             srcdir = os.path.sep.join(localdir_s[:-3])
275         elif dmatch('test'):
276             # we're apparently running tests
277             srcdir = os.path.sep.join(localdir_s[:-1])
278
279         if srcdir:
280             # we're in the source tree, try both the debug and release
281             # variants.
282             dirs.append(sdir(srcdir, '_debug'))
283             dirs.append(sdir(srcdir, ''))
284
285         # Test for staged copies of the scripts
286         # For these, since we explicitly know if we're running a debug versus
287         # release variant, target only the relevant directory
288         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
289             srcdir = os.path.sep.join(localdir_s[:-4])
290             dirs.append(sdir(srcdir, '_debug'))
291         if dmatch('build-root/install-vpp-native/vpp/bin'):
292             srcdir = os.path.sep.join(localdir_s[:-4])
293             dirs.append(sdir(srcdir, ''))
294
295         # finally, try the location system packages typically install into
296         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
297
298         # check the directories for existance; first one wins
299         for dir in dirs:
300             if os.path.isdir(dir):
301                 return dir
302
303         return None
304
305     @classmethod
306     def find_api_files(cls, api_dir=None, patterns='*'):
307         """Find API definition files from the given directory tree with the
308         given pattern. If no directory is given then find_api_dir() is used
309         to locate one. If no pattern is given then all definition files found
310         in the directory tree are used.
311
312         :param api_dir: A directory tree in which to locate API definition
313             files; subdirectories are descended into.
314             If this is None then find_api_dir() is called to discover it.
315         :param patterns: A list of patterns to use in each visited directory
316             when looking for files.
317             This can be a list/tuple object or a comma-separated string of
318             patterns. Each value in the list will have leading/trialing
319             whitespace stripped.
320             The pattern specifies the first part of the filename, '.api.json'
321             is appended.
322             The results are de-duplicated, thus overlapping patterns are fine.
323             If this is None it defaults to '*' meaning "all API files".
324         :returns: A list of file paths for the API files found.
325         """
326         if api_dir is None:
327             api_dir = cls.find_api_dir()
328             if api_dir is None:
329                 raise RuntimeError("api_dir cannot be located")
330
331         if isinstance(patterns, list) or isinstance(patterns, tuple):
332             patterns = [p.strip() + '.api.json' for p in patterns]
333         else:
334             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
335
336         api_files = []
337         for root, dirnames, files in os.walk(api_dir):
338             # iterate all given patterns and de-dup the result
339             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
340             for filename in files:
341                 api_files.append(os.path.join(root, filename))
342
343         return api_files
344
345     @property
346     def api(self):
347         if not hasattr(self, "_api"):
348             raise Exception("Not connected, api definitions not available")
349         return self._api
350
351     def make_function(self, msg, i, multipart, do_async):
352         if (do_async):
353             def f(**kwargs):
354                 return self._call_vpp_async(i, msg, **kwargs)
355         else:
356             def f(**kwargs):
357                 return self._call_vpp(i, msg, multipart, **kwargs)
358
359         f.__name__ = str(msg.name)
360         f.__doc__ = ", ".join(["%s %s" %
361                                (msg.fieldtypes[j], k)
362                                for j, k in enumerate(msg.fields)])
363         return f
364
365     def _register_functions(self, do_async=False):
366         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
367         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
368         self._api = VppApiDynamicMethodHolder()
369         for name, msg in vpp_iterator(self.messages):
370             n = name + '_' + msg.crc[2:]
371             i = self.transport.get_msg_index(n.encode())
372             if i > 0:
373                 self.id_msgdef[i] = msg
374                 self.id_names[i] = name
375                 # TODO: Fix multipart (use services)
376                 multipart = True if name.find('_dump') > 0 else False
377                 f = self.make_function(msg, i, multipart, do_async)
378                 setattr(self._api, name, FuncWrapper(f))
379             else:
380                 self.logger.debug(
381                     'No such message type or failed CRC checksum: %s', n)
382
383     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
384                          do_async):
385         pfx = chroot_prefix.encode() if chroot_prefix else None
386
387         rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
388         if rv != 0:
389             raise IOError(2, 'Connect failed')
390         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
391         self._register_functions(do_async=do_async)
392
393         # Initialise control ping
394         crc = self.messages['control_ping'].crc
395         self.control_ping_index = self.transport.get_msg_index(
396             ('control_ping' + '_' + crc[2:]).encode())
397         self.control_ping_msgdef = self.messages['control_ping']
398         if self.async_thread:
399             self.event_thread = threading.Thread(
400                 target=self.thread_msg_handler)
401             self.event_thread.daemon = True
402             self.event_thread.start()
403         return rv
404
405     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
406         """Attach to VPP.
407
408         name - the name of the client.
409         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
410         do_async - if true, messages are sent without waiting for a reply
411         rx_qlen - the length of the VPP message receive queue between
412         client and server.
413         """
414         msg_handler = self.transport.get_callback(do_async)
415         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
416                                      do_async)
417
418     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
419         """Attach to VPP in synchronous mode. Application must poll for events.
420
421         name - the name of the client.
422         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
423         rx_qlen - the length of the VPP message receive queue between
424         client and server.
425         """
426
427         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
428                                      do_async=False)
429
430     def disconnect(self):
431         """Detach from VPP."""
432         rv = self.transport.disconnect()
433         self.message_queue.put("terminate event thread")
434         return rv
435
436     def msg_handler_sync(self, msg):
437         """Process an incoming message from VPP in sync mode.
438
439         The message may be a reply or it may be an async notification.
440         """
441         r = self.decode_incoming_msg(msg)
442         if r is None:
443             return
444
445         # If we have a context, then use the context to find any
446         # request waiting for a reply
447         context = 0
448         if hasattr(r, 'context') and r.context > 0:
449             context = r.context
450
451         if context == 0:
452             # No context -> async notification that we feed to the callback
453             self.message_queue.put_nowait(r)
454         else:
455             raise IOError(2, 'RPC reply message received in event handler')
456
457     def decode_incoming_msg(self, msg):
458         if not msg:
459             self.logger.warning('vpp_api.read failed')
460             return
461         (i, ci), size = self.header.unpack(msg, 0)
462         if self.id_names[i] == 'rx_thread_exit':
463             return
464
465         #
466         # Decode message and returns a tuple.
467         #
468         msgobj = self.id_msgdef[i]
469         if not msgobj:
470             raise IOError(2, 'Reply message undefined')
471
472         r, size = msgobj.unpack(msg)
473         return r
474
475     def msg_handler_async(self, msg):
476         """Process a message from VPP in async mode.
477
478         In async mode, all messages are returned to the callback.
479         """
480         r = self.decode_incoming_msg(msg)
481         if r is None:
482             return
483
484         msgname = type(r).__name__
485
486         if self.event_callback:
487             self.event_callback(msgname, r)
488
489     def _control_ping(self, context):
490         """Send a ping command."""
491         self._call_vpp_async(self.control_ping_index,
492                              self.control_ping_msgdef,
493                              context=context)
494
495     def validate_args(self, msg, kwargs):
496         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
497         if d:
498             raise ValueError('Invalid argument {} to {}'
499                              .format(list(d), msg.name))
500
501     def _call_vpp(self, i, msg, multipart, **kwargs):
502         """Given a message, send the message and await a reply.
503
504         msgdef - the message packing definition
505         i - the message type index
506         multipart - True if the message returns multiple
507         messages in return.
508         context - context number - chosen at random if not
509         supplied.
510         The remainder of the kwargs are the arguments to the API call.
511
512         The return value is the message or message array containing
513         the response.  It will raise an IOError exception if there was
514         no response within the timeout window.
515         """
516
517         if 'context' not in kwargs:
518             context = self.get_context()
519             kwargs['context'] = context
520         else:
521             context = kwargs['context']
522         kwargs['_vl_msg_id'] = i
523
524         try:
525             if self.transport.socket_index:
526                 kwargs['client_index'] = self.transport.socket_index
527         except AttributeError:
528             pass
529         self.validate_args(msg, kwargs)
530         b = msg.pack(kwargs)
531         self.transport.suspend()
532
533         self.transport.write(b)
534
535         if multipart:
536             # Send a ping after the request - we use its response
537             # to detect that we have seen all results.
538             self._control_ping(context)
539
540         # Block until we get a reply.
541         rl = []
542         while (True):
543             msg = self.transport.read()
544             if not msg:
545                 raise IOError(2, 'VPP API client: read failed')
546             r = self.decode_incoming_msg(msg)
547             msgname = type(r).__name__
548             if context not in r or r.context == 0 or context != r.context:
549                 # Message being queued
550                 self.message_queue.put_nowait(r)
551                 continue
552
553             if not multipart:
554                 rl = r
555                 break
556             if msgname == 'control_ping_reply':
557                 break
558
559             rl.append(r)
560
561         self.transport.resume()
562
563         return rl
564
565     def _call_vpp_async(self, i, msg, **kwargs):
566         """Given a message, send the message and await a reply.
567
568         msgdef - the message packing definition
569         i - the message type index
570         context - context number - chosen at random if not
571         supplied.
572         The remainder of the kwargs are the arguments to the API call.
573         """
574         if 'context' not in kwargs:
575             context = self.get_context()
576             kwargs['context'] = context
577         else:
578             context = kwargs['context']
579         try:
580             if self.transport.socket_index:
581                 kwargs['client_index'] = self.transport.socket_index
582         except AttributeError:
583             kwargs['client_index'] = 0
584         kwargs['_vl_msg_id'] = i
585         b = msg.pack(kwargs)
586
587         self.transport.write(b)
588
589     def register_event_callback(self, callback):
590         """Register a callback for async messages.
591
592         This will be called for async notifications in sync mode,
593         and all messages in async mode.  In sync mode, replies to
594         requests will not come here.
595
596         callback is a fn(msg_type_name, msg_type) that will be
597         called when a message comes in.  While this function is
598         executing, note that (a) you are in a background thread and
599         may wish to use threading.Lock to protect your datastructures,
600         and (b) message processing from VPP will stop (so if you take
601         a long while about it you may provoke reply timeouts or cause
602         VPP to fill the RX buffer).  Passing None will disable the
603         callback.
604         """
605         self.event_callback = callback
606
607     def thread_msg_handler(self):
608         """Python thread calling the user registered message handler.
609
610         This is to emulate the old style event callback scheme. Modern
611         clients should provide their own thread to poll the event
612         queue.
613         """
614         while True:
615             r = self.message_queue.get()
616             if r == "terminate event thread":
617                 break
618             msgname = type(r).__name__
619             if self.event_callback:
620                 self.event_callback(msgname, r)
621
622
623 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4