API: Use string type instead of u8.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import sys
20 import os
21 import logging
22 import collections
23 import struct
24 import json
25 import threading
26 import fnmatch
27 import weakref
28 import atexit
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
31
32 logger = logging.getLogger(__name__)
33
34 if sys.version[0] == '2':
35     import Queue as queue
36 else:
37     import queue as queue
38
39
40 class VppEnumType(type):
41     def __getattr__(cls, name):
42         t = vpp_get_type(name)
43         return t.enum
44
45
46 # Python3
47 # class VppEnum(metaclass=VppEnumType):
48 #    pass
49 class VppEnum(object):
50     __metaclass__ = VppEnumType
51
52
53 def vpp_atexit(vpp_weakref):
54     """Clean up VPP connection on shutdown."""
55     vpp_instance = vpp_weakref()
56     if vpp_instance and vpp_instance.transport.connected:
57         vpp_instance.logger.debug('Cleaning up VPP on exit')
58         vpp_instance.disconnect()
59
60 if sys.version[0] == '2':
61     def vpp_iterator(d):
62         return d.iteritems()
63 else:
64     def vpp_iterator(d):
65         return d.items()
66
67
68 def call_logger(msgdef, kwargs):
69     s = 'Calling {}('.format(msgdef.name)
70     for k, v in kwargs.items():
71         s += '{}:{} '.format(k, v)
72     s += ')'
73     return s
74
75
76 def return_logger(r):
77     s = 'Return from {}'.format(r)
78     return s
79
80
81 class VppApiDynamicMethodHolder(object):
82     pass
83
84
85 class FuncWrapper(object):
86     def __init__(self, func):
87         self._func = func
88         self.__name__ = func.__name__
89
90     def __call__(self, **kwargs):
91         return self._func(**kwargs)
92
93
94 class VPPApiError(Exception):
95     pass
96
97
98 class VPPNotImplementedError(NotImplementedError):
99     pass
100
101
102 class VPPIOError(IOError):
103     pass
104
105
106 class VPPRuntimeError(RuntimeError):
107     pass
108
109
110 class VPPValueError(ValueError):
111     pass
112
113
114 class VPP(object):
115     """VPP interface.
116
117     This class provides the APIs to VPP.  The APIs are loaded
118     from provided .api.json files and makes functions accordingly.
119     These functions are documented in the VPP .api files, as they
120     are dynamically created.
121
122     Additionally, VPP can send callback messages; this class
123     provides a means to register a callback function to receive
124     these messages in a background thread.
125     """
126     VPPApiError = VPPApiError
127     VPPRuntimeError = VPPRuntimeError
128     VPPValueError = VPPValueError
129     VPPNotImplementedError = VPPNotImplementedError
130     VPPIOError = VPPIOError
131
132     def process_json_file(self, apidef_file):
133         api = json.load(apidef_file)
134         types = {}
135         for t in api['enums']:
136             t[0] = 'vl_api_' + t[0] + '_t'
137             types[t[0]] = {'type': 'enum', 'data': t}
138         for t in api['unions']:
139             t[0] = 'vl_api_' + t[0] + '_t'
140             types[t[0]] = {'type': 'union', 'data': t}
141         for t in api['types']:
142             t[0] = 'vl_api_' + t[0] + '_t'
143             types[t[0]] = {'type': 'type', 'data': t}
144         for t, v in api['aliases'].items():
145             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
146         self.services.update(api['services'])
147
148         i = 0
149         while True:
150             unresolved = {}
151             for k, v in types.items():
152                 t = v['data']
153                 if not vpp_get_type(k):
154                     if v['type'] == 'enum':
155                         try:
156                             VPPEnumType(t[0], t[1:])
157                         except ValueError:
158                             unresolved[k] = v
159                     elif v['type'] == 'union':
160                         try:
161                             VPPUnionType(t[0], t[1:])
162                         except ValueError:
163                             unresolved[k] = v
164                     elif v['type'] == 'type':
165                         try:
166                             VPPType(t[0], t[1:])
167                         except ValueError:
168                             unresolved[k] = v
169                     elif v['type'] == 'alias':
170                         try:
171                             VPPTypeAlias(k, t)
172                         except ValueError:
173                             unresolved[k] = v
174             if len(unresolved) == 0:
175                 break
176             if i > 3:
177                 raise VPPValueError('Unresolved type definitions {}'
178                                     .format(unresolved))
179             types = unresolved
180             i += 1
181
182         for m in api['messages']:
183             try:
184                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
185             except VPPNotImplementedError:
186                 self.logger.error('Not implemented error for {}'.format(m[0]))
187
188     def __init__(self, apifiles=None, testmode=False, async_thread=True,
189                  logger=None, loglevel=None,
190                  read_timeout=5, use_socket=False,
191                  server_address='/run/vpp-api.sock'):
192         """Create a VPP API object.
193
194         apifiles is a list of files containing API
195         descriptions that will be loaded - methods will be
196         dynamically created reflecting these APIs.  If not
197         provided this will load the API files from VPP's
198         default install location.
199
200         logger, if supplied, is the logging logger object to log to.
201         loglevel, if supplied, is the log level this logger is set
202         to report at (from the loglevels in the logging module).
203         """
204         if logger is None:
205             logger = logging.getLogger(__name__)
206             if loglevel is not None:
207                 logger.setLevel(loglevel)
208         self.logger = logger
209
210         self.messages = {}
211         self.services = {}
212         self.id_names = []
213         self.id_msgdef = []
214         self.header = VPPType('header', [['u16', 'msgid'],
215                                          ['u32', 'client_index']])
216         self.apifiles = []
217         self.event_callback = None
218         self.message_queue = queue.Queue()
219         self.read_timeout = read_timeout
220         self.async_thread = async_thread
221
222         if use_socket:
223             from . vpp_transport_socket import VppTransport
224         else:
225             from . vpp_transport_shmem import VppTransport
226
227         if not apifiles:
228             # Pick up API definitions from default directory
229             try:
230                 apifiles = self.find_api_files()
231             except RuntimeError:
232                 # In test mode we don't care that we can't find the API files
233                 if testmode:
234                     apifiles = []
235                 else:
236                     raise VPPRuntimeError
237
238         for file in apifiles:
239             with open(file) as apidef_file:
240                 self.process_json_file(apidef_file)
241
242         self.apifiles = apifiles
243
244         # Basic sanity check
245         if len(self.messages) == 0 and not testmode:
246             raise VPPValueError(1, 'Missing JSON message definitions')
247
248         self.transport = VppTransport(self, read_timeout=read_timeout,
249                                       server_address=server_address)
250         # Make sure we allow VPP to clean up the message rings.
251         atexit.register(vpp_atexit, weakref.ref(self))
252
253     class ContextId(object):
254         """Thread-safe provider of unique context IDs."""
255         def __init__(self):
256             self.context = 0
257             self.lock = threading.Lock()
258
259         def __call__(self):
260             """Get a new unique (or, at least, not recently used) context."""
261             with self.lock:
262                 self.context += 1
263                 return self.context
264     get_context = ContextId()
265
266     def get_type(self, name):
267         return vpp_get_type(name)
268
269     @classmethod
270     def find_api_dir(cls):
271         """Attempt to find the best directory in which API definition
272         files may reside. If the value VPP_API_DIR exists in the environment
273         then it is first on the search list. If we're inside a recognized
274         location in a VPP source tree (src/scripts and src/vpp-api/python)
275         then entries from there to the likely locations in build-root are
276         added. Finally the location used by system packages is added.
277
278         :returns: A single directory name, or None if no such directory
279             could be found.
280         """
281         dirs = []
282
283         if 'VPP_API_DIR' in os.environ:
284             dirs.append(os.environ['VPP_API_DIR'])
285
286         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
287         # in which case, plot a course to likely places in the src tree
288         import __main__ as main
289         if hasattr(main, '__file__'):
290             # get the path of the calling script
291             localdir = os.path.dirname(os.path.realpath(main.__file__))
292         else:
293             # use cwd if there is no calling script
294             localdir = os.getcwd()
295         localdir_s = localdir.split(os.path.sep)
296
297         def dmatch(dir):
298             """Match dir against right-hand components of the script dir"""
299             d = dir.split('/')  # param 'dir' assumes a / separator
300             length = len(d)
301             return len(localdir_s) > length and localdir_s[-length:] == d
302
303         def sdir(srcdir, variant):
304             """Build a path from srcdir to the staged API files of
305             'variant'  (typically '' or '_debug')"""
306             # Since 'core' and 'plugin' files are staged
307             # in separate directories, we target the parent dir.
308             return os.path.sep.join((
309                 srcdir,
310                 'build-root',
311                 'install-vpp%s-native' % variant,
312                 'vpp',
313                 'share',
314                 'vpp',
315                 'api',
316             ))
317
318         srcdir = None
319         if dmatch('src/scripts'):
320             srcdir = os.path.sep.join(localdir_s[:-2])
321         elif dmatch('src/vpp-api/python'):
322             srcdir = os.path.sep.join(localdir_s[:-3])
323         elif dmatch('test'):
324             # we're apparently running tests
325             srcdir = os.path.sep.join(localdir_s[:-1])
326
327         if srcdir:
328             # we're in the source tree, try both the debug and release
329             # variants.
330             dirs.append(sdir(srcdir, '_debug'))
331             dirs.append(sdir(srcdir, ''))
332
333         # Test for staged copies of the scripts
334         # For these, since we explicitly know if we're running a debug versus
335         # release variant, target only the relevant directory
336         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
337             srcdir = os.path.sep.join(localdir_s[:-4])
338             dirs.append(sdir(srcdir, '_debug'))
339         if dmatch('build-root/install-vpp-native/vpp/bin'):
340             srcdir = os.path.sep.join(localdir_s[:-4])
341             dirs.append(sdir(srcdir, ''))
342
343         # finally, try the location system packages typically install into
344         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
345
346         # check the directories for existance; first one wins
347         for dir in dirs:
348             if os.path.isdir(dir):
349                 return dir
350
351         return None
352
353     @classmethod
354     def find_api_files(cls, api_dir=None, patterns='*'):
355         """Find API definition files from the given directory tree with the
356         given pattern. If no directory is given then find_api_dir() is used
357         to locate one. If no pattern is given then all definition files found
358         in the directory tree are used.
359
360         :param api_dir: A directory tree in which to locate API definition
361             files; subdirectories are descended into.
362             If this is None then find_api_dir() is called to discover it.
363         :param patterns: A list of patterns to use in each visited directory
364             when looking for files.
365             This can be a list/tuple object or a comma-separated string of
366             patterns. Each value in the list will have leading/trialing
367             whitespace stripped.
368             The pattern specifies the first part of the filename, '.api.json'
369             is appended.
370             The results are de-duplicated, thus overlapping patterns are fine.
371             If this is None it defaults to '*' meaning "all API files".
372         :returns: A list of file paths for the API files found.
373         """
374         if api_dir is None:
375             api_dir = cls.find_api_dir()
376             if api_dir is None:
377                 raise VPPApiError("api_dir cannot be located")
378
379         if isinstance(patterns, list) or isinstance(patterns, tuple):
380             patterns = [p.strip() + '.api.json' for p in patterns]
381         else:
382             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
383
384         api_files = []
385         for root, dirnames, files in os.walk(api_dir):
386             # iterate all given patterns and de-dup the result
387             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
388             for filename in files:
389                 api_files.append(os.path.join(root, filename))
390
391         return api_files
392
393     @property
394     def api(self):
395         if not hasattr(self, "_api"):
396             raise VPPApiError("Not connected, api definitions not available")
397         return self._api
398
399     def make_function(self, msg, i, multipart, do_async):
400         if (do_async):
401             def f(**kwargs):
402                 return self._call_vpp_async(i, msg, **kwargs)
403         else:
404             def f(**kwargs):
405                 return self._call_vpp(i, msg, multipart, **kwargs)
406
407         f.__name__ = str(msg.name)
408         f.__doc__ = ", ".join(["%s %s" %
409                                (msg.fieldtypes[j], k)
410                                for j, k in enumerate(msg.fields)])
411         return f
412
413     def _register_functions(self, do_async=False):
414         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
415         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
416         self._api = VppApiDynamicMethodHolder()
417         for name, msg in vpp_iterator(self.messages):
418             n = name + '_' + msg.crc[2:]
419             i = self.transport.get_msg_index(n.encode())
420             if i > 0:
421                 self.id_msgdef[i] = msg
422                 self.id_names[i] = name
423
424                 # Create function for client side messages.
425                 if name in self.services:
426                     if 'stream' in self.services[name] and \
427                        self.services[name]['stream']:
428                         multipart = True
429                     else:
430                         multipart = False
431                     f = self.make_function(msg, i, multipart, do_async)
432                     setattr(self._api, name, FuncWrapper(f))
433             else:
434                 self.logger.debug(
435                     'No such message type or failed CRC checksum: %s', n)
436
437     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
438                          do_async):
439         pfx = chroot_prefix.encode() if chroot_prefix else None
440
441         rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
442         if rv != 0:
443             raise VPPIOError(2, 'Connect failed')
444         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
445         self._register_functions(do_async=do_async)
446
447         # Initialise control ping
448         crc = self.messages['control_ping'].crc
449         self.control_ping_index = self.transport.get_msg_index(
450             ('control_ping' + '_' + crc[2:]).encode())
451         self.control_ping_msgdef = self.messages['control_ping']
452         if self.async_thread:
453             self.event_thread = threading.Thread(
454                 target=self.thread_msg_handler)
455             self.event_thread.daemon = True
456             self.event_thread.start()
457         return rv
458
459     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
460         """Attach to VPP.
461
462         name - the name of the client.
463         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
464         do_async - if true, messages are sent without waiting for a reply
465         rx_qlen - the length of the VPP message receive queue between
466         client and server.
467         """
468         msg_handler = self.transport.get_callback(do_async)
469         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
470                                      do_async)
471
472     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
473         """Attach to VPP in synchronous mode. Application must poll for events.
474
475         name - the name of the client.
476         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
477         rx_qlen - the length of the VPP message receive queue between
478         client and server.
479         """
480
481         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
482                                      do_async=False)
483
484     def disconnect(self):
485         """Detach from VPP."""
486         rv = self.transport.disconnect()
487         self.message_queue.put("terminate event thread")
488         return rv
489
490     def msg_handler_sync(self, msg):
491         """Process an incoming message from VPP in sync mode.
492
493         The message may be a reply or it may be an async notification.
494         """
495         r = self.decode_incoming_msg(msg)
496         if r is None:
497             return
498
499         # If we have a context, then use the context to find any
500         # request waiting for a reply
501         context = 0
502         if hasattr(r, 'context') and r.context > 0:
503             context = r.context
504
505         if context == 0:
506             # No context -> async notification that we feed to the callback
507             self.message_queue.put_nowait(r)
508         else:
509             raise VPPIOError(2, 'RPC reply message received in event handler')
510
511     def has_context(self, msg):
512         if len(msg) < 10:
513             return False
514
515         header = VPPType('header_with_context', [['u16', 'msgid'],
516                                                  ['u32', 'client_index'],
517                                                  ['u32', 'context']])
518
519         (i, ci, context), size = header.unpack(msg, 0)
520         if self.id_names[i] == 'rx_thread_exit':
521             return
522
523         #
524         # Decode message and returns a tuple.
525         #
526         msgobj = self.id_msgdef[i]
527         if 'context' in msgobj.field_by_name and context >= 0:
528             return True
529         return False
530
531     def decode_incoming_msg(self, msg, no_type_conversion=False):
532         if not msg:
533             self.logger.warning('vpp_api.read failed')
534             return
535
536         (i, ci), size = self.header.unpack(msg, 0)
537         if self.id_names[i] == 'rx_thread_exit':
538             return
539
540         #
541         # Decode message and returns a tuple.
542         #
543         msgobj = self.id_msgdef[i]
544         if not msgobj:
545             raise VPPIOError(2, 'Reply message undefined')
546
547         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
548         return r
549
550     def msg_handler_async(self, msg):
551         """Process a message from VPP in async mode.
552
553         In async mode, all messages are returned to the callback.
554         """
555         r = self.decode_incoming_msg(msg)
556         if r is None:
557             return
558
559         msgname = type(r).__name__
560
561         if self.event_callback:
562             self.event_callback(msgname, r)
563
564     def _control_ping(self, context):
565         """Send a ping command."""
566         self._call_vpp_async(self.control_ping_index,
567                              self.control_ping_msgdef,
568                              context=context)
569
570     def validate_args(self, msg, kwargs):
571         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
572         if d:
573             raise VPPValueError('Invalid argument {} to {}'
574                                 .format(list(d), msg.name))
575
576     def _call_vpp(self, i, msgdef, multipart, **kwargs):
577         """Given a message, send the message and await a reply.
578
579         msgdef - the message packing definition
580         i - the message type index
581         multipart - True if the message returns multiple
582         messages in return.
583         context - context number - chosen at random if not
584         supplied.
585         The remainder of the kwargs are the arguments to the API call.
586
587         The return value is the message or message array containing
588         the response.  It will raise an IOError exception if there was
589         no response within the timeout window.
590         """
591
592         if 'context' not in kwargs:
593             context = self.get_context()
594             kwargs['context'] = context
595         else:
596             context = kwargs['context']
597         kwargs['_vl_msg_id'] = i
598
599         no_type_conversion = kwargs.pop('_no_type_conversion', False)
600
601         try:
602             if self.transport.socket_index:
603                 kwargs['client_index'] = self.transport.socket_index
604         except AttributeError:
605             pass
606         self.validate_args(msgdef, kwargs)
607
608         logging.debug(call_logger(msgdef, kwargs))
609
610         b = msgdef.pack(kwargs)
611         self.transport.suspend()
612
613         self.transport.write(b)
614
615         if multipart:
616             # Send a ping after the request - we use its response
617             # to detect that we have seen all results.
618             self._control_ping(context)
619
620         # Block until we get a reply.
621         rl = []
622         while (True):
623             msg = self.transport.read()
624             if not msg:
625                 raise VPPIOError(2, 'VPP API client: read failed')
626             r = self.decode_incoming_msg(msg, no_type_conversion)
627             msgname = type(r).__name__
628             if context not in r or r.context == 0 or context != r.context:
629                 # Message being queued
630                 self.message_queue.put_nowait(r)
631                 continue
632
633             if not multipart:
634                 rl = r
635                 break
636             if msgname == 'control_ping_reply':
637                 break
638
639             rl.append(r)
640
641         self.transport.resume()
642
643         logger.debug(return_logger(rl))
644         return rl
645
646     def _call_vpp_async(self, i, msg, **kwargs):
647         """Given a message, send the message and await a reply.
648
649         msgdef - the message packing definition
650         i - the message type index
651         context - context number - chosen at random if not
652         supplied.
653         The remainder of the kwargs are the arguments to the API call.
654         """
655         if 'context' not in kwargs:
656             context = self.get_context()
657             kwargs['context'] = context
658         else:
659             context = kwargs['context']
660         try:
661             if self.transport.socket_index:
662                 kwargs['client_index'] = self.transport.socket_index
663         except AttributeError:
664             kwargs['client_index'] = 0
665         kwargs['_vl_msg_id'] = i
666         b = msg.pack(kwargs)
667
668         self.transport.write(b)
669
670     def register_event_callback(self, callback):
671         """Register a callback for async messages.
672
673         This will be called for async notifications in sync mode,
674         and all messages in async mode.  In sync mode, replies to
675         requests will not come here.
676
677         callback is a fn(msg_type_name, msg_type) that will be
678         called when a message comes in.  While this function is
679         executing, note that (a) you are in a background thread and
680         may wish to use threading.Lock to protect your datastructures,
681         and (b) message processing from VPP will stop (so if you take
682         a long while about it you may provoke reply timeouts or cause
683         VPP to fill the RX buffer).  Passing None will disable the
684         callback.
685         """
686         self.event_callback = callback
687
688     def thread_msg_handler(self):
689         """Python thread calling the user registered message handler.
690
691         This is to emulate the old style event callback scheme. Modern
692         clients should provide their own thread to poll the event
693         queue.
694         """
695         while True:
696             r = self.message_queue.get()
697             if r == "terminate event thread":
698                 break
699             msgname = type(r).__name__
700             if self.event_callback:
701                 self.event_callback(msgname, r)
702
703
704 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4