c37334cd4e546d1f5969f5d1319b850ca92bf871
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import sys
20 import os
21 import logging
22 import collections
23 import struct
24 import json
25 import threading
26 import fnmatch
27 import weakref
28 import atexit
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
31 from . vpp_format import VPPFormat
32
33 if sys.version[0] == '2':
34     import Queue as queue
35 else:
36     import queue as queue
37
38
39 class VppEnumType(type):
40     def __getattr__(cls, name):
41         t = vpp_get_type(name)
42         return t.enum
43
44
45 # Python3
46 # class VppEnum(metaclass=VppEnumType):
47 #    pass
48 class VppEnum(object):
49     __metaclass__ = VppEnumType
50
51
52 def vpp_atexit(vpp_weakref):
53     """Clean up VPP connection on shutdown."""
54     vpp_instance = vpp_weakref()
55     if vpp_instance and vpp_instance.transport.connected:
56         vpp_instance.logger.debug('Cleaning up VPP on exit')
57         vpp_instance.disconnect()
58
59
60 def vpp_iterator(d):
61     if sys.version[0] == '2':
62         return d.iteritems()
63     else:
64         return d.items()
65
66
67 class VppApiDynamicMethodHolder(object):
68     pass
69
70
71 class FuncWrapper(object):
72     def __init__(self, func):
73         self._func = func
74         self.__name__ = func.__name__
75
76     def __call__(self, **kwargs):
77         return self._func(**kwargs)
78
79
80 class VPPApiError(Exception):
81     pass
82
83
84 class VPPNotImplementedError(NotImplementedError):
85     pass
86
87
88 class VPPIOError(IOError):
89     pass
90
91
92 class VPPRuntimeError(RuntimeError):
93     pass
94
95
96 class VPPValueError(ValueError):
97     pass
98
99
100 class VPP(object):
101     """VPP interface.
102
103     This class provides the APIs to VPP.  The APIs are loaded
104     from provided .api.json files and makes functions accordingly.
105     These functions are documented in the VPP .api files, as they
106     are dynamically created.
107
108     Additionally, VPP can send callback messages; this class
109     provides a means to register a callback function to receive
110     these messages in a background thread.
111     """
112     VPPApiError = VPPApiError
113     VPPRuntimeError = VPPRuntimeError
114     VPPValueError = VPPValueError
115     VPPNotImplementedError = VPPNotImplementedError
116     VPPIOError = VPPIOError
117
118     def process_json_file(self, apidef_file):
119         api = json.load(apidef_file)
120         types = {}
121         for t in api['enums']:
122             t[0] = 'vl_api_' + t[0] + '_t'
123             types[t[0]] = {'type': 'enum', 'data': t}
124         for t in api['unions']:
125             t[0] = 'vl_api_' + t[0] + '_t'
126             types[t[0]] = {'type': 'union', 'data': t}
127         for t in api['types']:
128             t[0] = 'vl_api_' + t[0] + '_t'
129             types[t[0]] = {'type': 'type', 'data': t}
130         for t, v in api['aliases'].items():
131             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
132         self.services.update(api['services'])
133
134         i = 0
135         while True:
136             unresolved = {}
137             for k, v in types.items():
138                 t = v['data']
139                 if not vpp_get_type(k):
140                     if v['type'] == 'enum':
141                         try:
142                             VPPEnumType(t[0], t[1:])
143                         except ValueError:
144                             unresolved[k] = v
145                     elif v['type'] == 'union':
146                         try:
147                             VPPUnionType(t[0], t[1:])
148                         except ValueError:
149                             unresolved[k] = v
150                     elif v['type'] == 'type':
151                         try:
152                             VPPType(t[0], t[1:])
153                         except ValueError:
154                             unresolved[k] = v
155                     elif v['type'] == 'alias':
156                         try:
157                             VPPTypeAlias(k, t)
158                         except ValueError:
159                             unresolved[k] = v
160             if len(unresolved) == 0:
161                 break
162             if i > 3:
163                 raise VPPValueError('Unresolved type definitions {}'
164                                     .format(unresolved))
165             types = unresolved
166             i += 1
167
168         for m in api['messages']:
169             try:
170                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
171             except VPPNotImplementedError:
172                 self.logger.error('Not implemented error for {}'.format(m[0]))
173
174     def __init__(self, apifiles=None, testmode=False, async_thread=True,
175                  logger=None, loglevel=None,
176                  read_timeout=5, use_socket=False,
177                  server_address='/run/vpp-api.sock'):
178         """Create a VPP API object.
179
180         apifiles is a list of files containing API
181         descriptions that will be loaded - methods will be
182         dynamically created reflecting these APIs.  If not
183         provided this will load the API files from VPP's
184         default install location.
185
186         logger, if supplied, is the logging logger object to log to.
187         loglevel, if supplied, is the log level this logger is set
188         to report at (from the loglevels in the logging module).
189         """
190         if logger is None:
191             logger = logging.getLogger(__name__)
192             if loglevel is not None:
193                 logger.setLevel(loglevel)
194         self.logger = logger
195
196         self.messages = {}
197         self.services = {}
198         self.id_names = []
199         self.id_msgdef = []
200         self.header = VPPType('header', [['u16', 'msgid'],
201                                          ['u32', 'client_index']])
202         self.apifiles = []
203         self.event_callback = None
204         self.message_queue = queue.Queue()
205         self.read_timeout = read_timeout
206         self.async_thread = async_thread
207
208         if use_socket:
209             from . vpp_transport_socket import VppTransport
210         else:
211             from . vpp_transport_shmem import VppTransport
212
213         if not apifiles:
214             # Pick up API definitions from default directory
215             try:
216                 apifiles = self.find_api_files()
217             except RuntimeError:
218                 # In test mode we don't care that we can't find the API files
219                 if testmode:
220                     apifiles = []
221                 else:
222                     raise VPPRuntimeError
223
224         for file in apifiles:
225             with open(file) as apidef_file:
226                 self.process_json_file(apidef_file)
227
228         self.apifiles = apifiles
229
230         # Basic sanity check
231         if len(self.messages) == 0 and not testmode:
232             raise VPPValueError(1, 'Missing JSON message definitions')
233
234         self.transport = VppTransport(self, read_timeout=read_timeout,
235                                       server_address=server_address)
236         # Make sure we allow VPP to clean up the message rings.
237         atexit.register(vpp_atexit, weakref.ref(self))
238
239     class ContextId(object):
240         """Thread-safe provider of unique context IDs."""
241         def __init__(self):
242             self.context = 0
243             self.lock = threading.Lock()
244
245         def __call__(self):
246             """Get a new unique (or, at least, not recently used) context."""
247             with self.lock:
248                 self.context += 1
249                 return self.context
250     get_context = ContextId()
251
252     def get_type(self, name):
253         return vpp_get_type(name)
254
255     @classmethod
256     def find_api_dir(cls):
257         """Attempt to find the best directory in which API definition
258         files may reside. If the value VPP_API_DIR exists in the environment
259         then it is first on the search list. If we're inside a recognized
260         location in a VPP source tree (src/scripts and src/vpp-api/python)
261         then entries from there to the likely locations in build-root are
262         added. Finally the location used by system packages is added.
263
264         :returns: A single directory name, or None if no such directory
265             could be found.
266         """
267         dirs = []
268
269         if 'VPP_API_DIR' in os.environ:
270             dirs.append(os.environ['VPP_API_DIR'])
271
272         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
273         # in which case, plot a course to likely places in the src tree
274         import __main__ as main
275         if hasattr(main, '__file__'):
276             # get the path of the calling script
277             localdir = os.path.dirname(os.path.realpath(main.__file__))
278         else:
279             # use cwd if there is no calling script
280             localdir = os.getcwd()
281         localdir_s = localdir.split(os.path.sep)
282
283         def dmatch(dir):
284             """Match dir against right-hand components of the script dir"""
285             d = dir.split('/')  # param 'dir' assumes a / separator
286             length = len(d)
287             return len(localdir_s) > length and localdir_s[-length:] == d
288
289         def sdir(srcdir, variant):
290             """Build a path from srcdir to the staged API files of
291             'variant'  (typically '' or '_debug')"""
292             # Since 'core' and 'plugin' files are staged
293             # in separate directories, we target the parent dir.
294             return os.path.sep.join((
295                 srcdir,
296                 'build-root',
297                 'install-vpp%s-native' % variant,
298                 'vpp',
299                 'share',
300                 'vpp',
301                 'api',
302             ))
303
304         srcdir = None
305         if dmatch('src/scripts'):
306             srcdir = os.path.sep.join(localdir_s[:-2])
307         elif dmatch('src/vpp-api/python'):
308             srcdir = os.path.sep.join(localdir_s[:-3])
309         elif dmatch('test'):
310             # we're apparently running tests
311             srcdir = os.path.sep.join(localdir_s[:-1])
312
313         if srcdir:
314             # we're in the source tree, try both the debug and release
315             # variants.
316             dirs.append(sdir(srcdir, '_debug'))
317             dirs.append(sdir(srcdir, ''))
318
319         # Test for staged copies of the scripts
320         # For these, since we explicitly know if we're running a debug versus
321         # release variant, target only the relevant directory
322         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
323             srcdir = os.path.sep.join(localdir_s[:-4])
324             dirs.append(sdir(srcdir, '_debug'))
325         if dmatch('build-root/install-vpp-native/vpp/bin'):
326             srcdir = os.path.sep.join(localdir_s[:-4])
327             dirs.append(sdir(srcdir, ''))
328
329         # finally, try the location system packages typically install into
330         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
331
332         # check the directories for existance; first one wins
333         for dir in dirs:
334             if os.path.isdir(dir):
335                 return dir
336
337         return None
338
339     @classmethod
340     def find_api_files(cls, api_dir=None, patterns='*'):
341         """Find API definition files from the given directory tree with the
342         given pattern. If no directory is given then find_api_dir() is used
343         to locate one. If no pattern is given then all definition files found
344         in the directory tree are used.
345
346         :param api_dir: A directory tree in which to locate API definition
347             files; subdirectories are descended into.
348             If this is None then find_api_dir() is called to discover it.
349         :param patterns: A list of patterns to use in each visited directory
350             when looking for files.
351             This can be a list/tuple object or a comma-separated string of
352             patterns. Each value in the list will have leading/trialing
353             whitespace stripped.
354             The pattern specifies the first part of the filename, '.api.json'
355             is appended.
356             The results are de-duplicated, thus overlapping patterns are fine.
357             If this is None it defaults to '*' meaning "all API files".
358         :returns: A list of file paths for the API files found.
359         """
360         if api_dir is None:
361             api_dir = cls.find_api_dir()
362             if api_dir is None:
363                 raise VPPApiError("api_dir cannot be located")
364
365         if isinstance(patterns, list) or isinstance(patterns, tuple):
366             patterns = [p.strip() + '.api.json' for p in patterns]
367         else:
368             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
369
370         api_files = []
371         for root, dirnames, files in os.walk(api_dir):
372             # iterate all given patterns and de-dup the result
373             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
374             for filename in files:
375                 api_files.append(os.path.join(root, filename))
376
377         return api_files
378
379     @property
380     def api(self):
381         if not hasattr(self, "_api"):
382             raise VPPApiError("Not connected, api definitions not available")
383         return self._api
384
385     def make_function(self, msg, i, multipart, do_async):
386         if (do_async):
387             def f(**kwargs):
388                 return self._call_vpp_async(i, msg, **kwargs)
389         else:
390             def f(**kwargs):
391                 return self._call_vpp(i, msg, multipart, **kwargs)
392
393         f.__name__ = str(msg.name)
394         f.__doc__ = ", ".join(["%s %s" %
395                                (msg.fieldtypes[j], k)
396                                for j, k in enumerate(msg.fields)])
397         return f
398
399     def _register_functions(self, do_async=False):
400         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
401         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
402         self._api = VppApiDynamicMethodHolder()
403         for name, msg in vpp_iterator(self.messages):
404             n = name + '_' + msg.crc[2:]
405             i = self.transport.get_msg_index(n.encode())
406             if i > 0:
407                 self.id_msgdef[i] = msg
408                 self.id_names[i] = name
409
410                 # Create function for client side messages.
411                 if name in self.services:
412                     if 'stream' in self.services[name] and self.services[name]['stream']:
413                         multipart = True
414                     else:
415                         multipart = False
416                     f = self.make_function(msg, i, multipart, do_async)
417                     setattr(self._api, name, FuncWrapper(f))
418             else:
419                 self.logger.debug(
420                     'No such message type or failed CRC checksum: %s', n)
421
422     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
423                          do_async):
424         pfx = chroot_prefix.encode() if chroot_prefix else None
425
426         rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
427         if rv != 0:
428             raise VPPIOError(2, 'Connect failed')
429         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
430         self._register_functions(do_async=do_async)
431
432         # Initialise control ping
433         crc = self.messages['control_ping'].crc
434         self.control_ping_index = self.transport.get_msg_index(
435             ('control_ping' + '_' + crc[2:]).encode())
436         self.control_ping_msgdef = self.messages['control_ping']
437         if self.async_thread:
438             self.event_thread = threading.Thread(
439                 target=self.thread_msg_handler)
440             self.event_thread.daemon = True
441             self.event_thread.start()
442         return rv
443
444     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
445         """Attach to VPP.
446
447         name - the name of the client.
448         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
449         do_async - if true, messages are sent without waiting for a reply
450         rx_qlen - the length of the VPP message receive queue between
451         client and server.
452         """
453         msg_handler = self.transport.get_callback(do_async)
454         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
455                                      do_async)
456
457     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
458         """Attach to VPP in synchronous mode. Application must poll for events.
459
460         name - the name of the client.
461         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
462         rx_qlen - the length of the VPP message receive queue between
463         client and server.
464         """
465
466         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
467                                      do_async=False)
468
469     def disconnect(self):
470         """Detach from VPP."""
471         rv = self.transport.disconnect()
472         self.message_queue.put("terminate event thread")
473         return rv
474
475     def msg_handler_sync(self, msg):
476         """Process an incoming message from VPP in sync mode.
477
478         The message may be a reply or it may be an async notification.
479         """
480         r = self.decode_incoming_msg(msg)
481         if r is None:
482             return
483
484         # If we have a context, then use the context to find any
485         # request waiting for a reply
486         context = 0
487         if hasattr(r, 'context') and r.context > 0:
488             context = r.context
489
490         if context == 0:
491             # No context -> async notification that we feed to the callback
492             self.message_queue.put_nowait(r)
493         else:
494             raise VPPIOError(2, 'RPC reply message received in event handler')
495
496     def decode_incoming_msg(self, msg):
497         if not msg:
498             self.logger.warning('vpp_api.read failed')
499             return
500         (i, ci), size = self.header.unpack(msg, 0)
501         if self.id_names[i] == 'rx_thread_exit':
502             return
503
504         #
505         # Decode message and returns a tuple.
506         #
507         msgobj = self.id_msgdef[i]
508         if not msgobj:
509             raise VPPIOError(2, 'Reply message undefined')
510
511         r, size = msgobj.unpack(msg)
512         return r
513
514     def msg_handler_async(self, msg):
515         """Process a message from VPP in async mode.
516
517         In async mode, all messages are returned to the callback.
518         """
519         r = self.decode_incoming_msg(msg)
520         if r is None:
521             return
522
523         msgname = type(r).__name__
524
525         if self.event_callback:
526             self.event_callback(msgname, r)
527
528     def _control_ping(self, context):
529         """Send a ping command."""
530         self._call_vpp_async(self.control_ping_index,
531                              self.control_ping_msgdef,
532                              context=context)
533
534     def validate_args(self, msg, kwargs):
535         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
536         if d:
537             raise VPPValueError('Invalid argument {} to {}'
538                              .format(list(d), msg.name))
539
540     def _call_vpp(self, i, msg, multipart, **kwargs):
541         """Given a message, send the message and await a reply.
542
543         msgdef - the message packing definition
544         i - the message type index
545         multipart - True if the message returns multiple
546         messages in return.
547         context - context number - chosen at random if not
548         supplied.
549         The remainder of the kwargs are the arguments to the API call.
550
551         The return value is the message or message array containing
552         the response.  It will raise an IOError exception if there was
553         no response within the timeout window.
554         """
555
556         if 'context' not in kwargs:
557             context = self.get_context()
558             kwargs['context'] = context
559         else:
560             context = kwargs['context']
561         kwargs['_vl_msg_id'] = i
562
563         try:
564             if self.transport.socket_index:
565                 kwargs['client_index'] = self.transport.socket_index
566         except AttributeError:
567             pass
568         self.validate_args(msg, kwargs)
569         b = msg.pack(kwargs)
570         self.transport.suspend()
571
572         self.transport.write(b)
573
574         if multipart:
575             # Send a ping after the request - we use its response
576             # to detect that we have seen all results.
577             self._control_ping(context)
578
579         # Block until we get a reply.
580         rl = []
581         while (True):
582             msg = self.transport.read()
583             if not msg:
584                 raise VPPIOError(2, 'VPP API client: read failed')
585             r = self.decode_incoming_msg(msg)
586             msgname = type(r).__name__
587             if context not in r or r.context == 0 or context != r.context:
588                 # Message being queued
589                 self.message_queue.put_nowait(r)
590                 continue
591
592             if not multipart:
593                 rl = r
594                 break
595             if msgname == 'control_ping_reply':
596                 break
597
598             rl.append(r)
599
600         self.transport.resume()
601
602         return rl
603
604     def _call_vpp_async(self, i, msg, **kwargs):
605         """Given a message, send the message and await a reply.
606
607         msgdef - the message packing definition
608         i - the message type index
609         context - context number - chosen at random if not
610         supplied.
611         The remainder of the kwargs are the arguments to the API call.
612         """
613         if 'context' not in kwargs:
614             context = self.get_context()
615             kwargs['context'] = context
616         else:
617             context = kwargs['context']
618         try:
619             if self.transport.socket_index:
620                 kwargs['client_index'] = self.transport.socket_index
621         except AttributeError:
622             kwargs['client_index'] = 0
623         kwargs['_vl_msg_id'] = i
624         b = msg.pack(kwargs)
625
626         self.transport.write(b)
627
628     def register_event_callback(self, callback):
629         """Register a callback for async messages.
630
631         This will be called for async notifications in sync mode,
632         and all messages in async mode.  In sync mode, replies to
633         requests will not come here.
634
635         callback is a fn(msg_type_name, msg_type) that will be
636         called when a message comes in.  While this function is
637         executing, note that (a) you are in a background thread and
638         may wish to use threading.Lock to protect your datastructures,
639         and (b) message processing from VPP will stop (so if you take
640         a long while about it you may provoke reply timeouts or cause
641         VPP to fill the RX buffer).  Passing None will disable the
642         callback.
643         """
644         self.event_callback = callback
645
646     def thread_msg_handler(self):
647         """Python thread calling the user registered message handler.
648
649         This is to emulate the old style event callback scheme. Modern
650         clients should provide their own thread to poll the event
651         queue.
652         """
653         while True:
654             r = self.message_queue.get()
655             if r == "terminate event thread":
656                 break
657             msgname = type(r).__name__
658             if self.event_callback:
659                 self.event_callback(msgname, r)
660
661
662 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4