vpp_papi: Context_id allocator for running forked.
[vpp.git] / src / vpp-api / python / vpp_papi / vpp_papi.py
1 #!/usr/bin/env python
2 #
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 from __future__ import print_function
18 from __future__ import absolute_import
19 import ctypes
20 import sys
21 import multiprocessing as mp
22 import os
23 import logging
24 import collections
25 import struct
26 import functools
27 import json
28 import threading
29 import fnmatch
30 import weakref
31 import atexit
32 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
33 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
34 from . macaddress import MACAddress, mac_pton, mac_ntop
35
36 logger = logging.getLogger(__name__)
37
38 if sys.version[0] == '2':
39     import Queue as queue
40 else:
41     import queue as queue
42
43
44 def metaclass(metaclass):
45     @functools.wraps(metaclass)
46     def wrapper(cls):
47         return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
48
49     return wrapper
50
51
52 class VppEnumType(type):
53     def __getattr__(cls, name):
54         t = vpp_get_type(name)
55         return t.enum
56
57
58 @metaclass(VppEnumType)
59 class VppEnum(object):
60     pass
61
62
63 def vpp_atexit(vpp_weakref):
64     """Clean up VPP connection on shutdown."""
65     vpp_instance = vpp_weakref()
66     if vpp_instance and vpp_instance.transport.connected:
67         vpp_instance.logger.debug('Cleaning up VPP on exit')
68         vpp_instance.disconnect()
69
70
71 if sys.version[0] == '2':
72     def vpp_iterator(d):
73         return d.iteritems()
74 else:
75     def vpp_iterator(d):
76         return d.items()
77
78
79 def call_logger(msgdef, kwargs):
80     s = 'Calling {}('.format(msgdef.name)
81     for k, v in kwargs.items():
82         s += '{}:{} '.format(k, v)
83     s += ')'
84     return s
85
86
87 def return_logger(r):
88     s = 'Return from {}'.format(r)
89     return s
90
91
92 class VppApiDynamicMethodHolder(object):
93     pass
94
95
96 class FuncWrapper(object):
97     def __init__(self, func):
98         self._func = func
99         self.__name__ = func.__name__
100         self.__doc__ = func.__doc__
101
102     def __call__(self, **kwargs):
103         return self._func(**kwargs)
104
105
106 class VPPApiError(Exception):
107     pass
108
109
110 class VPPNotImplementedError(NotImplementedError):
111     pass
112
113
114 class VPPIOError(IOError):
115     pass
116
117
118 class VPPRuntimeError(RuntimeError):
119     pass
120
121
122 class VPPValueError(ValueError):
123     pass
124
125
126 class VPPApiClient(object):
127     """VPP interface.
128
129     This class provides the APIs to VPP.  The APIs are loaded
130     from provided .api.json files and makes functions accordingly.
131     These functions are documented in the VPP .api files, as they
132     are dynamically created.
133
134     Additionally, VPP can send callback messages; this class
135     provides a means to register a callback function to receive
136     these messages in a background thread.
137     """
138     apidir = None
139     VPPApiError = VPPApiError
140     VPPRuntimeError = VPPRuntimeError
141     VPPValueError = VPPValueError
142     VPPNotImplementedError = VPPNotImplementedError
143     VPPIOError = VPPIOError
144
145     def process_json_file(self, apidef_file):
146         api = json.load(apidef_file)
147         types = {}
148         for t in api['enums']:
149             t[0] = 'vl_api_' + t[0] + '_t'
150             types[t[0]] = {'type': 'enum', 'data': t}
151         for t in api['unions']:
152             t[0] = 'vl_api_' + t[0] + '_t'
153             types[t[0]] = {'type': 'union', 'data': t}
154         for t in api['types']:
155             t[0] = 'vl_api_' + t[0] + '_t'
156             types[t[0]] = {'type': 'type', 'data': t}
157         for t, v in api['aliases'].items():
158             types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
159         self.services.update(api['services'])
160
161         i = 0
162         while True:
163             unresolved = {}
164             for k, v in types.items():
165                 t = v['data']
166                 if not vpp_get_type(k):
167                     if v['type'] == 'enum':
168                         try:
169                             VPPEnumType(t[0], t[1:])
170                         except ValueError:
171                             unresolved[k] = v
172                     elif v['type'] == 'union':
173                         try:
174                             VPPUnionType(t[0], t[1:])
175                         except ValueError:
176                             unresolved[k] = v
177                     elif v['type'] == 'type':
178                         try:
179                             VPPType(t[0], t[1:])
180                         except ValueError:
181                             unresolved[k] = v
182                     elif v['type'] == 'alias':
183                         try:
184                             VPPTypeAlias(k, t)
185                         except ValueError:
186                             unresolved[k] = v
187             if len(unresolved) == 0:
188                 break
189             if i > 3:
190                 raise VPPValueError('Unresolved type definitions {}'
191                                     .format(unresolved))
192             types = unresolved
193             i += 1
194
195         for m in api['messages']:
196             try:
197                 self.messages[m[0]] = VPPMessage(m[0], m[1:])
198             except VPPNotImplementedError:
199                 self.logger.error('Not implemented error for {}'.format(m[0]))
200
201     def __init__(self, apifiles=None, testmode=False, async_thread=True,
202                  logger=None, loglevel=None,
203                  read_timeout=5, use_socket=False,
204                  server_address='/run/vpp-api.sock'):
205         """Create a VPP API object.
206
207         apifiles is a list of files containing API
208         descriptions that will be loaded - methods will be
209         dynamically created reflecting these APIs.  If not
210         provided this will load the API files from VPP's
211         default install location.
212
213         logger, if supplied, is the logging logger object to log to.
214         loglevel, if supplied, is the log level this logger is set
215         to report at (from the loglevels in the logging module).
216         """
217         if logger is None:
218             logger = logging.getLogger(__name__)
219             if loglevel is not None:
220                 logger.setLevel(loglevel)
221         self.logger = logger
222
223         self.messages = {}
224         self.services = {}
225         self.id_names = []
226         self.id_msgdef = []
227         self.header = VPPType('header', [['u16', 'msgid'],
228                                          ['u32', 'client_index']])
229         self.apifiles = []
230         self.event_callback = None
231         self.message_queue = queue.Queue()
232         self.read_timeout = read_timeout
233         self.async_thread = async_thread
234         self.event_thread = None
235
236         if use_socket:
237             from . vpp_transport_socket import VppTransport
238         else:
239             from . vpp_transport_shmem import VppTransport
240
241         if not apifiles:
242             # Pick up API definitions from default directory
243             try:
244                 apifiles = self.find_api_files()
245             except RuntimeError:
246                 # In test mode we don't care that we can't find the API files
247                 if testmode:
248                     apifiles = []
249                 else:
250                     raise VPPRuntimeError
251
252         for file in apifiles:
253             with open(file) as apidef_file:
254                 self.process_json_file(apidef_file)
255
256         self.apifiles = apifiles
257
258         # Basic sanity check
259         if len(self.messages) == 0 and not testmode:
260             raise VPPValueError(1, 'Missing JSON message definitions')
261
262         self.transport = VppTransport(self, read_timeout=read_timeout,
263                                       server_address=server_address)
264         # Make sure we allow VPP to clean up the message rings.
265         atexit.register(vpp_atexit, weakref.ref(self))
266
267     class ContextId(object):
268         """Multiprocessing-safe provider of unique context IDs."""
269         def __init__(self):
270             self.context = mp.Value(ctypes.c_uint, 0)
271             self.lock = mp.Lock()
272
273         def __call__(self):
274             """Get a new unique (or, at least, not recently used) context."""
275             with self.lock:
276                 self.context.value += 1
277                 return self.context.value
278     get_context = ContextId()
279
280     def get_type(self, name):
281         return vpp_get_type(name)
282
283     @classmethod
284     def find_api_dir(cls):
285         """Attempt to find the best directory in which API definition
286         files may reside. If the value VPP_API_DIR exists in the environment
287         then it is first on the search list. If we're inside a recognized
288         location in a VPP source tree (src/scripts and src/vpp-api/python)
289         then entries from there to the likely locations in build-root are
290         added. Finally the location used by system packages is added.
291
292         :returns: A single directory name, or None if no such directory
293             could be found.
294         """
295         dirs = [cls.apidir] if cls.apidir else []
296
297         # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
298         # in which case, plot a course to likely places in the src tree
299         import __main__ as main
300         if hasattr(main, '__file__'):
301             # get the path of the calling script
302             localdir = os.path.dirname(os.path.realpath(main.__file__))
303         else:
304             # use cwd if there is no calling script
305             localdir = os.getcwd()
306         localdir_s = localdir.split(os.path.sep)
307
308         def dmatch(dir):
309             """Match dir against right-hand components of the script dir"""
310             d = dir.split('/')  # param 'dir' assumes a / separator
311             length = len(d)
312             return len(localdir_s) > length and localdir_s[-length:] == d
313
314         def sdir(srcdir, variant):
315             """Build a path from srcdir to the staged API files of
316             'variant'  (typically '' or '_debug')"""
317             # Since 'core' and 'plugin' files are staged
318             # in separate directories, we target the parent dir.
319             return os.path.sep.join((
320                 srcdir,
321                 'build-root',
322                 'install-vpp%s-native' % variant,
323                 'vpp',
324                 'share',
325                 'vpp',
326                 'api',
327             ))
328
329         srcdir = None
330         if dmatch('src/scripts'):
331             srcdir = os.path.sep.join(localdir_s[:-2])
332         elif dmatch('src/vpp-api/python'):
333             srcdir = os.path.sep.join(localdir_s[:-3])
334         elif dmatch('test'):
335             # we're apparently running tests
336             srcdir = os.path.sep.join(localdir_s[:-1])
337
338         if srcdir:
339             # we're in the source tree, try both the debug and release
340             # variants.
341             dirs.append(sdir(srcdir, '_debug'))
342             dirs.append(sdir(srcdir, ''))
343
344         # Test for staged copies of the scripts
345         # For these, since we explicitly know if we're running a debug versus
346         # release variant, target only the relevant directory
347         if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
348             srcdir = os.path.sep.join(localdir_s[:-4])
349             dirs.append(sdir(srcdir, '_debug'))
350         if dmatch('build-root/install-vpp-native/vpp/bin'):
351             srcdir = os.path.sep.join(localdir_s[:-4])
352             dirs.append(sdir(srcdir, ''))
353
354         # finally, try the location system packages typically install into
355         dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
356
357         # check the directories for existence; first one wins
358         for dir in dirs:
359             if os.path.isdir(dir):
360                 return dir
361
362         return None
363
364     @classmethod
365     def find_api_files(cls, api_dir=None, patterns='*'):
366         """Find API definition files from the given directory tree with the
367         given pattern. If no directory is given then find_api_dir() is used
368         to locate one. If no pattern is given then all definition files found
369         in the directory tree are used.
370
371         :param api_dir: A directory tree in which to locate API definition
372             files; subdirectories are descended into.
373             If this is None then find_api_dir() is called to discover it.
374         :param patterns: A list of patterns to use in each visited directory
375             when looking for files.
376             This can be a list/tuple object or a comma-separated string of
377             patterns. Each value in the list will have leading/trialing
378             whitespace stripped.
379             The pattern specifies the first part of the filename, '.api.json'
380             is appended.
381             The results are de-duplicated, thus overlapping patterns are fine.
382             If this is None it defaults to '*' meaning "all API files".
383         :returns: A list of file paths for the API files found.
384         """
385         if api_dir is None:
386             api_dir = cls.find_api_dir()
387             if api_dir is None:
388                 raise VPPApiError("api_dir cannot be located")
389
390         if isinstance(patterns, list) or isinstance(patterns, tuple):
391             patterns = [p.strip() + '.api.json' for p in patterns]
392         else:
393             patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
394
395         api_files = []
396         for root, dirnames, files in os.walk(api_dir):
397             # iterate all given patterns and de-dup the result
398             files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
399             for filename in files:
400                 api_files.append(os.path.join(root, filename))
401
402         return api_files
403
404     @property
405     def api(self):
406         if not hasattr(self, "_api"):
407             raise VPPApiError("Not connected, api definitions not available")
408         return self._api
409
410     def make_function(self, msg, i, multipart, do_async):
411         if (do_async):
412             def f(**kwargs):
413                 return self._call_vpp_async(i, msg, **kwargs)
414         else:
415             def f(**kwargs):
416                 return self._call_vpp(i, msg, multipart, **kwargs)
417
418         f.__name__ = str(msg.name)
419         f.__doc__ = ", ".join(["%s %s" %
420                                (msg.fieldtypes[j], k)
421                                for j, k in enumerate(msg.fields)])
422         f.msg = msg
423
424         return f
425
426     def _register_functions(self, do_async=False):
427         self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
428         self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
429         self._api = VppApiDynamicMethodHolder()
430         for name, msg in vpp_iterator(self.messages):
431             n = name + '_' + msg.crc[2:]
432             i = self.transport.get_msg_index(n.encode('utf-8'))
433             if i > 0:
434                 self.id_msgdef[i] = msg
435                 self.id_names[i] = name
436
437                 # Create function for client side messages.
438                 if name in self.services:
439                     if 'stream' in self.services[name] and \
440                        self.services[name]['stream']:
441                         multipart = True
442                     else:
443                         multipart = False
444                     f = self.make_function(msg, i, multipart, do_async)
445                     setattr(self._api, name, FuncWrapper(f))
446             else:
447                 self.logger.debug(
448                     'No such message type or failed CRC checksum: %s', n)
449
450     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
451                          do_async):
452         pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
453
454         rv = self.transport.connect(name.encode('utf-8'), pfx,
455                                     msg_handler, rx_qlen)
456         if rv != 0:
457             raise VPPIOError(2, 'Connect failed')
458         self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
459         self._register_functions(do_async=do_async)
460
461         # Initialise control ping
462         crc = self.messages['control_ping'].crc
463         self.control_ping_index = self.transport.get_msg_index(
464             ('control_ping' + '_' + crc[2:]).encode('utf-8'))
465         self.control_ping_msgdef = self.messages['control_ping']
466         if self.async_thread:
467             self.event_thread = threading.Thread(
468                 target=self.thread_msg_handler)
469             self.event_thread.daemon = True
470             self.event_thread.start()
471         else:
472             self.event_thread = None
473         return rv
474
475     def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
476         """Attach to VPP.
477
478         name - the name of the client.
479         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
480         do_async - if true, messages are sent without waiting for a reply
481         rx_qlen - the length of the VPP message receive queue between
482         client and server.
483         """
484         msg_handler = self.transport.get_callback(do_async)
485         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
486                                      do_async)
487
488     def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
489         """Attach to VPP in synchronous mode. Application must poll for events.
490
491         name - the name of the client.
492         chroot_prefix - if VPP is chroot'ed, the prefix of the jail
493         rx_qlen - the length of the VPP message receive queue between
494         client and server.
495         """
496
497         return self.connect_internal(name, None, chroot_prefix, rx_qlen,
498                                      do_async=False)
499
500     def disconnect(self):
501         """Detach from VPP."""
502         rv = self.transport.disconnect()
503         if self.event_thread is not None:
504             self.message_queue.put("terminate event thread")
505         return rv
506
507     def msg_handler_sync(self, msg):
508         """Process an incoming message from VPP in sync mode.
509
510         The message may be a reply or it may be an async notification.
511         """
512         r = self.decode_incoming_msg(msg)
513         if r is None:
514             return
515
516         # If we have a context, then use the context to find any
517         # request waiting for a reply
518         context = 0
519         if hasattr(r, 'context') and r.context > 0:
520             context = r.context
521
522         if context == 0:
523             # No context -> async notification that we feed to the callback
524             self.message_queue.put_nowait(r)
525         else:
526             raise VPPIOError(2, 'RPC reply message received in event handler')
527
528     def has_context(self, msg):
529         if len(msg) < 10:
530             return False
531
532         header = VPPType('header_with_context', [['u16', 'msgid'],
533                                                  ['u32', 'client_index'],
534                                                  ['u32', 'context']])
535
536         (i, ci, context), size = header.unpack(msg, 0)
537         if self.id_names[i] == 'rx_thread_exit':
538             return
539
540         #
541         # Decode message and returns a tuple.
542         #
543         msgobj = self.id_msgdef[i]
544         if 'context' in msgobj.field_by_name and context >= 0:
545             return True
546         return False
547
548     def decode_incoming_msg(self, msg, no_type_conversion=False):
549         if not msg:
550             self.logger.warning('vpp_api.read failed')
551             return
552
553         (i, ci), size = self.header.unpack(msg, 0)
554         if self.id_names[i] == 'rx_thread_exit':
555             return
556
557         #
558         # Decode message and returns a tuple.
559         #
560         msgobj = self.id_msgdef[i]
561         if not msgobj:
562             raise VPPIOError(2, 'Reply message undefined')
563
564         r, size = msgobj.unpack(msg, ntc=no_type_conversion)
565         return r
566
567     def msg_handler_async(self, msg):
568         """Process a message from VPP in async mode.
569
570         In async mode, all messages are returned to the callback.
571         """
572         r = self.decode_incoming_msg(msg)
573         if r is None:
574             return
575
576         msgname = type(r).__name__
577
578         if self.event_callback:
579             self.event_callback(msgname, r)
580
581     def _control_ping(self, context):
582         """Send a ping command."""
583         self._call_vpp_async(self.control_ping_index,
584                              self.control_ping_msgdef,
585                              context=context)
586
587     def validate_args(self, msg, kwargs):
588         d = set(kwargs.keys()) - set(msg.field_by_name.keys())
589         if d:
590             raise VPPValueError('Invalid argument {} to {}'
591                                 .format(list(d), msg.name))
592
593     def _call_vpp(self, i, msgdef, multipart, **kwargs):
594         """Given a message, send the message and await a reply.
595
596         msgdef - the message packing definition
597         i - the message type index
598         multipart - True if the message returns multiple
599         messages in return.
600         context - context number - chosen at random if not
601         supplied.
602         The remainder of the kwargs are the arguments to the API call.
603
604         The return value is the message or message array containing
605         the response.  It will raise an IOError exception if there was
606         no response within the timeout window.
607         """
608
609         if 'context' not in kwargs:
610             context = self.get_context()
611             kwargs['context'] = context
612         else:
613             context = kwargs['context']
614         kwargs['_vl_msg_id'] = i
615
616         no_type_conversion = kwargs.pop('_no_type_conversion', False)
617
618         try:
619             if self.transport.socket_index:
620                 kwargs['client_index'] = self.transport.socket_index
621         except AttributeError:
622             pass
623         self.validate_args(msgdef, kwargs)
624
625         logging.debug(call_logger(msgdef, kwargs))
626
627         b = msgdef.pack(kwargs)
628         self.transport.suspend()
629
630         self.transport.write(b)
631
632         if multipart:
633             # Send a ping after the request - we use its response
634             # to detect that we have seen all results.
635             self._control_ping(context)
636
637         # Block until we get a reply.
638         rl = []
639         while (True):
640             msg = self.transport.read()
641             if not msg:
642                 raise VPPIOError(2, 'VPP API client: read failed')
643             r = self.decode_incoming_msg(msg, no_type_conversion)
644             msgname = type(r).__name__
645             if context not in r or r.context == 0 or context != r.context:
646                 # Message being queued
647                 self.message_queue.put_nowait(r)
648                 continue
649
650             if not multipart:
651                 rl = r
652                 break
653             if msgname == 'control_ping_reply':
654                 break
655
656             rl.append(r)
657
658         self.transport.resume()
659
660         logger.debug(return_logger(rl))
661         return rl
662
663     def _call_vpp_async(self, i, msg, **kwargs):
664         """Given a message, send the message and await a reply.
665
666         msgdef - the message packing definition
667         i - the message type index
668         context - context number - chosen at random if not
669         supplied.
670         The remainder of the kwargs are the arguments to the API call.
671         """
672         if 'context' not in kwargs:
673             context = self.get_context()
674             kwargs['context'] = context
675         else:
676             context = kwargs['context']
677         try:
678             if self.transport.socket_index:
679                 kwargs['client_index'] = self.transport.socket_index
680         except AttributeError:
681             kwargs['client_index'] = 0
682         kwargs['_vl_msg_id'] = i
683         b = msg.pack(kwargs)
684
685         self.transport.write(b)
686
687     def register_event_callback(self, callback):
688         """Register a callback for async messages.
689
690         This will be called for async notifications in sync mode,
691         and all messages in async mode.  In sync mode, replies to
692         requests will not come here.
693
694         callback is a fn(msg_type_name, msg_type) that will be
695         called when a message comes in.  While this function is
696         executing, note that (a) you are in a background thread and
697         may wish to use threading.Lock to protect your datastructures,
698         and (b) message processing from VPP will stop (so if you take
699         a long while about it you may provoke reply timeouts or cause
700         VPP to fill the RX buffer).  Passing None will disable the
701         callback.
702         """
703         self.event_callback = callback
704
705     def thread_msg_handler(self):
706         """Python thread calling the user registered message handler.
707
708         This is to emulate the old style event callback scheme. Modern
709         clients should provide their own thread to poll the event
710         queue.
711         """
712         while True:
713             r = self.message_queue.get()
714             if r == "terminate event thread":
715                 break
716             msgname = type(r).__name__
717             if self.event_callback:
718                 self.event_callback(msgname, r)
719
720 # Provide the old name for backward compatibility.
721 VPP = VPPApiClient
722
723 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4