3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
21 import multiprocessing as mp
32 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
33 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
34 from . macaddress import MACAddress, mac_pton, mac_ntop
36 logger = logging.getLogger(__name__)
38 if sys.version[0] == '2':
44 def metaclass(metaclass):
45 @functools.wraps(metaclass)
47 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
52 class VppEnumType(type):
53 def __getattr__(cls, name):
54 t = vpp_get_type(name)
58 @metaclass(VppEnumType)
59 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client when the process exits.

    vpp_weakref is a zero-argument callable (registered elsewhere in this
    file as a weakref.ref to the client) that yields the client object,
    or a falsy value once the client no longer exists.  Nothing is done
    when the client is gone or its transport reports it is not connected.
    """
    client = vpp_weakref()
    # The referent may already have been collected by the time we run.
    if not client:
        return
    # Only a live connection needs tearing down.
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
71 if sys.version[0] == '2':
79 def call_logger(msgdef, kwargs):
80 s = 'Calling {}('.format(msgdef.name)
81 for k, v in kwargs.items():
82 s += '{}:{} '.format(k, v)
88 s = 'Return from {}'.format(r)
92 class VppApiDynamicMethodHolder(object):
96 class FuncWrapper(object):
97 def __init__(self, func):
99 self.__name__ = func.__name__
100 self.__doc__ = func.__doc__
102 def __call__(self, **kwargs):
103 return self._func(**kwargs)
106 class VPPApiError(Exception):
110 class VPPNotImplementedError(NotImplementedError):
114 class VPPIOError(IOError):
118 class VPPRuntimeError(RuntimeError):
122 class VPPValueError(ValueError):
126 class VPPApiClient(object):
129 This class provides the APIs to VPP. The APIs are loaded
130 from provided .api.json files and makes functions accordingly.
131 These functions are documented in the VPP .api files, as they
132 are dynamically created.
134 Additionally, VPP can send callback messages; this class
135 provides a means to register a callback function to receive
136 these messages in a background thread.
139 VPPApiError = VPPApiError
140 VPPRuntimeError = VPPRuntimeError
141 VPPValueError = VPPValueError
142 VPPNotImplementedError = VPPNotImplementedError
143 VPPIOError = VPPIOError
145 def process_json_file(self, apidef_file):
146 api = json.load(apidef_file)
148 for t in api['enums']:
149 t[0] = 'vl_api_' + t[0] + '_t'
150 types[t[0]] = {'type': 'enum', 'data': t}
151 for t in api['unions']:
152 t[0] = 'vl_api_' + t[0] + '_t'
153 types[t[0]] = {'type': 'union', 'data': t}
154 for t in api['types']:
155 t[0] = 'vl_api_' + t[0] + '_t'
156 types[t[0]] = {'type': 'type', 'data': t}
157 for t, v in api['aliases'].items():
158 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
159 self.services.update(api['services'])
164 for k, v in types.items():
166 if not vpp_get_type(k):
167 if v['type'] == 'enum':
169 VPPEnumType(t[0], t[1:])
172 elif v['type'] == 'union':
174 VPPUnionType(t[0], t[1:])
177 elif v['type'] == 'type':
182 elif v['type'] == 'alias':
187 if len(unresolved) == 0:
190 raise VPPValueError('Unresolved type definitions {}'
195 for m in api['messages']:
197 self.messages[m[0]] = VPPMessage(m[0], m[1:])
198 except VPPNotImplementedError:
199 self.logger.error('Not implemented error for {}'.format(m[0]))
201 def __init__(self, apifiles=None, testmode=False, async_thread=True,
202 logger=None, loglevel=None,
203 read_timeout=5, use_socket=False,
204 server_address='/run/vpp-api.sock'):
205 """Create a VPP API object.
207 apifiles is a list of files containing API
208 descriptions that will be loaded - methods will be
209 dynamically created reflecting these APIs. If not
210 provided this will load the API files from VPP's
211 default install location.
213 logger, if supplied, is the logging logger object to log to.
214 loglevel, if supplied, is the log level this logger is set
215 to report at (from the loglevels in the logging module).
218 logger = logging.getLogger(__name__)
219 if loglevel is not None:
220 logger.setLevel(loglevel)
227 self.header = VPPType('header', [['u16', 'msgid'],
228 ['u32', 'client_index']])
230 self.event_callback = None
231 self.message_queue = queue.Queue()
232 self.read_timeout = read_timeout
233 self.async_thread = async_thread
234 self.event_thread = None
235 self.testmode = testmode
236 self.use_socket = use_socket
237 self.server_address = server_address
238 self._apifiles = apifiles
241 from . vpp_transport_socket import VppTransport
243 from . vpp_transport_shmem import VppTransport
246 # Pick up API definitions from default directory
248 apifiles = self.find_api_files()
250 # In test mode we don't care that we can't find the API files
254 raise VPPRuntimeError
256 for file in apifiles:
257 with open(file) as apidef_file:
258 self.process_json_file(apidef_file)
260 self.apifiles = apifiles
263 if len(self.messages) == 0 and not testmode:
264 raise VPPValueError(1, 'Missing JSON message definitions')
266 self.transport = VppTransport(self, read_timeout=read_timeout,
267 server_address=server_address)
268 # Make sure we allow VPP to clean up the message rings.
269 atexit.register(vpp_atexit, weakref.ref(self))
271 class ContextId(object):
272 """Multiprocessing-safe provider of unique context IDs."""
274 self.context = mp.Value(ctypes.c_uint, 0)
275 self.lock = mp.Lock()
278 """Get a new unique (or, at least, not recently used) context."""
280 self.context.value += 1
281 return self.context.value
282 get_context = ContextId()
def get_type(self, name):
    """Return whatever vpp_get_type() yields for the given type name.

    Instance-level convenience wrapper; it reads no instance state and
    simply forwards name to the module-level vpp_get_type helper.
    """
    looked_up = vpp_get_type(name)
    return looked_up
288 def find_api_dir(cls):
289 """Attempt to find the best directory in which API definition
290 files may reside. If the value VPP_API_DIR exists in the environment
291 then it is first on the search list. If we're inside a recognized
292 location in a VPP source tree (src/scripts and src/vpp-api/python)
293 then entries from there to the likely locations in build-root are
294 added. Finally the location used by system packages is added.
296 :returns: A single directory name, or None if no such directory
299 dirs = [cls.apidir] if cls.apidir else []
301 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
302 # in which case, plot a course to likely places in the src tree
303 import __main__ as main
304 if hasattr(main, '__file__'):
305 # get the path of the calling script
306 localdir = os.path.dirname(os.path.realpath(main.__file__))
308 # use cwd if there is no calling script
309 localdir = os.getcwd()
310 localdir_s = localdir.split(os.path.sep)
313 """Match dir against right-hand components of the script dir"""
314 d = dir.split('/') # param 'dir' assumes a / separator
316 return len(localdir_s) > length and localdir_s[-length:] == d
318 def sdir(srcdir, variant):
319 """Build a path from srcdir to the staged API files of
320 'variant' (typically '' or '_debug')"""
321 # Since 'core' and 'plugin' files are staged
322 # in separate directories, we target the parent dir.
323 return os.path.sep.join((
326 'install-vpp%s-native' % variant,
334 if dmatch('src/scripts'):
335 srcdir = os.path.sep.join(localdir_s[:-2])
336 elif dmatch('src/vpp-api/python'):
337 srcdir = os.path.sep.join(localdir_s[:-3])
339 # we're apparently running tests
340 srcdir = os.path.sep.join(localdir_s[:-1])
343 # we're in the source tree, try both the debug and release
345 dirs.append(sdir(srcdir, '_debug'))
346 dirs.append(sdir(srcdir, ''))
348 # Test for staged copies of the scripts
349 # For these, since we explicitly know if we're running a debug versus
350 # release variant, target only the relevant directory
351 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
352 srcdir = os.path.sep.join(localdir_s[:-4])
353 dirs.append(sdir(srcdir, '_debug'))
354 if dmatch('build-root/install-vpp-native/vpp/bin'):
355 srcdir = os.path.sep.join(localdir_s[:-4])
356 dirs.append(sdir(srcdir, ''))
358 # finally, try the location system packages typically install into
359 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
361 # check the directories for existence; first one wins
363 if os.path.isdir(dir):
369 def find_api_files(cls, api_dir=None, patterns='*'):
370 """Find API definition files from the given directory tree with the
371 given pattern. If no directory is given then find_api_dir() is used
372 to locate one. If no pattern is given then all definition files found
373 in the directory tree are used.
375 :param api_dir: A directory tree in which to locate API definition
376 files; subdirectories are descended into.
377 If this is None then find_api_dir() is called to discover it.
378 :param patterns: A list of patterns to use in each visited directory
379 when looking for files.
380 This can be a list/tuple object or a comma-separated string of
patterns. Each value in the list will have leading/trailing
383 The pattern specifies the first part of the filename, '.api.json'
385 The results are de-duplicated, thus overlapping patterns are fine.
386 If this is None it defaults to '*' meaning "all API files".
387 :returns: A list of file paths for the API files found.
390 api_dir = cls.find_api_dir()
392 raise VPPApiError("api_dir cannot be located")
394 if isinstance(patterns, list) or isinstance(patterns, tuple):
395 patterns = [p.strip() + '.api.json' for p in patterns]
397 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
400 for root, dirnames, files in os.walk(api_dir):
401 # iterate all given patterns and de-dup the result
402 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
403 for filename in files:
404 api_files.append(os.path.join(root, filename))
410 if not hasattr(self, "_api"):
411 raise VPPApiError("Not connected, api definitions not available")
414 def make_function(self, msg, i, multipart, do_async):
417 return self._call_vpp_async(i, msg, **kwargs)
420 return self._call_vpp(i, msg, multipart, **kwargs)
422 f.__name__ = str(msg.name)
423 f.__doc__ = ", ".join(["%s %s" %
424 (msg.fieldtypes[j], k)
425 for j, k in enumerate(msg.fields)])
430 def _register_functions(self, do_async=False):
431 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
432 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
433 self._api = VppApiDynamicMethodHolder()
434 for name, msg in vpp_iterator(self.messages):
435 n = name + '_' + msg.crc[2:]
436 i = self.transport.get_msg_index(n.encode('utf-8'))
438 self.id_msgdef[i] = msg
439 self.id_names[i] = name
441 # Create function for client side messages.
442 if name in self.services:
443 if 'stream' in self.services[name] and \
444 self.services[name]['stream']:
448 f = self.make_function(msg, i, multipart, do_async)
449 setattr(self._api, name, FuncWrapper(f))
452 'No such message type or failed CRC checksum: %s', n)
454 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
456 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
458 rv = self.transport.connect(name.encode('utf-8'), pfx,
459 msg_handler, rx_qlen)
461 raise VPPIOError(2, 'Connect failed')
462 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
463 self._register_functions(do_async=do_async)
465 # Initialise control ping
466 crc = self.messages['control_ping'].crc
467 self.control_ping_index = self.transport.get_msg_index(
468 ('control_ping' + '_' + crc[2:]).encode('utf-8'))
469 self.control_ping_msgdef = self.messages['control_ping']
470 if self.async_thread:
471 self.event_thread = threading.Thread(
472 target=self.thread_msg_handler)
473 self.event_thread.daemon = True
474 self.event_thread.start()
476 self.event_thread = None
479 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
482 name - the name of the client.
483 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
484 do_async - if true, messages are sent without waiting for a reply
485 rx_qlen - the length of the VPP message receive queue between
488 msg_handler = self.transport.get_callback(do_async)
489 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
492 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
493 """Attach to VPP in synchronous mode. Application must poll for events.
495 name - the name of the client.
496 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
497 rx_qlen - the length of the VPP message receive queue between
501 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
504 def disconnect(self):
505 """Detach from VPP."""
506 rv = self.transport.disconnect()
507 if self.event_thread is not None:
508 self.message_queue.put("terminate event thread")
511 def msg_handler_sync(self, msg):
512 """Process an incoming message from VPP in sync mode.
514 The message may be a reply or it may be an async notification.
516 r = self.decode_incoming_msg(msg)
520 # If we have a context, then use the context to find any
521 # request waiting for a reply
523 if hasattr(r, 'context') and r.context > 0:
527 # No context -> async notification that we feed to the callback
528 self.message_queue.put_nowait(r)
530 raise VPPIOError(2, 'RPC reply message received in event handler')
532 def has_context(self, msg):
536 header = VPPType('header_with_context', [['u16', 'msgid'],
537 ['u32', 'client_index'],
540 (i, ci, context), size = header.unpack(msg, 0)
541 if self.id_names[i] == 'rx_thread_exit':
# Decode message and return a tuple.
547 msgobj = self.id_msgdef[i]
548 if 'context' in msgobj.field_by_name and context >= 0:
552 def decode_incoming_msg(self, msg, no_type_conversion=False):
554 self.logger.warning('vpp_api.read failed')
557 (i, ci), size = self.header.unpack(msg, 0)
558 if self.id_names[i] == 'rx_thread_exit':
# Decode message and return a tuple.
564 msgobj = self.id_msgdef[i]
566 raise VPPIOError(2, 'Reply message undefined')
568 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
571 def msg_handler_async(self, msg):
572 """Process a message from VPP in async mode.
574 In async mode, all messages are returned to the callback.
576 r = self.decode_incoming_msg(msg)
580 msgname = type(r).__name__
582 if self.event_callback:
583 self.event_callback(msgname, r)
585 def _control_ping(self, context):
586 """Send a ping command."""
587 self._call_vpp_async(self.control_ping_index,
588 self.control_ping_msgdef,
591 def validate_args(self, msg, kwargs):
592 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
594 raise VPPValueError('Invalid argument {} to {}'
595 .format(list(d), msg.name))
597 def _call_vpp(self, i, msgdef, multipart, **kwargs):
598 """Given a message, send the message and await a reply.
600 msgdef - the message packing definition
601 i - the message type index
602 multipart - True if the message returns multiple
604 context - context number - chosen at random if not
606 The remainder of the kwargs are the arguments to the API call.
608 The return value is the message or message array containing
609 the response. It will raise an IOError exception if there was
610 no response within the timeout window.
613 if 'context' not in kwargs:
614 context = self.get_context()
615 kwargs['context'] = context
617 context = kwargs['context']
618 kwargs['_vl_msg_id'] = i
620 no_type_conversion = kwargs.pop('_no_type_conversion', False)
623 if self.transport.socket_index:
624 kwargs['client_index'] = self.transport.socket_index
625 except AttributeError:
627 self.validate_args(msgdef, kwargs)
629 logging.debug(call_logger(msgdef, kwargs))
631 b = msgdef.pack(kwargs)
632 self.transport.suspend()
634 self.transport.write(b)
637 # Send a ping after the request - we use its response
638 # to detect that we have seen all results.
639 self._control_ping(context)
641 # Block until we get a reply.
644 msg = self.transport.read()
646 raise VPPIOError(2, 'VPP API client: read failed')
647 r = self.decode_incoming_msg(msg, no_type_conversion)
648 msgname = type(r).__name__
649 if context not in r or r.context == 0 or context != r.context:
650 # Message being queued
651 self.message_queue.put_nowait(r)
657 if msgname == 'control_ping_reply':
662 self.transport.resume()
664 logger.debug(return_logger(rl))
667 def _call_vpp_async(self, i, msg, **kwargs):
668 """Given a message, send the message and await a reply.
670 msgdef - the message packing definition
671 i - the message type index
672 context - context number - chosen at random if not
674 The remainder of the kwargs are the arguments to the API call.
676 if 'context' not in kwargs:
677 context = self.get_context()
678 kwargs['context'] = context
680 context = kwargs['context']
682 if self.transport.socket_index:
683 kwargs['client_index'] = self.transport.socket_index
684 except AttributeError:
685 kwargs['client_index'] = 0
686 kwargs['_vl_msg_id'] = i
689 self.transport.write(b)
691 def register_event_callback(self, callback):
692 """Register a callback for async messages.
694 This will be called for async notifications in sync mode,
695 and all messages in async mode. In sync mode, replies to
696 requests will not come here.
698 callback is a fn(msg_type_name, msg_type) that will be
699 called when a message comes in. While this function is
700 executing, note that (a) you are in a background thread and
701 may wish to use threading.Lock to protect your datastructures,
702 and (b) message processing from VPP will stop (so if you take
703 a long while about it you may provoke reply timeouts or cause
704 VPP to fill the RX buffer). Passing None will disable the
707 self.event_callback = callback
709 def thread_msg_handler(self):
710 """Python thread calling the user registered message handler.
712 This is to emulate the old style event callback scheme. Modern
713 clients should provide their own thread to poll the event
717 r = self.message_queue.get()
718 if r == "terminate event thread":
720 msgname = type(r).__name__
721 if self.event_callback:
722 self.event_callback(msgname, r)
725 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
726 "logger=%s, read_timeout=%s, use_socket=%s, " \
727 "server_address='%s'>" % (
728 self._apifiles, self.testmode, self.async_thread,
729 self.logger, self.read_timeout, self.use_socket,
733 # Provide the old name for backward compatibility.
736 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4