3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
21 import multiprocessing as mp
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
33 logger = logging.getLogger(__name__)
35 if sys.version[0] == '2':
40 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
41 'VppEnum', 'VppEnumType',
42 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
46 def metaclass(metaclass):
47 @functools.wraps(metaclass)
49 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
54 class VppEnumType(type):
55 def __getattr__(cls, name):
56 t = vpp_get_type(name)
60 @metaclass(VppEnumType)
61 class VppEnum(object):
65 def vpp_atexit(vpp_weakref):
66 """Clean up VPP connection on shutdown."""
67 vpp_instance = vpp_weakref()
68 if vpp_instance and vpp_instance.transport.connected:
69 vpp_instance.logger.debug('Cleaning up VPP on exit')
70 vpp_instance.disconnect()
73 if sys.version[0] == '2':
81 def call_logger(msgdef, kwargs):
82 s = 'Calling {}('.format(msgdef.name)
83 for k, v in kwargs.items():
84 s += '{}:{} '.format(k, v)
90 s = 'Return from {}'.format(r)
94 class VppApiDynamicMethodHolder(object):
98 class FuncWrapper(object):
99 def __init__(self, func):
101 self.__name__ = func.__name__
102 self.__doc__ = func.__doc__
104 def __call__(self, **kwargs):
105 return self._func(**kwargs)
108 class VPPApiError(Exception):
112 class VPPNotImplementedError(NotImplementedError):
116 class VPPIOError(IOError):
120 class VPPRuntimeError(RuntimeError):
124 class VPPValueError(ValueError):
128 class VPPApiClient(object):
131 This class provides the APIs to VPP. The APIs are loaded
132 from provided .api.json files and makes functions accordingly.
133 These functions are documented in the VPP .api files, as they
134 are dynamically created.
136 Additionally, VPP can send callback messages; this class
137 provides a means to register a callback function to receive
138 these messages in a background thread.
141 VPPApiError = VPPApiError
142 VPPRuntimeError = VPPRuntimeError
143 VPPValueError = VPPValueError
144 VPPNotImplementedError = VPPNotImplementedError
145 VPPIOError = VPPIOError
147 def process_json_file(self, apidef_file):
148 api = json.load(apidef_file)
150 for t in api['enums']:
151 t[0] = 'vl_api_' + t[0] + '_t'
152 types[t[0]] = {'type': 'enum', 'data': t}
153 for t in api['unions']:
154 t[0] = 'vl_api_' + t[0] + '_t'
155 types[t[0]] = {'type': 'union', 'data': t}
156 for t in api['types']:
157 t[0] = 'vl_api_' + t[0] + '_t'
158 types[t[0]] = {'type': 'type', 'data': t}
159 for t, v in api['aliases'].items():
160 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
161 self.services.update(api['services'])
166 for k, v in types.items():
168 if not vpp_get_type(k):
169 if v['type'] == 'enum':
171 VPPEnumType(t[0], t[1:])
174 elif v['type'] == 'union':
176 VPPUnionType(t[0], t[1:])
179 elif v['type'] == 'type':
184 elif v['type'] == 'alias':
189 if len(unresolved) == 0:
192 raise VPPValueError('Unresolved type definitions {}'
197 for m in api['messages']:
199 self.messages[m[0]] = VPPMessage(m[0], m[1:])
200 except VPPNotImplementedError:
201 self.logger.error('Not implemented error for {}'.format(m[0]))
203 def __init__(self, apifiles=None, testmode=False, async_thread=True,
204 logger=None, loglevel=None,
205 read_timeout=5, use_socket=False,
206 server_address='/run/vpp-api.sock'):
207 """Create a VPP API object.
209 apifiles is a list of files containing API
210 descriptions that will be loaded - methods will be
211 dynamically created reflecting these APIs. If not
212 provided this will load the API files from VPP's
213 default install location.
215 logger, if supplied, is the logging logger object to log to.
216 loglevel, if supplied, is the log level this logger is set
217 to report at (from the loglevels in the logging module).
220 logger = logging.getLogger(__name__)
221 if loglevel is not None:
222 logger.setLevel(loglevel)
229 self.header = VPPType('header', [['u16', 'msgid'],
230 ['u32', 'client_index']])
232 self.event_callback = None
233 self.message_queue = queue.Queue()
234 self.read_timeout = read_timeout
235 self.async_thread = async_thread
236 self.event_thread = None
237 self.testmode = testmode
238 self.use_socket = use_socket
239 self.server_address = server_address
240 self._apifiles = apifiles
243 from . vpp_transport_socket import VppTransport
245 from . vpp_transport_shmem import VppTransport
248 # Pick up API definitions from default directory
250 apifiles = self.find_api_files()
252 # In test mode we don't care that we can't find the API files
256 raise VPPRuntimeError
258 for file in apifiles:
259 with open(file) as apidef_file:
260 self.process_json_file(apidef_file)
262 self.apifiles = apifiles
265 if len(self.messages) == 0 and not testmode:
266 raise VPPValueError(1, 'Missing JSON message definitions')
268 self.transport = VppTransport(self, read_timeout=read_timeout,
269 server_address=server_address)
270 # Make sure we allow VPP to clean up the message rings.
271 atexit.register(vpp_atexit, weakref.ref(self))
273 class ContextId(object):
274 """Multiprocessing-safe provider of unique context IDs."""
276 self.context = mp.Value(ctypes.c_uint, 0)
277 self.lock = mp.Lock()
280 """Get a new unique (or, at least, not recently used) context."""
282 self.context.value += 1
283 return self.context.value
284 get_context = ContextId()
286 def get_type(self, name):
287 return vpp_get_type(name)
290 def find_api_dir(cls):
291 """Attempt to find the best directory in which API definition
292 files may reside. If the value VPP_API_DIR exists in the environment
293 then it is first on the search list. If we're inside a recognized
294 location in a VPP source tree (src/scripts and src/vpp-api/python)
295 then entries from there to the likely locations in build-root are
296 added. Finally the location used by system packages is added.
298 :returns: A single directory name, or None if no such directory
301 dirs = [cls.apidir] if cls.apidir else []
303 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
304 # in which case, plot a course to likely places in the src tree
305 import __main__ as main
306 if hasattr(main, '__file__'):
307 # get the path of the calling script
308 localdir = os.path.dirname(os.path.realpath(main.__file__))
310 # use cwd if there is no calling script
311 localdir = os.getcwd()
312 localdir_s = localdir.split(os.path.sep)
315 """Match dir against right-hand components of the script dir"""
316 d = dir.split('/') # param 'dir' assumes a / separator
318 return len(localdir_s) > length and localdir_s[-length:] == d
320 def sdir(srcdir, variant):
321 """Build a path from srcdir to the staged API files of
322 'variant' (typically '' or '_debug')"""
323 # Since 'core' and 'plugin' files are staged
324 # in separate directories, we target the parent dir.
325 return os.path.sep.join((
328 'install-vpp%s-native' % variant,
336 if dmatch('src/scripts'):
337 srcdir = os.path.sep.join(localdir_s[:-2])
338 elif dmatch('src/vpp-api/python'):
339 srcdir = os.path.sep.join(localdir_s[:-3])
341 # we're apparently running tests
342 srcdir = os.path.sep.join(localdir_s[:-1])
345 # we're in the source tree, try both the debug and release
347 dirs.append(sdir(srcdir, '_debug'))
348 dirs.append(sdir(srcdir, ''))
350 # Test for staged copies of the scripts
351 # For these, since we explicitly know if we're running a debug versus
352 # release variant, target only the relevant directory
353 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
354 srcdir = os.path.sep.join(localdir_s[:-4])
355 dirs.append(sdir(srcdir, '_debug'))
356 if dmatch('build-root/install-vpp-native/vpp/bin'):
357 srcdir = os.path.sep.join(localdir_s[:-4])
358 dirs.append(sdir(srcdir, ''))
360 # finally, try the location system packages typically install into
361 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
363 # check the directories for existence; first one wins
365 if os.path.isdir(dir):
371 def find_api_files(cls, api_dir=None, patterns='*'):
372 """Find API definition files from the given directory tree with the
373 given pattern. If no directory is given then find_api_dir() is used
374 to locate one. If no pattern is given then all definition files found
375 in the directory tree are used.
377 :param api_dir: A directory tree in which to locate API definition
378 files; subdirectories are descended into.
379 If this is None then find_api_dir() is called to discover it.
380 :param patterns: A list of patterns to use in each visited directory
381 when looking for files.
382 This can be a list/tuple object or a comma-separated string of
383 patterns. Each value in the list will have leading/trialing
385 The pattern specifies the first part of the filename, '.api.json'
387 The results are de-duplicated, thus overlapping patterns are fine.
388 If this is None it defaults to '*' meaning "all API files".
389 :returns: A list of file paths for the API files found.
392 api_dir = cls.find_api_dir()
394 raise VPPApiError("api_dir cannot be located")
396 if isinstance(patterns, list) or isinstance(patterns, tuple):
397 patterns = [p.strip() + '.api.json' for p in patterns]
399 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
402 for root, dirnames, files in os.walk(api_dir):
403 # iterate all given patterns and de-dup the result
404 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
405 for filename in files:
406 api_files.append(os.path.join(root, filename))
412 if not hasattr(self, "_api"):
413 raise VPPApiError("Not connected, api definitions not available")
416 def make_function(self, msg, i, multipart, do_async):
419 return self._call_vpp_async(i, msg, **kwargs)
422 return self._call_vpp(i, msg, multipart, **kwargs)
424 f.__name__ = str(msg.name)
425 f.__doc__ = ", ".join(["%s %s" %
426 (msg.fieldtypes[j], k)
427 for j, k in enumerate(msg.fields)])
432 def _register_functions(self, do_async=False):
433 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
434 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
435 self._api = VppApiDynamicMethodHolder()
436 for name, msg in vpp_iterator(self.messages):
437 n = name + '_' + msg.crc[2:]
438 i = self.transport.get_msg_index(n.encode('utf-8'))
440 self.id_msgdef[i] = msg
441 self.id_names[i] = name
443 # Create function for client side messages.
444 if name in self.services:
445 if 'stream' in self.services[name] and \
446 self.services[name]['stream']:
450 f = self.make_function(msg, i, multipart, do_async)
451 setattr(self._api, name, FuncWrapper(f))
454 'No such message type or failed CRC checksum: %s', n)
456 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
458 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
460 rv = self.transport.connect(name.encode('utf-8'), pfx,
461 msg_handler, rx_qlen)
463 raise VPPIOError(2, 'Connect failed')
464 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
465 self._register_functions(do_async=do_async)
467 # Initialise control ping
468 crc = self.messages['control_ping'].crc
469 self.control_ping_index = self.transport.get_msg_index(
470 ('control_ping' + '_' + crc[2:]).encode('utf-8'))
471 self.control_ping_msgdef = self.messages['control_ping']
472 if self.async_thread:
473 self.event_thread = threading.Thread(
474 target=self.thread_msg_handler)
475 self.event_thread.daemon = True
476 self.event_thread.start()
478 self.event_thread = None
481 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
484 name - the name of the client.
485 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
486 do_async - if true, messages are sent without waiting for a reply
487 rx_qlen - the length of the VPP message receive queue between
490 msg_handler = self.transport.get_callback(do_async)
491 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
494 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
495 """Attach to VPP in synchronous mode. Application must poll for events.
497 name - the name of the client.
498 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
499 rx_qlen - the length of the VPP message receive queue between
503 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
506 def disconnect(self):
507 """Detach from VPP."""
508 rv = self.transport.disconnect()
509 if self.event_thread is not None:
510 self.message_queue.put("terminate event thread")
513 def msg_handler_sync(self, msg):
514 """Process an incoming message from VPP in sync mode.
516 The message may be a reply or it may be an async notification.
518 r = self.decode_incoming_msg(msg)
522 # If we have a context, then use the context to find any
523 # request waiting for a reply
525 if hasattr(r, 'context') and r.context > 0:
529 # No context -> async notification that we feed to the callback
530 self.message_queue.put_nowait(r)
532 raise VPPIOError(2, 'RPC reply message received in event handler')
534 def has_context(self, msg):
538 header = VPPType('header_with_context', [['u16', 'msgid'],
539 ['u32', 'client_index'],
542 (i, ci, context), size = header.unpack(msg, 0)
543 if self.id_names[i] == 'rx_thread_exit':
547 # Decode message and returns a tuple.
549 msgobj = self.id_msgdef[i]
550 if 'context' in msgobj.field_by_name and context >= 0:
554 def decode_incoming_msg(self, msg, no_type_conversion=False):
556 self.logger.warning('vpp_api.read failed')
559 (i, ci), size = self.header.unpack(msg, 0)
560 if self.id_names[i] == 'rx_thread_exit':
564 # Decode message and returns a tuple.
566 msgobj = self.id_msgdef[i]
568 raise VPPIOError(2, 'Reply message undefined')
570 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
573 def msg_handler_async(self, msg):
574 """Process a message from VPP in async mode.
576 In async mode, all messages are returned to the callback.
578 r = self.decode_incoming_msg(msg)
582 msgname = type(r).__name__
584 if self.event_callback:
585 self.event_callback(msgname, r)
587 def _control_ping(self, context):
588 """Send a ping command."""
589 self._call_vpp_async(self.control_ping_index,
590 self.control_ping_msgdef,
593 def validate_args(self, msg, kwargs):
594 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
596 raise VPPValueError('Invalid argument {} to {}'
597 .format(list(d), msg.name))
599 def _call_vpp(self, i, msgdef, multipart, **kwargs):
600 """Given a message, send the message and await a reply.
602 msgdef - the message packing definition
603 i - the message type index
604 multipart - True if the message returns multiple
606 context - context number - chosen at random if not
608 The remainder of the kwargs are the arguments to the API call.
610 The return value is the message or message array containing
611 the response. It will raise an IOError exception if there was
612 no response within the timeout window.
615 if 'context' not in kwargs:
616 context = self.get_context()
617 kwargs['context'] = context
619 context = kwargs['context']
620 kwargs['_vl_msg_id'] = i
622 no_type_conversion = kwargs.pop('_no_type_conversion', False)
625 if self.transport.socket_index:
626 kwargs['client_index'] = self.transport.socket_index
627 except AttributeError:
629 self.validate_args(msgdef, kwargs)
631 logging.debug(call_logger(msgdef, kwargs))
633 b = msgdef.pack(kwargs)
634 self.transport.suspend()
636 self.transport.write(b)
639 # Send a ping after the request - we use its response
640 # to detect that we have seen all results.
641 self._control_ping(context)
643 # Block until we get a reply.
646 msg = self.transport.read()
648 raise VPPIOError(2, 'VPP API client: read failed')
649 r = self.decode_incoming_msg(msg, no_type_conversion)
650 msgname = type(r).__name__
651 if context not in r or r.context == 0 or context != r.context:
652 # Message being queued
653 self.message_queue.put_nowait(r)
659 if msgname == 'control_ping_reply':
664 self.transport.resume()
666 logger.debug(return_logger(rl))
669 def _call_vpp_async(self, i, msg, **kwargs):
670 """Given a message, send the message and await a reply.
672 msgdef - the message packing definition
673 i - the message type index
674 context - context number - chosen at random if not
676 The remainder of the kwargs are the arguments to the API call.
678 if 'context' not in kwargs:
679 context = self.get_context()
680 kwargs['context'] = context
682 context = kwargs['context']
684 if self.transport.socket_index:
685 kwargs['client_index'] = self.transport.socket_index
686 except AttributeError:
687 kwargs['client_index'] = 0
688 kwargs['_vl_msg_id'] = i
691 self.transport.write(b)
693 def register_event_callback(self, callback):
694 """Register a callback for async messages.
696 This will be called for async notifications in sync mode,
697 and all messages in async mode. In sync mode, replies to
698 requests will not come here.
700 callback is a fn(msg_type_name, msg_type) that will be
701 called when a message comes in. While this function is
702 executing, note that (a) you are in a background thread and
703 may wish to use threading.Lock to protect your datastructures,
704 and (b) message processing from VPP will stop (so if you take
705 a long while about it you may provoke reply timeouts or cause
706 VPP to fill the RX buffer). Passing None will disable the
709 self.event_callback = callback
711 def thread_msg_handler(self):
712 """Python thread calling the user registered message handler.
714 This is to emulate the old style event callback scheme. Modern
715 clients should provide their own thread to poll the event
719 r = self.message_queue.get()
720 if r == "terminate event thread":
722 msgname = type(r).__name__
723 if self.event_callback:
724 self.event_callback(msgname, r)
727 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
728 "logger=%s, read_timeout=%s, use_socket=%s, " \
729 "server_address='%s'>" % (
730 self._apifiles, self.testmode, self.async_thread,
731 self.logger, self.read_timeout, self.use_socket,
735 # Provide the old name for backward compatibility.
738 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4