3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
21 import multiprocessing as mp
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
33 logger = logging.getLogger(__name__)
35 if sys.version[0] == '2':
40 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
41 'VppEnum', 'VppEnumType',
42 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
46 def metaclass(metaclass):
47 @functools.wraps(metaclass)
49 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
54 class VppEnumType(type):
55 def __getattr__(cls, name):
56 t = vpp_get_type(name)
60 @metaclass(VppEnumType)
61 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client at interpreter shutdown.

    :param vpp_weakref: zero-argument callable (e.g. a ``weakref.ref``)
        that returns the client object, or a falsy value once the
        client is gone.
    :returns: None
    """
    client = vpp_weakref()
    # Nothing to do if the referent has already been collected.
    if not client:
        return
    # Nothing to do if the transport is not (or no longer) connected.
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
73 if sys.version[0] == '2':
81 def call_logger(msgdef, kwargs):
82 s = 'Calling {}('.format(msgdef.name)
83 for k, v in kwargs.items():
84 s += '{}:{} '.format(k, v)
90 s = 'Return from {}'.format(r)
94 class VppApiDynamicMethodHolder(object):
98 class FuncWrapper(object):
99 def __init__(self, func):
101 self.__name__ = func.__name__
102 self.__doc__ = func.__doc__
104 def __call__(self, **kwargs):
105 return self._func(**kwargs)
108 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
111 class VPPApiError(Exception):
115 class VPPNotImplementedError(NotImplementedError):
119 class VPPIOError(IOError):
123 class VPPRuntimeError(RuntimeError):
127 class VPPValueError(ValueError):
131 class VPPApiClient(object):
134 This class provides the APIs to VPP. The APIs are loaded
135 from provided .api.json files and makes functions accordingly.
136 These functions are documented in the VPP .api files, as they
137 are dynamically created.
139 Additionally, VPP can send callback messages; this class
140 provides a means to register a callback function to receive
141 these messages in a background thread.
144 VPPApiError = VPPApiError
145 VPPRuntimeError = VPPRuntimeError
146 VPPValueError = VPPValueError
147 VPPNotImplementedError = VPPNotImplementedError
148 VPPIOError = VPPIOError
150 def process_json_file(self, apidef_file):
151 api = json.load(apidef_file)
153 for t in api['enums']:
154 t[0] = 'vl_api_' + t[0] + '_t'
155 types[t[0]] = {'type': 'enum', 'data': t}
156 for t in api['unions']:
157 t[0] = 'vl_api_' + t[0] + '_t'
158 types[t[0]] = {'type': 'union', 'data': t}
159 for t in api['types']:
160 t[0] = 'vl_api_' + t[0] + '_t'
161 types[t[0]] = {'type': 'type', 'data': t}
162 for t, v in api['aliases'].items():
163 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
164 self.services.update(api['services'])
169 for k, v in types.items():
171 if not vpp_get_type(k):
172 if v['type'] == 'enum':
174 VPPEnumType(t[0], t[1:])
177 elif v['type'] == 'union':
179 VPPUnionType(t[0], t[1:])
182 elif v['type'] == 'type':
187 elif v['type'] == 'alias':
192 if len(unresolved) == 0:
195 raise VPPValueError('Unresolved type definitions {}'
200 for m in api['messages']:
202 self.messages[m[0]] = VPPMessage(m[0], m[1:])
203 except VPPNotImplementedError:
204 self.logger.error('Not implemented error for {}'.format(m[0]))
206 def __init__(self, apifiles=None, testmode=False, async_thread=True,
207 logger=None, loglevel=None,
208 read_timeout=5, use_socket=False,
209 server_address='/run/vpp-api.sock'):
210 """Create a VPP API object.
212 apifiles is a list of files containing API
213 descriptions that will be loaded - methods will be
214 dynamically created reflecting these APIs. If not
215 provided this will load the API files from VPP's
216 default install location.
218 logger, if supplied, is the logging logger object to log to.
219 loglevel, if supplied, is the log level this logger is set
220 to report at (from the loglevels in the logging module).
223 logger = logging.getLogger(__name__)
224 if loglevel is not None:
225 logger.setLevel(loglevel)
232 self.header = VPPType('header', [['u16', 'msgid'],
233 ['u32', 'client_index']])
235 self.event_callback = None
236 self.message_queue = queue.Queue()
237 self.read_timeout = read_timeout
238 self.async_thread = async_thread
239 self.event_thread = None
240 self.testmode = testmode
241 self.use_socket = use_socket
242 self.server_address = server_address
243 self._apifiles = apifiles
246 from . vpp_transport_socket import VppTransport
248 from . vpp_transport_shmem import VppTransport
251 # Pick up API definitions from default directory
253 apifiles = self.find_api_files()
255 # In test mode we don't care that we can't find the API files
259 raise VPPRuntimeError
261 for file in apifiles:
262 with open(file) as apidef_file:
263 self.process_json_file(apidef_file)
265 self.apifiles = apifiles
268 if len(self.messages) == 0 and not testmode:
269 raise VPPValueError(1, 'Missing JSON message definitions')
271 self.transport = VppTransport(self, read_timeout=read_timeout,
272 server_address=server_address)
273 # Make sure we allow VPP to clean up the message rings.
274 atexit.register(vpp_atexit, weakref.ref(self))
276 class ContextId(object):
277 """Multiprocessing-safe provider of unique context IDs."""
279 self.context = mp.Value(ctypes.c_uint, 0)
280 self.lock = mp.Lock()
283 """Get a new unique (or, at least, not recently used) context."""
285 self.context.value += 1
286 return self.context.value
287 get_context = ContextId()
def get_type(self, name):
    """Return whatever ``vpp_get_type()`` yields for ``name``.

    :param name: type name to look up; passed through unchanged.
    :returns: the value returned by ``vpp_get_type(name)``.
    """
    looked_up = vpp_get_type(name)
    return looked_up
293 def find_api_dir(cls):
294 """Attempt to find the best directory in which API definition
295 files may reside. If the value VPP_API_DIR exists in the environment
296 then it is first on the search list. If we're inside a recognized
297 location in a VPP source tree (src/scripts and src/vpp-api/python)
298 then entries from there to the likely locations in build-root are
299 added. Finally the location used by system packages is added.
301 :returns: A single directory name, or None if no such directory
304 dirs = [cls.apidir] if cls.apidir else []
306 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
307 # in which case, plot a course to likely places in the src tree
308 import __main__ as main
309 if hasattr(main, '__file__'):
310 # get the path of the calling script
311 localdir = os.path.dirname(os.path.realpath(main.__file__))
313 # use cwd if there is no calling script
314 localdir = os.getcwd()
315 localdir_s = localdir.split(os.path.sep)
318 """Match dir against right-hand components of the script dir"""
319 d = dir.split('/') # param 'dir' assumes a / separator
321 return len(localdir_s) > length and localdir_s[-length:] == d
323 def sdir(srcdir, variant):
324 """Build a path from srcdir to the staged API files of
325 'variant' (typically '' or '_debug')"""
326 # Since 'core' and 'plugin' files are staged
327 # in separate directories, we target the parent dir.
328 return os.path.sep.join((
331 'install-vpp%s-native' % variant,
339 if dmatch('src/scripts'):
340 srcdir = os.path.sep.join(localdir_s[:-2])
341 elif dmatch('src/vpp-api/python'):
342 srcdir = os.path.sep.join(localdir_s[:-3])
344 # we're apparently running tests
345 srcdir = os.path.sep.join(localdir_s[:-1])
348 # we're in the source tree, try both the debug and release
350 dirs.append(sdir(srcdir, '_debug'))
351 dirs.append(sdir(srcdir, ''))
353 # Test for staged copies of the scripts
354 # For these, since we explicitly know if we're running a debug versus
355 # release variant, target only the relevant directory
356 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
357 srcdir = os.path.sep.join(localdir_s[:-4])
358 dirs.append(sdir(srcdir, '_debug'))
359 if dmatch('build-root/install-vpp-native/vpp/bin'):
360 srcdir = os.path.sep.join(localdir_s[:-4])
361 dirs.append(sdir(srcdir, ''))
363 # finally, try the location system packages typically install into
364 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
366 # check the directories for existence; first one wins
368 if os.path.isdir(dir):
374 def find_api_files(cls, api_dir=None, patterns='*'):
375 """Find API definition files from the given directory tree with the
376 given pattern. If no directory is given then find_api_dir() is used
377 to locate one. If no pattern is given then all definition files found
378 in the directory tree are used.
380 :param api_dir: A directory tree in which to locate API definition
381 files; subdirectories are descended into.
382 If this is None then find_api_dir() is called to discover it.
383 :param patterns: A list of patterns to use in each visited directory
384 when looking for files.
385 This can be a list/tuple object or a comma-separated string of
386 patterns. Each value in the list will have leading/trailing
388 The pattern specifies the first part of the filename, '.api.json'
390 The results are de-duplicated, thus overlapping patterns are fine.
391 If this is None it defaults to '*' meaning "all API files".
392 :returns: A list of file paths for the API files found.
395 api_dir = cls.find_api_dir()
397 raise VPPApiError("api_dir cannot be located")
399 if isinstance(patterns, list) or isinstance(patterns, tuple):
400 patterns = [p.strip() + '.api.json' for p in patterns]
402 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
405 for root, dirnames, files in os.walk(api_dir):
406 # iterate all given patterns and de-dup the result
407 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
408 for filename in files:
409 api_files.append(os.path.join(root, filename))
415 if not hasattr(self, "_api"):
416 raise VPPApiError("Not connected, api definitions not available")
419 def make_function(self, msg, i, multipart, do_async):
422 return self._call_vpp_async(i, msg, **kwargs)
425 return self._call_vpp(i, msg, multipart, **kwargs)
427 f.__name__ = str(msg.name)
428 f.__doc__ = ", ".join(["%s %s" %
429 (msg.fieldtypes[j], k)
430 for j, k in enumerate(msg.fields)])
435 def _register_functions(self, do_async=False):
436 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
437 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
438 self._api = VppApiDynamicMethodHolder()
439 for name, msg in vpp_iterator(self.messages):
440 n = name + '_' + msg.crc[2:]
441 i = self.transport.get_msg_index(n.encode('utf-8'))
443 self.id_msgdef[i] = msg
444 self.id_names[i] = name
446 # Create function for client side messages.
447 if name in self.services:
448 if 'stream' in self.services[name] and \
449 self.services[name]['stream']:
453 f = self.make_function(msg, i, multipart, do_async)
454 setattr(self._api, name, FuncWrapper(f))
457 'No such message type or failed CRC checksum: %s', n)
459 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
461 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
463 rv = self.transport.connect(name.encode('utf-8'), pfx,
464 msg_handler, rx_qlen)
466 raise VPPIOError(2, 'Connect failed')
467 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
468 self._register_functions(do_async=do_async)
470 # Initialise control ping
471 crc = self.messages['control_ping'].crc
472 self.control_ping_index = self.transport.get_msg_index(
473 ('control_ping' + '_' + crc[2:]).encode('utf-8'))
474 self.control_ping_msgdef = self.messages['control_ping']
475 if self.async_thread:
476 self.event_thread = threading.Thread(
477 target=self.thread_msg_handler)
478 self.event_thread.daemon = True
479 self.event_thread.start()
481 self.event_thread = None
484 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
487 name - the name of the client.
488 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
489 do_async - if true, messages are sent without waiting for a reply
490 rx_qlen - the length of the VPP message receive queue between
493 msg_handler = self.transport.get_callback(do_async)
494 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
497 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
498 """Attach to VPP in synchronous mode. Application must poll for events.
500 name - the name of the client.
501 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
502 rx_qlen - the length of the VPP message receive queue between
506 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
509 def disconnect(self):
510 """Detach from VPP."""
511 rv = self.transport.disconnect()
512 if self.event_thread is not None:
513 self.message_queue.put("terminate event thread")
516 def msg_handler_sync(self, msg):
517 """Process an incoming message from VPP in sync mode.
519 The message may be a reply or it may be an async notification.
521 r = self.decode_incoming_msg(msg)
525 # If we have a context, then use the context to find any
526 # request waiting for a reply
528 if hasattr(r, 'context') and r.context > 0:
532 # No context -> async notification that we feed to the callback
533 self.message_queue.put_nowait(r)
535 raise VPPIOError(2, 'RPC reply message received in event handler')
537 def has_context(self, msg):
541 header = VPPType('header_with_context', [['u16', 'msgid'],
542 ['u32', 'client_index'],
545 (i, ci, context), size = header.unpack(msg, 0)
546 if self.id_names[i] == 'rx_thread_exit':
550 # Decode message and returns a tuple.
552 msgobj = self.id_msgdef[i]
553 if 'context' in msgobj.field_by_name and context >= 0:
557 def decode_incoming_msg(self, msg, no_type_conversion=False):
559 self.logger.warning('vpp_api.read failed')
562 (i, ci), size = self.header.unpack(msg, 0)
563 if self.id_names[i] == 'rx_thread_exit':
567 # Decode message and returns a tuple.
569 msgobj = self.id_msgdef[i]
571 raise VPPIOError(2, 'Reply message undefined')
573 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
576 def msg_handler_async(self, msg):
577 """Process a message from VPP in async mode.
579 In async mode, all messages are returned to the callback.
581 r = self.decode_incoming_msg(msg)
585 msgname = type(r).__name__
587 if self.event_callback:
588 self.event_callback(msgname, r)
590 def _control_ping(self, context):
591 """Send a ping command."""
592 self._call_vpp_async(self.control_ping_index,
593 self.control_ping_msgdef,
596 def validate_args(self, msg, kwargs):
597 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
599 raise VPPValueError('Invalid argument {} to {}'
600 .format(list(d), msg.name))
602 def _call_vpp(self, i, msgdef, multipart, **kwargs):
603 """Given a message, send the message and await a reply.
605 msgdef - the message packing definition
606 i - the message type index
607 multipart - True if the message returns multiple
609 context - context number - chosen at random if not
611 The remainder of the kwargs are the arguments to the API call.
613 The return value is the message or message array containing
614 the response. It will raise an IOError exception if there was
615 no response within the timeout window.
618 if 'context' not in kwargs:
619 context = self.get_context()
620 kwargs['context'] = context
622 context = kwargs['context']
623 kwargs['_vl_msg_id'] = i
625 no_type_conversion = kwargs.pop('_no_type_conversion', False)
628 if self.transport.socket_index:
629 kwargs['client_index'] = self.transport.socket_index
630 except AttributeError:
632 self.validate_args(msgdef, kwargs)
634 logging.debug(call_logger(msgdef, kwargs))
636 b = msgdef.pack(kwargs)
637 self.transport.suspend()
639 self.transport.write(b)
642 # Send a ping after the request - we use its response
643 # to detect that we have seen all results.
644 self._control_ping(context)
646 # Block until we get a reply.
649 msg = self.transport.read()
651 raise VPPIOError(2, 'VPP API client: read failed')
652 r = self.decode_incoming_msg(msg, no_type_conversion)
653 msgname = type(r).__name__
654 if context not in r or r.context == 0 or context != r.context:
655 # Message being queued
656 self.message_queue.put_nowait(r)
662 if msgname == 'control_ping_reply':
667 self.transport.resume()
669 logger.debug(return_logger(rl))
672 def _call_vpp_async(self, i, msg, **kwargs):
673 """Given a message, send the message without waiting for a reply.
675 msgdef - the message packing definition
676 i - the message type index
677 context - context number - chosen at random if not
679 The remainder of the kwargs are the arguments to the API call.
681 if 'context' not in kwargs:
682 context = self.get_context()
683 kwargs['context'] = context
685 context = kwargs['context']
687 if self.transport.socket_index:
688 kwargs['client_index'] = self.transport.socket_index
689 except AttributeError:
690 kwargs['client_index'] = 0
691 kwargs['_vl_msg_id'] = i
694 self.transport.write(b)
696 def register_event_callback(self, callback):
697 """Register a callback for async messages.
699 This will be called for async notifications in sync mode,
700 and all messages in async mode. In sync mode, replies to
701 requests will not come here.
703 callback is a fn(msg_type_name, msg_type) that will be
704 called when a message comes in. While this function is
705 executing, note that (a) you are in a background thread and
706 may wish to use threading.Lock to protect your datastructures,
707 and (b) message processing from VPP will stop (so if you take
708 a long while about it you may provoke reply timeouts or cause
709 VPP to fill the RX buffer). Passing None will disable the
712 self.event_callback = callback
714 def thread_msg_handler(self):
715 """Python thread calling the user registered message handler.
717 This is to emulate the old style event callback scheme. Modern
718 clients should provide their own thread to poll the event
722 r = self.message_queue.get()
723 if r == "terminate event thread":
725 msgname = type(r).__name__
726 if self.event_callback:
727 self.event_callback(msgname, r)
730 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
731 "logger=%s, read_timeout=%s, use_socket=%s, " \
732 "server_address='%s'>" % (
733 self._apifiles, self.testmode, self.async_thread,
734 self.logger, self.read_timeout, self.use_socket,
738 # Provide the old name for backward compatibility.
741 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4