32abe14da949ee231a07bb3167f794413c847fd2
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import sys
20 import os
21 import logging
22 import collections
23 import struct
24 import json
25 import threading
26 import fnmatch
27 import weakref
28 import atexit
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
31
32 if sys.version[0] == '2':
33     import Queue as queue
34 else:
35     import queue as queue
36
37
38 class VppEnumType(type):
39     def __getattr__(cls, name):
40         t = vpp_get_type(name)
41         return t.enum
42
43
44 # Python3
45 # class VppEnum(metaclass=VppEnumType):
46 #    pass
47 class VppEnum(object):
48     __metaclass__ = VppEnumType
49
50
51 def vpp_atexit(vpp_weakref):
52     """Clean up VPP connection on shutdown."""
53     vpp_instance = vpp_weakref()
54     if vpp_instance and vpp_instance.transport.connected:
55         vpp_instance.logger.debug('Cleaning up VPP on exit')
56         vpp_instance.disconnect()
57
58 if sys.version[0] == '2':
59     def vpp_iterator(d):
60         return d.iteritems()
61 else:
62     def vpp_iterator(d):
63         return d.items()
64
65
66 class VppApiDynamicMethodHolder(object):
67     pass
68
69
70 class FuncWrapper(object):
71     def __init__(self, func):
72         self._func = func
73         self.__name__ = func.__name__
74
75     def __call__(self, **kwargs):
76         return self._func(**kwargs)
77
78
79 class VPPApiError(Exception):
80     pass
81
82
83 class VPPNotImplementedError(NotImplementedError):
84     pass
85
86
87 class VPPIOError(IOError):
88     pass
89
90
91 class VPPRuntimeError(RuntimeError):
92     pass
93
94
95 class VPPValueError(ValueError):
96     pass
97
98
99 class VPP(object):
100     """VPP interface.
101
102     This class provides the APIs to VPP.  The APIs are loaded
103     from provided .api.json files and makes functions accordingly.
104     These functions are documented in the VPP .api files, as they
105     are dynamically created.
106
107     Additionally, VPP can send callback messages; this class
108     provides a means to register a callback function to receive
109     these messages in a background thread.
110     """
111     VPPApiError = VPPApiError
112     VPPRuntimeError = VPPRuntimeError
113     VPPValueError = VPPValueError
114     VPPNotImplementedError = VPPNotImplementedError
115     VPPIOError = VPPIOError
116
117     def process_json_file(self, apidef_file):
118         api = json.load(apidef_file)
119         types = {}
120         for t in api['enums']:
121             t[0] = 'vl_api_' + t[0] + '_t'
122             types[t[0]] = {'type': 'enum', 'data': t}
123         for t in api['unions']:
124             t[0] = 'vl_api_' + t[0] + '_t'
125             types[t[0]] = {'type': 'union', 'data': t}
126         for t in api['types']:
127             t[0] = 'vl_api_' + t[0] + '_t'
128             types[t[0]] = {'type': 'type', 'data': t}
129         for t, v in api['aliases'].items():
130             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
131         self.services.update(api['services'])
132
133         i = 0
134         while True:
135             unresolved = {}
136             for k, v in types.items():
137                 t = v['data']
138                 if not vpp_get_type(k):
139                     if v['type'] == 'enum':
140                         try:
141                             VPPEnumType(t[0], t[1:])
142                         except ValueError:
143                             unresolved[k] = v
144                     elif v['type'] == 'union':
145                         try:
146                             VPPUnionType(t[0], t[1:])
147                         except ValueError:
148                             unresolved[k] = v
149                     elif v['type'] == 'type':
150                         try:
151                             VPPType(t[0], t[1:])
152                         except ValueError:
153                             unresolved[k] = v
154                     elif v['type'] == 'alias':
155                         try:
156                             VPPTypeAlias(k, t)
157                         except ValueError:
158                             unresolved[k] = v
159             if len(unresolved) == 0:
160                 break
161             if i > 3:
162                 raise VPPValueError('Unresolved type definitions {}'
163                                     .format(unresolved))
164             types = unresolved
165             i += 1
166
167         for m in api['messages']:
168             try:
169                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
170             except VPPNotImplementedError:
171                 self.logger.error('Not implemented error for {}'.format(m[0]))
172
173     def __init__(self, apifiles=None, testmode=False, async_thread=True,
174                  logger=None, loglevel=None,
175                  read_timeout=5, use_socket=False,
176                  server_address='/run/vpp-api.sock'):
177         """Create a VPP API object.
178
179         apifiles is a list of files containing API
180         descriptions that will be loaded - methods will be
181         dynamically created reflecting these APIs.  If not
182         provided this will load the API files from VPP's
183         default install location.
184
185         logger, if supplied, is the logging logger object to log to.
186         loglevel, if supplied, is the log level this logger is set
187         to report at (from the loglevels in the logging module).
188         """
189         if logger is None:
190             logger = logging.getLogger(__name__)
191             if loglevel is not None:
192                 logger.setLevel(loglevel)
193         self.logger = logger
194
195         self.messages = {}
196         self.services = {}
197         self.id_names = []
198         self.id_msgdef = []
199         self.header = VPPType('header', [['u16', 'msgid'],
200                                          ['u32', 'client_index']])
201         self.apifiles = []
202         self.event_callback = None
203         self.message_queue = queue.Queue()
204         self.read_timeout = read_timeout
205         self.async_thread = async_thread
206
207         if use_socket:
208             from . vpp_transport_socket import VppTransport
209         else:
210             from . vpp_transport_shmem import VppTransport
211
212         if not apifiles:
213             # Pick up API definitions from default directory
214             try:
215                 apifiles = self.find_api_files()
216             except RuntimeError:
217                 # In test mode we don't care that we can't find the API files
218                 if testmode:
219                     apifiles = []
220                 else:
221                     raise VPPRuntimeError
222
223         for file in apifiles:
224             with open(file) as apidef_file:
225                 self.process_json_file(apidef_file)
226
227         self.apifiles = apifiles
228
229         # Basic sanity check
230         if len(self.messages) == 0 and not testmode:
231             raise VPPValueError(1, 'Missing JSON message definitions')
232
233         self.transport = VppTransport(self, read_timeout=read_timeout,
234                                       server_address=server_address)
235         # Make sure we allow VPP to clean up the message rings.
236         atexit.register(vpp_atexit, weakref.ref(self))
237
238     class ContextId(object):
239         """Thread-safe provider of unique context IDs."""
240         def __init__(self):
241             self.context = 0
242             self.lock = threading.Lock()
243
244         def __call__(self):
245             """Get a new unique (or, at least, not recently used) context."""
246             with self.lock:
247                 self.context += 1
248                 return self.context
249     get_context = ContextId()
250
251     def get_type(self, name):
252         return vpp_get_type(name)
253
254     @classmethod
255     def find_api_dir(cls):
256         """Attempt to find the best directory in which API definition
257         files may reside. If the value VPP_API_DIR exists in the environment
258         then it is first on the search list. If we're inside a recognized
259         location in a VPP source tree (src/scripts and src/vpp-api/python)
260         then entries from there to the likely locations in build-root are
261         added. Finally the location used by system packages is added.
262
263         :returns: A single directory name, or None if no such directory
264             could be found.
265         """
266         dirs = []
267
268         if 'VPP_API_DIR' in os.environ:
269             dirs.append(os.environ['VPP_API_DIR'])
270
271         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
272         # in which case, plot a course to likely places in the src tree
273         import __main__ as main
274         if hasattr(main, '__file__'):
275             # get the path of the calling script
276             localdir = os.path.dirname(os.path.realpath(main.__file__))
277         else:
278             # use cwd if there is no calling script
279             localdir = os.getcwd()
280         localdir_s = localdir.split(os.path.sep)
281
282         def dmatch(dir):
283             """Match dir against right-hand components of the script dir"""
284             d = dir.split('/')  # param 'dir' assumes a / separator
285             length = len(d)
286             return len(localdir_s) > length and localdir_s[-length:] == d
287
288         def sdir(srcdir, variant):
289             """Build a path from srcdir to the staged API files of
290             'variant'  (typically '' or '_debug')"""
291             # Since 'core' and 'plugin' files are staged
292             # in separate directories, we target the parent dir.
293             return os.path.sep.join((
294                 srcdir,
295                 'build-root',
296                 'install-vpp%s-native' % variant,
297                 'vpp',
298                 'share',
299                 'vpp',
300                 'api',
301             ))
302
303         srcdir = None
304         if dmatch('src/scripts'):
305             srcdir = os.path.sep.join(localdir_s[:-2])
306         elif dmatch('src/vpp-api/python'):
307             srcdir = os.path.sep.join(localdir_s[:-3])
308         elif dmatch('test'):
309             # we're apparently running tests
310             srcdir = os.path.sep.join(localdir_s[:-1])
311
312         if srcdir:
313             # we're in the source tree, try both the debug and release
314             # variants.
315             dirs.append(sdir(srcdir, '_debug'))
316             dirs.append(sdir(srcdir, ''))
317
318         # Test for staged copies of the scripts
319         # For these, since we explicitly know if we're running a debug versus
320         # release variant, target only the relevant directory
321         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
322             srcdir = os.path.sep.join(localdir_s[:-4])
323             dirs.append(sdir(srcdir, '_debug'))
324         if dmatch('build-root/install-vpp-native/vpp/bin'):
325             srcdir = os.path.sep.join(localdir_s[:-4])
326             dirs.append(sdir(srcdir, ''))
327
328         # finally, try the location system packages typically install into
329         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
330
331         # check the directories for existance; first one wins
332         for dir in dirs:
333             if os.path.isdir(dir):
334                 return dir
335
336         return None
337
338     @classmethod
339     def find_api_files(cls, api_dir=None, patterns='*'):
340         """Find API definition files from the given directory tree with the
341         given pattern. If no directory is given then find_api_dir() is used
342         to locate one. If no pattern is given then all definition files found
343         in the directory tree are used.
344
345         :param api_dir: A directory tree in which to locate API definition
346             files; subdirectories are descended into.
347             If this is None then find_api_dir() is called to discover it.
348         :param patterns: A list of patterns to use in each visited directory
349             when looking for files.
350             This can be a list/tuple object or a comma-separated string of
351             patterns. Each value in the list will have leading/trialing
352             whitespace stripped.
353             The pattern specifies the first part of the filename, '.api.json'
354             is appended.
355             The results are de-duplicated, thus overlapping patterns are fine.
356             If this is None it defaults to '*' meaning "all API files".
357         :returns: A list of file paths for the API files found.
358         """
359         if api_dir is None:
360             api_dir = cls.find_api_dir()
361             if api_dir is None:
362                 raise VPPApiError("api_dir cannot be located")
363
364         if isinstance(patterns, list) or isinstance(patterns, tuple):
365             patterns = [p.strip() + '.api.json' for p in patterns]
366         else:
367             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
368
369         api_files = []
370         for root, dirnames, files in os.walk(api_dir):
371             # iterate all given patterns and de-dup the result
372             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
373             for filename in files:
374                 api_files.append(os.path.join(root, filename))
375
376         return api_files
377
378     @property
379     def api(self):
380         if not hasattr(self, "_api"):
381             raise VPPApiError("Not connected, api definitions not available")
382         return self._api
383
384     def make_function(self, msg, i, multipart, do_async):
385         if (do_async):
386             def f(**kwargs):
387                 return self._call_vpp_async(i, msg, **kwargs)
388         else:
389             def f(**kwargs):
390                 return self._call_vpp(i, msg, multipart, **kwargs)
391
392         f.__name__ = str(msg.name)
393         f.__doc__ = ", ".join(["%s %s" %
394                                (msg.fieldtypes[j], k)
395                                for j, k in enumerate(msg.fields)])
396         return f
397
398     def _register_functions(self, do_async=False):
399         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
400         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
401         self._api = VppApiDynamicMethodHolder()
402         for name, msg in vpp_iterator(self.messages):
403             n = name + '_' + msg.crc[2:]
404             i = self.transport.get_msg_index(n.encode())
405             if i > 0:
406                 self.id_msgdef[i] = msg
407                 self.id_names[i] = name
408
409                 # Create function for client side messages.
410                 if name in self.services:
411                     if 'stream' in self.services[name] and \
412                        self.services[name]['stream']:
413                         multipart = True
414                     else:
415                         multipart = False
416                     f = self.make_function(msg, i, multipart, do_async)
417                     setattr(self._api, name, FuncWrapper(f))
418             else:
419                 self.logger.debug(
420                     'No such message type or failed CRC checksum: %s', n)
421
422     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
423                          do_async):
424         pfx = chroot_prefix.encode() if chroot_prefix else None
425
426         rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
427         if rv != 0:
428             raise VPPIOError(2, 'Connect failed')
429         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
430         self._register_functions(do_async=do_async)
431
432         # Initialise control ping
433         crc = self.messages['control_ping'].crc
434         self.control_ping_index = self.transport.get_msg_index(
435             ('control_ping' + '_' + crc[2:]).encode())
436         self.control_ping_msgdef = self.messages['control_ping']
437         if self.async_thread:
438             self.event_thread = threading.Thread(
439                 target=self.thread_msg_handler)
440             self.event_thread.daemon = True
441             self.event_thread.start()
442         return rv
443
444     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
445         """Attach to VPP.
446
447         name - the name of the client.
448         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
449         do_async - if true, messages are sent without waiting for a reply
450         rx_qlen - the length of the VPP message receive queue between
451         client and server.
452         """
453         msg_handler = self.transport.get_callback(do_async)
454         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
455                                      do_async)
456
457     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
458         """Attach to VPP in synchronous mode. Application must poll for events.
459
460         name - the name of the client.
461         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
462         rx_qlen - the length of the VPP message receive queue between
463         client and server.
464         """
465
466         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
467                                      do_async=False)
468
469     def disconnect(self):
470         """Detach from VPP."""
471         rv = self.transport.disconnect()
472         self.message_queue.put("terminate event thread")
473         return rv
474
475     def msg_handler_sync(self, msg):
476         """Process an incoming message from VPP in sync mode.
477
478         The message may be a reply or it may be an async notification.
479         """
480         r = self.decode_incoming_msg(msg)
481         if r is None:
482             return
483
484         # If we have a context, then use the context to find any
485         # request waiting for a reply
486         context = 0
487         if hasattr(r, 'context') and r.context > 0:
488             context = r.context
489
490         if context == 0:
491             # No context -> async notification that we feed to the callback
492             self.message_queue.put_nowait(r)
493         else:
494             raise VPPIOError(2, 'RPC reply message received in event handler')
495
496     def decode_incoming_msg(self, msg, no_type_conversion=False):
497         if not msg:
498             self.logger.warning('vpp_api.read failed')
499             return
500         (i, ci), size = self.header.unpack(msg, 0)
501         if self.id_names[i] == 'rx_thread_exit':
502             return
503
504         #
505         # Decode message and returns a tuple.
506         #
507         msgobj = self.id_msgdef[i]
508         if not msgobj:
509             raise VPPIOError(2, 'Reply message undefined')
510
511         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
512         return r
513
514     def msg_handler_async(self, msg):
515         """Process a message from VPP in async mode.
516
517         In async mode, all messages are returned to the callback.
518         """
519         r = self.decode_incoming_msg(msg)
520         if r is None:
521             return
522
523         msgname = type(r).__name__
524
525         if self.event_callback:
526             self.event_callback(msgname, r)
527
528     def _control_ping(self, context):
529         """Send a ping command."""
530         self._call_vpp_async(self.control_ping_index,
531                              self.control_ping_msgdef,
532                              context=context)
533
534     def validate_args(self, msg, kwargs):
535         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
536         if d:
537             raise VPPValueError('Invalid argument {} to {}'
538                                 .format(list(d), msg.name))
539
540     def _call_vpp(self, i, msg, multipart, **kwargs):
541         """Given a message, send the message and await a reply.
542
543         msgdef - the message packing definition
544         i - the message type index
545         multipart - True if the message returns multiple
546         messages in return.
547         context - context number - chosen at random if not
548         supplied.
549         The remainder of the kwargs are the arguments to the API call.
550
551         The return value is the message or message array containing
552         the response.  It will raise an IOError exception if there was
553         no response within the timeout window.
554         """
555
556         if 'context' not in kwargs:
557             context = self.get_context()
558             kwargs['context'] = context
559         else:
560             context = kwargs['context']
561         kwargs['_vl_msg_id'] = i
562
563         no_type_conversion = kwargs.pop('_no_type_conversion', False)
564
565         try:
566             if self.transport.socket_index:
567                 kwargs['client_index'] = self.transport.socket_index
568         except AttributeError:
569             pass
570         self.validate_args(msg, kwargs)
571         b = msg.pack(kwargs)
572         self.transport.suspend()
573
574         self.transport.write(b)
575
576         if multipart:
577             # Send a ping after the request - we use its response
578             # to detect that we have seen all results.
579             self._control_ping(context)
580
581         # Block until we get a reply.
582         rl = []
583         while (True):
584             msg = self.transport.read()
585             if not msg:
586                 raise VPPIOError(2, 'VPP API client: read failed')
587             r = self.decode_incoming_msg(msg, no_type_conversion)
588             msgname = type(r).__name__
589             if context not in r or r.context == 0 or context != r.context:
590                 # Message being queued
591                 self.message_queue.put_nowait(r)
592                 continue
593
594             if not multipart:
595                 rl = r
596                 break
597             if msgname == 'control_ping_reply':
598                 break
599
600             rl.append(r)
601
602         self.transport.resume()
603
604         return rl
605
606     def _call_vpp_async(self, i, msg, **kwargs):
607         """Given a message, send the message and await a reply.
608
609         msgdef - the message packing definition
610         i - the message type index
611         context - context number - chosen at random if not
612         supplied.
613         The remainder of the kwargs are the arguments to the API call.
614         """
615         if 'context' not in kwargs:
616             context = self.get_context()
617             kwargs['context'] = context
618         else:
619             context = kwargs['context']
620         try:
621             if self.transport.socket_index:
622                 kwargs['client_index'] = self.transport.socket_index
623         except AttributeError:
624             kwargs['client_index'] = 0
625         kwargs['_vl_msg_id'] = i
626         b = msg.pack(kwargs)
627
628         self.transport.write(b)
629
630     def register_event_callback(self, callback):
631         """Register a callback for async messages.
632
633         This will be called for async notifications in sync mode,
634         and all messages in async mode.  In sync mode, replies to
635         requests will not come here.
636
637         callback is a fn(msg_type_name, msg_type) that will be
638         called when a message comes in.  While this function is
639         executing, note that (a) you are in a background thread and
640         may wish to use threading.Lock to protect your datastructures,
641         and (b) message processing from VPP will stop (so if you take
642         a long while about it you may provoke reply timeouts or cause
643         VPP to fill the RX buffer).  Passing None will disable the
644         callback.
645         """
646         self.event_callback = callback
647
648     def thread_msg_handler(self):
649         """Python thread calling the user registered message handler.
650
651         This is to emulate the old style event callback scheme. Modern
652         clients should provide their own thread to poll the event
653         queue.
654         """
655         while True:
656             r = self.message_queue.get()
657             if r == "terminate event thread":
658                 break
659             msgname = type(r).__name__
660             if self.event_callback:
661                 self.event_callback(msgname, r)
662
663
664 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4