api: memclnt api use string type.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import sys
21 import multiprocessing as mp
22 import os
23 import logging
24 import functools
25 import json
26 import threading
27 import fnmatch
28 import weakref
29 import atexit
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32
33 if sys.version[0] == '2':
34     import Queue as queue
35 else:
36     import queue as queue
37
38 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
39            'VppEnum', 'VppEnumType',
40            'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
41            'VPPApiClient', )
42
43
44 def metaclass(metaclass):
45     @functools.wraps(metaclass)
46     def wrapper(cls):
47         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
48
49     return wrapper
50
51
52 class VppEnumType(type):
53     def __getattr__(cls, name):
54         t = vpp_get_type(name)
55         return t.enum
56
57
58 @metaclass(VppEnumType)
59 class VppEnum(object):
60     pass
61
62
63 def vpp_atexit(vpp_weakref):
64     """Clean up VPP connection on shutdown."""
65     vpp_instance = vpp_weakref()
66     if vpp_instance and vpp_instance.transport.connected:
67         vpp_instance.logger.debug('Cleaning up VPP on exit')
68         vpp_instance.disconnect()
69
70
71 if sys.version[0] == '2':
72     def vpp_iterator(d):
73         return d.iteritems()
74 else:
75     def vpp_iterator(d):
76         return d.items()
77
78
79 class VppApiDynamicMethodHolder(object):
80     pass
81
82
83 class FuncWrapper(object):
84     def __init__(self, func):
85         self._func = func
86         self.__name__ = func.__name__
87         self.__doc__ = func.__doc__
88
89     def __call__(self, **kwargs):
90         return self._func(**kwargs)
91
92     def __repr__(self):
93         return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
94
95
96 class VPPApiError(Exception):
97     pass
98
99
100 class VPPNotImplementedError(NotImplementedError):
101     pass
102
103
104 class VPPIOError(IOError):
105     pass
106
107
108 class VPPRuntimeError(RuntimeError):
109     pass
110
111
112 class VPPValueError(ValueError):
113     pass
114
115 class VPPApiJSONFiles(object):
116     @classmethod
117     def find_api_dir(cls, dirs):
118         """Attempt to find the best directory in which API definition
119         files may reside. If the value VPP_API_DIR exists in the environment
120         then it is first on the search list. If we're inside a recognized
121         location in a VPP source tree (src/scripts and src/vpp-api/python)
122         then entries from there to the likely locations in build-root are
123         added. Finally the location used by system packages is added.
124
125         :returns: A single directory name, or None if no such directory
126             could be found.
127         """
128
129         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
130         # in which case, plot a course to likely places in the src tree
131         import __main__ as main
132         if hasattr(main, '__file__'):
133             # get the path of the calling script
134             localdir = os.path.dirname(os.path.realpath(main.__file__))
135         else:
136             # use cwd if there is no calling script
137             localdir = os.getcwd()
138         localdir_s = localdir.split(os.path.sep)
139
140         def dmatch(dir):
141             """Match dir against right-hand components of the script dir"""
142             d = dir.split('/')  # param 'dir' assumes a / separator
143             length = len(d)
144             return len(localdir_s) > length and localdir_s[-length:] == d
145
146         def sdir(srcdir, variant):
147             """Build a path from srcdir to the staged API files of
148             'variant'  (typically '' or '_debug')"""
149             # Since 'core' and 'plugin' files are staged
150             # in separate directories, we target the parent dir.
151             return os.path.sep.join((
152                 srcdir,
153                 'build-root',
154                 'install-vpp%s-native' % variant,
155                 'vpp',
156                 'share',
157                 'vpp',
158                 'api',
159             ))
160
161         srcdir = None
162         if dmatch('src/scripts'):
163             srcdir = os.path.sep.join(localdir_s[:-2])
164         elif dmatch('src/vpp-api/python'):
165             srcdir = os.path.sep.join(localdir_s[:-3])
166         elif dmatch('test'):
167             # we're apparently running tests
168             srcdir = os.path.sep.join(localdir_s[:-1])
169
170         if srcdir:
171             # we're in the source tree, try both the debug and release
172             # variants.
173             dirs.append(sdir(srcdir, '_debug'))
174             dirs.append(sdir(srcdir, ''))
175
176         # Test for staged copies of the scripts
177         # For these, since we explicitly know if we're running a debug versus
178         # release variant, target only the relevant directory
179         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
180             srcdir = os.path.sep.join(localdir_s[:-4])
181             dirs.append(sdir(srcdir, '_debug'))
182         if dmatch('build-root/install-vpp-native/vpp/bin'):
183             srcdir = os.path.sep.join(localdir_s[:-4])
184             dirs.append(sdir(srcdir, ''))
185
186         # finally, try the location system packages typically install into
187         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
188
189         # check the directories for existence; first one wins
190         for dir in dirs:
191             if os.path.isdir(dir):
192                 return dir
193
194         return None
195
196     @classmethod
197     def find_api_files(cls, api_dir=None, patterns='*'):
198         """Find API definition files from the given directory tree with the
199         given pattern. If no directory is given then find_api_dir() is used
200         to locate one. If no pattern is given then all definition files found
201         in the directory tree are used.
202
203         :param api_dir: A directory tree in which to locate API definition
204             files; subdirectories are descended into.
205             If this is None then find_api_dir() is called to discover it.
206         :param patterns: A list of patterns to use in each visited directory
207             when looking for files.
208             This can be a list/tuple object or a comma-separated string of
209             patterns. Each value in the list will have leading/trialing
210             whitespace stripped.
211             The pattern specifies the first part of the filename, '.api.json'
212             is appended.
213             The results are de-duplicated, thus overlapping patterns are fine.
214             If this is None it defaults to '*' meaning "all API files".
215         :returns: A list of file paths for the API files found.
216         """
217         if api_dir is None:
218             api_dir = cls.find_api_dir([])
219             if api_dir is None:
220                 raise VPPApiError("api_dir cannot be located")
221
222         if isinstance(patterns, list) or isinstance(patterns, tuple):
223             patterns = [p.strip() + '.api.json' for p in patterns]
224         else:
225             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
226
227         api_files = []
228         for root, dirnames, files in os.walk(api_dir):
229             # iterate all given patterns and de-dup the result
230             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
231             for filename in files:
232                 api_files.append(os.path.join(root, filename))
233
234         return api_files
235
236     @classmethod
237     def process_json_file(self, apidef_file):
238         api = json.load(apidef_file)
239         types = {}
240         services = {}
241         messages = {}
242         for t in api['enums']:
243             t[0] = 'vl_api_' + t[0] + '_t'
244             types[t[0]] = {'type': 'enum', 'data': t}
245         for t in api['unions']:
246             t[0] = 'vl_api_' + t[0] + '_t'
247             types[t[0]] = {'type': 'union', 'data': t}
248         for t in api['types']:
249             t[0] = 'vl_api_' + t[0] + '_t'
250             types[t[0]] = {'type': 'type', 'data': t}
251         for t, v in api['aliases'].items():
252             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
253         services.update(api['services'])
254
255         i = 0
256         while True:
257             unresolved = {}
258             for k, v in types.items():
259                 t = v['data']
260                 if not vpp_get_type(k):
261                     if v['type'] == 'enum':
262                         try:
263                             VPPEnumType(t[0], t[1:])
264                         except ValueError:
265                             unresolved[k] = v
266                     elif v['type'] == 'union':
267                         try:
268                             VPPUnionType(t[0], t[1:])
269                         except ValueError:
270                             unresolved[k] = v
271                     elif v['type'] == 'type':
272                         try:
273                             VPPType(t[0], t[1:])
274                         except ValueError:
275                             unresolved[k] = v
276                     elif v['type'] == 'alias':
277                         try:
278                             VPPTypeAlias(k, t)
279                         except ValueError:
280                             unresolved[k] = v
281             if len(unresolved) == 0:
282                 break
283             if i > 3:
284                 raise VPPValueError('Unresolved type definitions {}'
285                                     .format(unresolved))
286             types = unresolved
287             i += 1
288
289         for m in api['messages']:
290             try:
291                 messages[m[0]] = VPPMessage(m[0], m[1:])
292             except VPPNotImplementedError:
293                 ### OLE FIXME
294                 self.logger.error('Not implemented error for {}'.format(m[0]))
295         return messages, services
296
297 class VPPApiClient(object):
298     """VPP interface.
299
300     This class provides the APIs to VPP.  The APIs are loaded
301     from provided .api.json files and makes functions accordingly.
302     These functions are documented in the VPP .api files, as they
303     are dynamically created.
304
305     Additionally, VPP can send callback messages; this class
306     provides a means to register a callback function to receive
307     these messages in a background thread.
308     """
309     apidir = None
310     VPPApiError = VPPApiError
311     VPPRuntimeError = VPPRuntimeError
312     VPPValueError = VPPValueError
313     VPPNotImplementedError = VPPNotImplementedError
314     VPPIOError = VPPIOError
315
316
317     def __init__(self, apifiles=None, testmode=False, async_thread=True,
318                  logger=None, loglevel=None,
319                  read_timeout=5, use_socket=False,
320                  server_address='/run/vpp/api.sock'):
321         """Create a VPP API object.
322
323         apifiles is a list of files containing API
324         descriptions that will be loaded - methods will be
325         dynamically created reflecting these APIs.  If not
326         provided this will load the API files from VPP's
327         default install location.
328
329         logger, if supplied, is the logging logger object to log to.
330         loglevel, if supplied, is the log level this logger is set
331         to report at (from the loglevels in the logging module).
332         """
333         if logger is None:
334             logger = logging.getLogger(
335                 "{}.{}".format(__name__, self.__class__.__name__))
336             if loglevel is not None:
337                 logger.setLevel(loglevel)
338         self.logger = logger
339
340         self.messages = {}
341         self.services = {}
342         self.id_names = []
343         self.id_msgdef = []
344         self.header = VPPType('header', [['u16', 'msgid'],
345                                          ['u32', 'client_index']])
346         self.apifiles = []
347         self.event_callback = None
348         self.message_queue = queue.Queue()
349         self.read_timeout = read_timeout
350         self.async_thread = async_thread
351         self.event_thread = None
352         self.testmode = testmode
353         self.use_socket = use_socket
354         self.server_address = server_address
355         self._apifiles = apifiles
356
357         if use_socket:
358             from . vpp_transport_socket import VppTransport
359         else:
360             from . vpp_transport_shmem import VppTransport
361
362         if not apifiles:
363             # Pick up API definitions from default directory
364             try:
365                 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
366             except RuntimeError:
367                 # In test mode we don't care that we can't find the API files
368                 if testmode:
369                     apifiles = []
370                 else:
371                     raise VPPRuntimeError
372
373         for file in apifiles:
374             with open(file) as apidef_file:
375                 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
376                 self.messages.update(m)
377                 self.services.update(s)
378
379         self.apifiles = apifiles
380
381         # Basic sanity check
382         if len(self.messages) == 0 and not testmode:
383             raise VPPValueError(1, 'Missing JSON message definitions')
384
385         self.transport = VppTransport(self, read_timeout=read_timeout,
386                                       server_address=server_address)
387         # Make sure we allow VPP to clean up the message rings.
388         atexit.register(vpp_atexit, weakref.ref(self))
389
390     def get_function(self, name):
391         return getattr(self._api, name)
392
393
394     class ContextId(object):
395         """Multiprocessing-safe provider of unique context IDs."""
396         def __init__(self):
397             self.context = mp.Value(ctypes.c_uint, 0)
398             self.lock = mp.Lock()
399
400         def __call__(self):
401             """Get a new unique (or, at least, not recently used) context."""
402             with self.lock:
403                 self.context.value += 1
404                 return self.context.value
405     get_context = ContextId()
406
407     def get_type(self, name):
408         return vpp_get_type(name)
409
410     @property
411     def api(self):
412         if not hasattr(self, "_api"):
413             raise VPPApiError("Not connected, api definitions not available")
414         return self._api
415
416     def make_function(self, msg, i, multipart, do_async):
417         if (do_async):
418             def f(**kwargs):
419                 return self._call_vpp_async(i, msg, **kwargs)
420         else:
421             def f(**kwargs):
422                 return self._call_vpp(i, msg, multipart, **kwargs)
423
424         f.__name__ = str(msg.name)
425         f.__doc__ = ", ".join(["%s %s" %
426                                (msg.fieldtypes[j], k)
427                                for j, k in enumerate(msg.fields)])
428         f.msg = msg
429
430         return f
431
432     def _register_functions(self, do_async=False):
433         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
434         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
435         self._api = VppApiDynamicMethodHolder()
436         for name, msg in vpp_iterator(self.messages):
437             n = name + '_' + msg.crc[2:]
438             i = self.transport.get_msg_index(n)
439             if i > 0:
440                 self.id_msgdef[i] = msg
441                 self.id_names[i] = name
442
443                 # Create function for client side messages.
444                 if name in self.services:
445                     if 'stream' in self.services[name] and \
446                        self.services[name]['stream']:
447                         multipart = True
448                     else:
449                         multipart = False
450                     f = self.make_function(msg, i, multipart, do_async)
451                     setattr(self._api, name, FuncWrapper(f))
452             else:
453                 self.logger.debug(
454                     'No such message type or failed CRC checksum: %s', n)
455
456     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
457                          do_async):
458         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
459
460         rv = self.transport.connect(name, pfx,
461                                     msg_handler, rx_qlen)
462         if rv != 0:
463             raise VPPIOError(2, 'Connect failed')
464         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
465         self._register_functions(do_async=do_async)
466
467         # Initialise control ping
468         crc = self.messages['control_ping'].crc
469         self.control_ping_index = self.transport.get_msg_index(
470             ('control_ping' + '_' + crc[2:]))
471         self.control_ping_msgdef = self.messages['control_ping']
472         if self.async_thread:
473             self.event_thread = threading.Thread(
474                 target=self.thread_msg_handler)
475             self.event_thread.daemon = True
476             self.event_thread.start()
477         else:
478             self.event_thread = None
479         return rv
480
481     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
482         """Attach to VPP.
483
484         name - the name of the client.
485         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
486         do_async - if true, messages are sent without waiting for a reply
487         rx_qlen - the length of the VPP message receive queue between
488         client and server.
489         """
490         msg_handler = self.transport.get_callback(do_async)
491         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
492                                      do_async)
493
494     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
495         """Attach to VPP in synchronous mode. Application must poll for events.
496
497         name - the name of the client.
498         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
499         rx_qlen - the length of the VPP message receive queue between
500         client and server.
501         """
502
503         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
504                                      do_async=False)
505
506     def disconnect(self):
507         """Detach from VPP."""
508         rv = self.transport.disconnect()
509         if self.event_thread is not None:
510             self.message_queue.put("terminate event thread")
511         return rv
512
513     def msg_handler_sync(self, msg):
514         """Process an incoming message from VPP in sync mode.
515
516         The message may be a reply or it may be an async notification.
517         """
518         r = self.decode_incoming_msg(msg)
519         if r is None:
520             return
521
522         # If we have a context, then use the context to find any
523         # request waiting for a reply
524         context = 0
525         if hasattr(r, 'context') and r.context > 0:
526             context = r.context
527
528         if context == 0:
529             # No context -> async notification that we feed to the callback
530             self.message_queue.put_nowait(r)
531         else:
532             raise VPPIOError(2, 'RPC reply message received in event handler')
533
534     def has_context(self, msg):
535         if len(msg) < 10:
536             return False
537
538         header = VPPType('header_with_context', [['u16', 'msgid'],
539                                                  ['u32', 'client_index'],
540                                                  ['u32', 'context']])
541
542         (i, ci, context), size = header.unpack(msg, 0)
543         if self.id_names[i] == 'rx_thread_exit':
544             return
545
546         #
547         # Decode message and returns a tuple.
548         #
549         msgobj = self.id_msgdef[i]
550         if 'context' in msgobj.field_by_name and context >= 0:
551             return True
552         return False
553
554     def decode_incoming_msg(self, msg, no_type_conversion=False):
555         if not msg:
556             self.logger.warning('vpp_api.read failed')
557             return
558
559         (i, ci), size = self.header.unpack(msg, 0)
560         if self.id_names[i] == 'rx_thread_exit':
561             return
562
563         #
564         # Decode message and returns a tuple.
565         #
566         msgobj = self.id_msgdef[i]
567         if not msgobj:
568             raise VPPIOError(2, 'Reply message undefined')
569
570         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
571         return r
572
573     def msg_handler_async(self, msg):
574         """Process a message from VPP in async mode.
575
576         In async mode, all messages are returned to the callback.
577         """
578         r = self.decode_incoming_msg(msg)
579         if r is None:
580             return
581
582         msgname = type(r).__name__
583
584         if self.event_callback:
585             self.event_callback(msgname, r)
586
587     def _control_ping(self, context):
588         """Send a ping command."""
589         self._call_vpp_async(self.control_ping_index,
590                              self.control_ping_msgdef,
591                              context=context)
592
593     def validate_args(self, msg, kwargs):
594         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
595         if d:
596             raise VPPValueError('Invalid argument {} to {}'
597                                 .format(list(d), msg.name))
598
599     def _call_vpp(self, i, msgdef, multipart, **kwargs):
600         """Given a message, send the message and await a reply.
601
602         msgdef - the message packing definition
603         i - the message type index
604         multipart - True if the message returns multiple
605         messages in return.
606         context - context number - chosen at random if not
607         supplied.
608         The remainder of the kwargs are the arguments to the API call.
609
610         The return value is the message or message array containing
611         the response.  It will raise an IOError exception if there was
612         no response within the timeout window.
613         """
614
615         if 'context' not in kwargs:
616             context = self.get_context()
617             kwargs['context'] = context
618         else:
619             context = kwargs['context']
620         kwargs['_vl_msg_id'] = i
621
622         no_type_conversion = kwargs.pop('_no_type_conversion', False)
623
624         try:
625             if self.transport.socket_index:
626                 kwargs['client_index'] = self.transport.socket_index
627         except AttributeError:
628             pass
629         self.validate_args(msgdef, kwargs)
630
631         s = 'Calling {}({})'.format(msgdef.name,
632             ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
633         self.logger.debug(s)
634
635         b = msgdef.pack(kwargs)
636         self.transport.suspend()
637
638         self.transport.write(b)
639
640         if multipart:
641             # Send a ping after the request - we use its response
642             # to detect that we have seen all results.
643             self._control_ping(context)
644
645         # Block until we get a reply.
646         rl = []
647         while (True):
648             msg = self.transport.read()
649             if not msg:
650                 raise VPPIOError(2, 'VPP API client: read failed')
651             r = self.decode_incoming_msg(msg, no_type_conversion)
652             msgname = type(r).__name__
653             if context not in r or r.context == 0 or context != r.context:
654                 # Message being queued
655                 self.message_queue.put_nowait(r)
656                 continue
657
658             if not multipart:
659                 rl = r
660                 break
661             if msgname == 'control_ping_reply':
662                 break
663
664             rl.append(r)
665
666         self.transport.resume()
667
668         self.logger.debug('Return from {!r}'.format(r))
669         return rl
670
671     def _call_vpp_async(self, i, msg, **kwargs):
672         """Given a message, send the message and await a reply.
673
674         msgdef - the message packing definition
675         i - the message type index
676         context - context number - chosen at random if not
677         supplied.
678         The remainder of the kwargs are the arguments to the API call.
679         """
680         if 'context' not in kwargs:
681             context = self.get_context()
682             kwargs['context'] = context
683         else:
684             context = kwargs['context']
685         try:
686             if self.transport.socket_index:
687                 kwargs['client_index'] = self.transport.socket_index
688         except AttributeError:
689             kwargs['client_index'] = 0
690         kwargs['_vl_msg_id'] = i
691         b = msg.pack(kwargs)
692
693         self.transport.write(b)
694
695     def register_event_callback(self, callback):
696         """Register a callback for async messages.
697
698         This will be called for async notifications in sync mode,
699         and all messages in async mode.  In sync mode, replies to
700         requests will not come here.
701
702         callback is a fn(msg_type_name, msg_type) that will be
703         called when a message comes in.  While this function is
704         executing, note that (a) you are in a background thread and
705         may wish to use threading.Lock to protect your datastructures,
706         and (b) message processing from VPP will stop (so if you take
707         a long while about it you may provoke reply timeouts or cause
708         VPP to fill the RX buffer).  Passing None will disable the
709         callback.
710         """
711         self.event_callback = callback
712
713     def thread_msg_handler(self):
714         """Python thread calling the user registered message handler.
715
716         This is to emulate the old style event callback scheme. Modern
717         clients should provide their own thread to poll the event
718         queue.
719         """
720         while True:
721             r = self.message_queue.get()
722             if r == "terminate event thread":
723                 break
724             msgname = type(r).__name__
725             if self.event_callback:
726                 self.event_callback(msgname, r)
727
728     def __repr__(self):
729         return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
730                "logger=%s, read_timeout=%s, use_socket=%s, " \
731                "server_address='%s'>" % (
732                    self._apifiles, self.testmode, self.async_thread,
733                    self.logger, self.read_timeout, self.use_socket,
734                    self.server_address)
735
736
737 # Provide the old name for backward compatibility.
738 VPP = VPPApiClient
739
740 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4