3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
21 import multiprocessing as mp
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
33 if sys.version[0] == '2':
38 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
39 'VppEnum', 'VppEnumType',
40 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
44 def metaclass(metaclass):
45 @functools.wraps(metaclass)
47 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
52 class VppEnumType(type):
53 def __getattr__(cls, name):
54 t = vpp_get_type(name)
58 @metaclass(VppEnumType)
59 class VppEnum(object):
63 def vpp_atexit(vpp_weakref):
64 """Clean up VPP connection on shutdown."""
65 vpp_instance = vpp_weakref()
66 if vpp_instance and vpp_instance.transport.connected:
67 vpp_instance.logger.debug('Cleaning up VPP on exit')
68 vpp_instance.disconnect()
71 if sys.version[0] == '2':
79 class VppApiDynamicMethodHolder(object):
83 class FuncWrapper(object):
84 def __init__(self, func):
86 self.__name__ = func.__name__
87 self.__doc__ = func.__doc__
89 def __call__(self, **kwargs):
90 return self._func(**kwargs)
93 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
96 class VPPApiError(Exception):
100 class VPPNotImplementedError(NotImplementedError):
104 class VPPIOError(IOError):
108 class VPPRuntimeError(RuntimeError):
112 class VPPValueError(ValueError):
116 class VPPApiClient(object):
119 This class provides the APIs to VPP. The APIs are loaded
120 from provided .api.json files and makes functions accordingly.
121 These functions are documented in the VPP .api files, as they
122 are dynamically created.
124 Additionally, VPP can send callback messages; this class
125 provides a means to register a callback function to receive
126 these messages in a background thread.
129 VPPApiError = VPPApiError
130 VPPRuntimeError = VPPRuntimeError
131 VPPValueError = VPPValueError
132 VPPNotImplementedError = VPPNotImplementedError
133 VPPIOError = VPPIOError
135 def process_json_file(self, apidef_file):
136 api = json.load(apidef_file)
138 for t in api['enums']:
139 t[0] = 'vl_api_' + t[0] + '_t'
140 types[t[0]] = {'type': 'enum', 'data': t}
141 for t in api['unions']:
142 t[0] = 'vl_api_' + t[0] + '_t'
143 types[t[0]] = {'type': 'union', 'data': t}
144 for t in api['types']:
145 t[0] = 'vl_api_' + t[0] + '_t'
146 types[t[0]] = {'type': 'type', 'data': t}
147 for t, v in api['aliases'].items():
148 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
149 self.services.update(api['services'])
154 for k, v in types.items():
156 if not vpp_get_type(k):
157 if v['type'] == 'enum':
159 VPPEnumType(t[0], t[1:])
162 elif v['type'] == 'union':
164 VPPUnionType(t[0], t[1:])
167 elif v['type'] == 'type':
172 elif v['type'] == 'alias':
177 if len(unresolved) == 0:
180 raise VPPValueError('Unresolved type definitions {}'
185 for m in api['messages']:
187 self.messages[m[0]] = VPPMessage(m[0], m[1:])
188 except VPPNotImplementedError:
189 self.logger.error('Not implemented error for {}'.format(m[0]))
191 def __init__(self, apifiles=None, testmode=False, async_thread=True,
192 logger=None, loglevel=None,
193 read_timeout=5, use_socket=False,
194 server_address='/run/vpp/api.sock'):
195 """Create a VPP API object.
197 apifiles is a list of files containing API
198 descriptions that will be loaded - methods will be
199 dynamically created reflecting these APIs. If not
200 provided this will load the API files from VPP's
201 default install location.
203 logger, if supplied, is the logging logger object to log to.
204 loglevel, if supplied, is the log level this logger is set
205 to report at (from the loglevels in the logging module).
208 logger = logging.getLogger(
209 "{}.{}".format(__name__, self.__class__.__name__))
210 if loglevel is not None:
211 logger.setLevel(loglevel)
218 self.header = VPPType('header', [['u16', 'msgid'],
219 ['u32', 'client_index']])
221 self.event_callback = None
222 self.message_queue = queue.Queue()
223 self.read_timeout = read_timeout
224 self.async_thread = async_thread
225 self.event_thread = None
226 self.testmode = testmode
227 self.use_socket = use_socket
228 self.server_address = server_address
229 self._apifiles = apifiles
232 from . vpp_transport_socket import VppTransport
234 from . vpp_transport_shmem import VppTransport
237 # Pick up API definitions from default directory
239 apifiles = self.find_api_files()
241 # In test mode we don't care that we can't find the API files
245 raise VPPRuntimeError
247 for file in apifiles:
248 with open(file) as apidef_file:
249 self.process_json_file(apidef_file)
251 self.apifiles = apifiles
254 if len(self.messages) == 0 and not testmode:
255 raise VPPValueError(1, 'Missing JSON message definitions')
257 self.transport = VppTransport(self, read_timeout=read_timeout,
258 server_address=server_address)
259 # Make sure we allow VPP to clean up the message rings.
260 atexit.register(vpp_atexit, weakref.ref(self))
262 class ContextId(object):
263 """Multiprocessing-safe provider of unique context IDs."""
265 self.context = mp.Value(ctypes.c_uint, 0)
266 self.lock = mp.Lock()
269 """Get a new unique (or, at least, not recently used) context."""
271 self.context.value += 1
272 return self.context.value
273 get_context = ContextId()
275 def get_type(self, name):
276 return vpp_get_type(name)
279 def find_api_dir(cls):
280 """Attempt to find the best directory in which API definition
281 files may reside. If the value VPP_API_DIR exists in the environment
282 then it is first on the search list. If we're inside a recognized
283 location in a VPP source tree (src/scripts and src/vpp-api/python)
284 then entries from there to the likely locations in build-root are
285 added. Finally the location used by system packages is added.
287 :returns: A single directory name, or None if no such directory
290 dirs = [cls.apidir] if cls.apidir else []
292 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
293 # in which case, plot a course to likely places in the src tree
294 import __main__ as main
295 if hasattr(main, '__file__'):
296 # get the path of the calling script
297 localdir = os.path.dirname(os.path.realpath(main.__file__))
299 # use cwd if there is no calling script
300 localdir = os.getcwd()
301 localdir_s = localdir.split(os.path.sep)
304 """Match dir against right-hand components of the script dir"""
305 d = dir.split('/') # param 'dir' assumes a / separator
307 return len(localdir_s) > length and localdir_s[-length:] == d
309 def sdir(srcdir, variant):
310 """Build a path from srcdir to the staged API files of
311 'variant' (typically '' or '_debug')"""
312 # Since 'core' and 'plugin' files are staged
313 # in separate directories, we target the parent dir.
314 return os.path.sep.join((
317 'install-vpp%s-native' % variant,
325 if dmatch('src/scripts'):
326 srcdir = os.path.sep.join(localdir_s[:-2])
327 elif dmatch('src/vpp-api/python'):
328 srcdir = os.path.sep.join(localdir_s[:-3])
330 # we're apparently running tests
331 srcdir = os.path.sep.join(localdir_s[:-1])
334 # we're in the source tree, try both the debug and release
336 dirs.append(sdir(srcdir, '_debug'))
337 dirs.append(sdir(srcdir, ''))
339 # Test for staged copies of the scripts
340 # For these, since we explicitly know if we're running a debug versus
341 # release variant, target only the relevant directory
342 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
343 srcdir = os.path.sep.join(localdir_s[:-4])
344 dirs.append(sdir(srcdir, '_debug'))
345 if dmatch('build-root/install-vpp-native/vpp/bin'):
346 srcdir = os.path.sep.join(localdir_s[:-4])
347 dirs.append(sdir(srcdir, ''))
349 # finally, try the location system packages typically install into
350 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
352 # check the directories for existence; first one wins
354 if os.path.isdir(dir):
360 def find_api_files(cls, api_dir=None, patterns='*'):
361 """Find API definition files from the given directory tree with the
362 given pattern. If no directory is given then find_api_dir() is used
363 to locate one. If no pattern is given then all definition files found
364 in the directory tree are used.
366 :param api_dir: A directory tree in which to locate API definition
367 files; subdirectories are descended into.
368 If this is None then find_api_dir() is called to discover it.
369 :param patterns: A list of patterns to use in each visited directory
370 when looking for files.
371 This can be a list/tuple object or a comma-separated string of
372 patterns. Each value in the list will have leading/trialing
374 The pattern specifies the first part of the filename, '.api.json'
376 The results are de-duplicated, thus overlapping patterns are fine.
377 If this is None it defaults to '*' meaning "all API files".
378 :returns: A list of file paths for the API files found.
381 api_dir = cls.find_api_dir()
383 raise VPPApiError("api_dir cannot be located")
385 if isinstance(patterns, list) or isinstance(patterns, tuple):
386 patterns = [p.strip() + '.api.json' for p in patterns]
388 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
391 for root, dirnames, files in os.walk(api_dir):
392 # iterate all given patterns and de-dup the result
393 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
394 for filename in files:
395 api_files.append(os.path.join(root, filename))
401 if not hasattr(self, "_api"):
402 raise VPPApiError("Not connected, api definitions not available")
405 def make_function(self, msg, i, multipart, do_async):
408 return self._call_vpp_async(i, msg, **kwargs)
411 return self._call_vpp(i, msg, multipart, **kwargs)
413 f.__name__ = str(msg.name)
414 f.__doc__ = ", ".join(["%s %s" %
415 (msg.fieldtypes[j], k)
416 for j, k in enumerate(msg.fields)])
421 def _register_functions(self, do_async=False):
422 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
423 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
424 self._api = VppApiDynamicMethodHolder()
425 for name, msg in vpp_iterator(self.messages):
426 n = name + '_' + msg.crc[2:]
427 i = self.transport.get_msg_index(n.encode('utf-8'))
429 self.id_msgdef[i] = msg
430 self.id_names[i] = name
432 # Create function for client side messages.
433 if name in self.services:
434 if 'stream' in self.services[name] and \
435 self.services[name]['stream']:
439 f = self.make_function(msg, i, multipart, do_async)
440 setattr(self._api, name, FuncWrapper(f))
443 'No such message type or failed CRC checksum: %s', n)
445 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
447 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
449 rv = self.transport.connect(name.encode('utf-8'), pfx,
450 msg_handler, rx_qlen)
452 raise VPPIOError(2, 'Connect failed')
453 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
454 self._register_functions(do_async=do_async)
456 # Initialise control ping
457 crc = self.messages['control_ping'].crc
458 self.control_ping_index = self.transport.get_msg_index(
459 ('control_ping' + '_' + crc[2:]).encode('utf-8'))
460 self.control_ping_msgdef = self.messages['control_ping']
461 if self.async_thread:
462 self.event_thread = threading.Thread(
463 target=self.thread_msg_handler)
464 self.event_thread.daemon = True
465 self.event_thread.start()
467 self.event_thread = None
470 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
473 name - the name of the client.
474 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
475 do_async - if true, messages are sent without waiting for a reply
476 rx_qlen - the length of the VPP message receive queue between
479 msg_handler = self.transport.get_callback(do_async)
480 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
483 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
484 """Attach to VPP in synchronous mode. Application must poll for events.
486 name - the name of the client.
487 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
488 rx_qlen - the length of the VPP message receive queue between
492 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
495 def disconnect(self):
496 """Detach from VPP."""
497 rv = self.transport.disconnect()
498 if self.event_thread is not None:
499 self.message_queue.put("terminate event thread")
502 def msg_handler_sync(self, msg):
503 """Process an incoming message from VPP in sync mode.
505 The message may be a reply or it may be an async notification.
507 r = self.decode_incoming_msg(msg)
511 # If we have a context, then use the context to find any
512 # request waiting for a reply
514 if hasattr(r, 'context') and r.context > 0:
518 # No context -> async notification that we feed to the callback
519 self.message_queue.put_nowait(r)
521 raise VPPIOError(2, 'RPC reply message received in event handler')
523 def has_context(self, msg):
527 header = VPPType('header_with_context', [['u16', 'msgid'],
528 ['u32', 'client_index'],
531 (i, ci, context), size = header.unpack(msg, 0)
532 if self.id_names[i] == 'rx_thread_exit':
536 # Decode message and returns a tuple.
538 msgobj = self.id_msgdef[i]
539 if 'context' in msgobj.field_by_name and context >= 0:
543 def decode_incoming_msg(self, msg, no_type_conversion=False):
545 self.logger.warning('vpp_api.read failed')
548 (i, ci), size = self.header.unpack(msg, 0)
549 if self.id_names[i] == 'rx_thread_exit':
553 # Decode message and returns a tuple.
555 msgobj = self.id_msgdef[i]
557 raise VPPIOError(2, 'Reply message undefined')
559 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
562 def msg_handler_async(self, msg):
563 """Process a message from VPP in async mode.
565 In async mode, all messages are returned to the callback.
567 r = self.decode_incoming_msg(msg)
571 msgname = type(r).__name__
573 if self.event_callback:
574 self.event_callback(msgname, r)
576 def _control_ping(self, context):
577 """Send a ping command."""
578 self._call_vpp_async(self.control_ping_index,
579 self.control_ping_msgdef,
582 def validate_args(self, msg, kwargs):
583 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
585 raise VPPValueError('Invalid argument {} to {}'
586 .format(list(d), msg.name))
588 def _call_vpp(self, i, msgdef, multipart, **kwargs):
589 """Given a message, send the message and await a reply.
591 msgdef - the message packing definition
592 i - the message type index
593 multipart - True if the message returns multiple
595 context - context number - chosen at random if not
597 The remainder of the kwargs are the arguments to the API call.
599 The return value is the message or message array containing
600 the response. It will raise an IOError exception if there was
601 no response within the timeout window.
604 if 'context' not in kwargs:
605 context = self.get_context()
606 kwargs['context'] = context
608 context = kwargs['context']
609 kwargs['_vl_msg_id'] = i
611 no_type_conversion = kwargs.pop('_no_type_conversion', False)
614 if self.transport.socket_index:
615 kwargs['client_index'] = self.transport.socket_index
616 except AttributeError:
618 self.validate_args(msgdef, kwargs)
620 s = 'Calling {}({})'.format(msgdef.name,
621 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
624 b = msgdef.pack(kwargs)
625 self.transport.suspend()
627 self.transport.write(b)
630 # Send a ping after the request - we use its response
631 # to detect that we have seen all results.
632 self._control_ping(context)
634 # Block until we get a reply.
637 msg = self.transport.read()
639 raise VPPIOError(2, 'VPP API client: read failed')
640 r = self.decode_incoming_msg(msg, no_type_conversion)
641 msgname = type(r).__name__
642 if context not in r or r.context == 0 or context != r.context:
643 # Message being queued
644 self.message_queue.put_nowait(r)
650 if msgname == 'control_ping_reply':
655 self.transport.resume()
657 self.logger.debug('Return from {!r}'.format(r))
660 def _call_vpp_async(self, i, msg, **kwargs):
661 """Given a message, send the message and await a reply.
663 msgdef - the message packing definition
664 i - the message type index
665 context - context number - chosen at random if not
667 The remainder of the kwargs are the arguments to the API call.
669 if 'context' not in kwargs:
670 context = self.get_context()
671 kwargs['context'] = context
673 context = kwargs['context']
675 if self.transport.socket_index:
676 kwargs['client_index'] = self.transport.socket_index
677 except AttributeError:
678 kwargs['client_index'] = 0
679 kwargs['_vl_msg_id'] = i
682 self.transport.write(b)
684 def register_event_callback(self, callback):
685 """Register a callback for async messages.
687 This will be called for async notifications in sync mode,
688 and all messages in async mode. In sync mode, replies to
689 requests will not come here.
691 callback is a fn(msg_type_name, msg_type) that will be
692 called when a message comes in. While this function is
693 executing, note that (a) you are in a background thread and
694 may wish to use threading.Lock to protect your datastructures,
695 and (b) message processing from VPP will stop (so if you take
696 a long while about it you may provoke reply timeouts or cause
697 VPP to fill the RX buffer). Passing None will disable the
700 self.event_callback = callback
702 def thread_msg_handler(self):
703 """Python thread calling the user registered message handler.
705 This is to emulate the old style event callback scheme. Modern
706 clients should provide their own thread to poll the event
710 r = self.message_queue.get()
711 if r == "terminate event thread":
713 msgname = type(r).__name__
714 if self.event_callback:
715 self.event_callback(msgname, r)
718 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
719 "logger=%s, read_timeout=%s, use_socket=%s, " \
720 "server_address='%s'>" % (
721 self._apifiles, self.testmode, self.async_thread,
722 self.logger, self.read_timeout, self.use_socket,
726 # Provide the old name for backward compatibility.
729 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4