3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32 from . macaddress import MACAddress, mac_pton, mac_ntop
34 logger = logging.getLogger(__name__)
36 if sys.version[0] == '2':
42 def metaclass(metaclass):
43 @functools.wraps(metaclass)
45 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
50 class VppEnumType(type):
51 def __getattr__(cls, name):
52 t = vpp_get_type(name)
56 @metaclass(VppEnumType)
57 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP instance at interpreter exit.

    vpp_weakref is called with no arguments to obtain the VPP instance
    (it is registered with atexit as a weak reference); a false result
    means there is nothing left to clean up.
    """
    instance = vpp_weakref()
    if not instance:
        return
    if not instance.transport.connected:
        return
    instance.logger.debug('Cleaning up VPP on exit')
    instance.disconnect()
69 if sys.version[0] == '2':
77 def call_logger(msgdef, kwargs):
78 s = 'Calling {}('.format(msgdef.name)
79 for k, v in kwargs.items():
80 s += '{}:{} '.format(k, v)
86 s = 'Return from {}'.format(r)
90 class VppApiDynamicMethodHolder(object):
class FuncWrapper(object):
    """Keyword-only callable proxy around a generated API function.

    The wrapped function's ``__name__`` and ``__doc__`` are copied onto
    the wrapper instance so the proxy presents the same name and help text.
    """

    def __init__(self, func):
        """Remember func and mirror its name and docstring."""
        # __call__ delegates to this reference; without it every call
        # through the wrapper would fail with AttributeError.
        self._func = func
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__

    def __call__(self, **kwargs):
        """Invoke the wrapped function; only keyword arguments are accepted."""
        return self._func(**kwargs)
104 class VPPApiError(Exception):
108 class VPPNotImplementedError(NotImplementedError):
112 class VPPIOError(IOError):
116 class VPPRuntimeError(RuntimeError):
120 class VPPValueError(ValueError):
127 This class provides the APIs to VPP. The APIs are loaded
128 from the provided .api.json files, and functions are created accordingly.
129 These functions are documented in the VPP .api files, as they
130 are dynamically created.
132 Additionally, VPP can send callback messages; this class
133 provides a means to register a callback function to receive
134 these messages in a background thread.
136 VPPApiError = VPPApiError
137 VPPRuntimeError = VPPRuntimeError
138 VPPValueError = VPPValueError
139 VPPNotImplementedError = VPPNotImplementedError
140 VPPIOError = VPPIOError
142 def process_json_file(self, apidef_file):
143 api = json.load(apidef_file)
145 for t in api['enums']:
146 t[0] = 'vl_api_' + t[0] + '_t'
147 types[t[0]] = {'type': 'enum', 'data': t}
148 for t in api['unions']:
149 t[0] = 'vl_api_' + t[0] + '_t'
150 types[t[0]] = {'type': 'union', 'data': t}
151 for t in api['types']:
152 t[0] = 'vl_api_' + t[0] + '_t'
153 types[t[0]] = {'type': 'type', 'data': t}
154 for t, v in api['aliases'].items():
155 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
156 self.services.update(api['services'])
161 for k, v in types.items():
163 if not vpp_get_type(k):
164 if v['type'] == 'enum':
166 VPPEnumType(t[0], t[1:])
169 elif v['type'] == 'union':
171 VPPUnionType(t[0], t[1:])
174 elif v['type'] == 'type':
179 elif v['type'] == 'alias':
184 if len(unresolved) == 0:
187 raise VPPValueError('Unresolved type definitions {}'
192 for m in api['messages']:
194 self.messages[m[0]] = VPPMessage(m[0], m[1:])
195 except VPPNotImplementedError:
196 self.logger.error('Not implemented error for {}'.format(m[0]))
198 def __init__(self, apifiles=None, testmode=False, async_thread=True,
199 logger=None, loglevel=None,
200 read_timeout=5, use_socket=False,
201 server_address='/run/vpp-api.sock'):
202 """Create a VPP API object.
204 apifiles is a list of files containing API
205 descriptions that will be loaded - methods will be
206 dynamically created reflecting these APIs. If not
207 provided this will load the API files from VPP's
208 default install location.
210 logger, if supplied, is the logging logger object to log to.
211 loglevel, if supplied, is the log level this logger is set
212 to report at (from the loglevels in the logging module).
215 logger = logging.getLogger(__name__)
216 if loglevel is not None:
217 logger.setLevel(loglevel)
224 self.header = VPPType('header', [['u16', 'msgid'],
225 ['u32', 'client_index']])
227 self.event_callback = None
228 self.message_queue = queue.Queue()
229 self.read_timeout = read_timeout
230 self.async_thread = async_thread
233 from . vpp_transport_socket import VppTransport
235 from . vpp_transport_shmem import VppTransport
238 # Pick up API definitions from default directory
240 apifiles = self.find_api_files()
242 # In test mode we don't care that we can't find the API files
246 raise VPPRuntimeError
248 for file in apifiles:
249 with open(file) as apidef_file:
250 self.process_json_file(apidef_file)
252 self.apifiles = apifiles
255 if len(self.messages) == 0 and not testmode:
256 raise VPPValueError(1, 'Missing JSON message definitions')
258 self.transport = VppTransport(self, read_timeout=read_timeout,
259 server_address=server_address)
260 # Make sure we allow VPP to clean up the message rings.
261 atexit.register(vpp_atexit, weakref.ref(self))
263 class ContextId(object):
264 """Thread-safe provider of unique context IDs."""
267 self.lock = threading.Lock()
270 """Get a new unique (or, at least, not recently used) context."""
274 get_context = ContextId()
def get_type(self, name):
    """Return whatever vpp_get_type() yields for the type called name."""
    found = vpp_get_type(name)
    return found
280 def find_api_dir(cls):
281 """Attempt to find the best directory in which API definition
282 files may reside. If the value VPP_API_DIR exists in the environment
283 then it is first on the search list. If we're inside a recognized
284 location in a VPP source tree (src/scripts and src/vpp-api/python)
285 then entries from there to the likely locations in build-root are
286 added. Finally the location used by system packages is added.
288 :returns: A single directory name, or None if no such directory
293 if 'VPP_API_DIR' in os.environ:
294 dirs.append(os.environ['VPP_API_DIR'])
296 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
297 # in which case, plot a course to likely places in the src tree
298 import __main__ as main
299 if hasattr(main, '__file__'):
300 # get the path of the calling script
301 localdir = os.path.dirname(os.path.realpath(main.__file__))
303 # use cwd if there is no calling script
304 localdir = os.getcwd()
305 localdir_s = localdir.split(os.path.sep)
308 """Match dir against right-hand components of the script dir"""
309 d = dir.split('/') # param 'dir' assumes a / separator
311 return len(localdir_s) > length and localdir_s[-length:] == d
313 def sdir(srcdir, variant):
314 """Build a path from srcdir to the staged API files of
315 'variant' (typically '' or '_debug')"""
316 # Since 'core' and 'plugin' files are staged
317 # in separate directories, we target the parent dir.
318 return os.path.sep.join((
321 'install-vpp%s-native' % variant,
329 if dmatch('src/scripts'):
330 srcdir = os.path.sep.join(localdir_s[:-2])
331 elif dmatch('src/vpp-api/python'):
332 srcdir = os.path.sep.join(localdir_s[:-3])
334 # we're apparently running tests
335 srcdir = os.path.sep.join(localdir_s[:-1])
338 # we're in the source tree, try both the debug and release
340 dirs.append(sdir(srcdir, '_debug'))
341 dirs.append(sdir(srcdir, ''))
343 # Test for staged copies of the scripts
344 # For these, since we explicitly know if we're running a debug versus
345 # release variant, target only the relevant directory
346 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
347 srcdir = os.path.sep.join(localdir_s[:-4])
348 dirs.append(sdir(srcdir, '_debug'))
349 if dmatch('build-root/install-vpp-native/vpp/bin'):
350 srcdir = os.path.sep.join(localdir_s[:-4])
351 dirs.append(sdir(srcdir, ''))
353 # finally, try the location system packages typically install into
354 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
356 # check the directories for existence; first one wins
358 if os.path.isdir(dir):
364 def find_api_files(cls, api_dir=None, patterns='*'):
365 """Find API definition files from the given directory tree with the
366 given pattern. If no directory is given then find_api_dir() is used
367 to locate one. If no pattern is given then all definition files found
368 in the directory tree are used.
370 :param api_dir: A directory tree in which to locate API definition
371 files; subdirectories are descended into.
372 If this is None then find_api_dir() is called to discover it.
373 :param patterns: A list of patterns to use in each visited directory
374 when looking for files.
375 This can be a list/tuple object or a comma-separated string of
376 patterns. Each value in the list will have leading/trailing
378 The pattern specifies the first part of the filename, '.api.json'
380 The results are de-duplicated, thus overlapping patterns are fine.
381 If this is None it defaults to '*' meaning "all API files".
382 :returns: A list of file paths for the API files found.
385 api_dir = cls.find_api_dir()
387 raise VPPApiError("api_dir cannot be located")
389 if isinstance(patterns, list) or isinstance(patterns, tuple):
390 patterns = [p.strip() + '.api.json' for p in patterns]
392 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
395 for root, dirnames, files in os.walk(api_dir):
396 # iterate all given patterns and de-dup the result
397 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
398 for filename in files:
399 api_files.append(os.path.join(root, filename))
405 if not hasattr(self, "_api"):
406 raise VPPApiError("Not connected, api definitions not available")
409 def make_function(self, msg, i, multipart, do_async):
412 return self._call_vpp_async(i, msg, **kwargs)
415 return self._call_vpp(i, msg, multipart, **kwargs)
417 f.__name__ = str(msg.name)
418 f.__doc__ = ", ".join(["%s %s" %
419 (msg.fieldtypes[j], k)
420 for j, k in enumerate(msg.fields)])
425 def _register_functions(self, do_async=False):
426 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
427 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
428 self._api = VppApiDynamicMethodHolder()
429 for name, msg in vpp_iterator(self.messages):
430 n = name + '_' + msg.crc[2:]
431 i = self.transport.get_msg_index(n.encode('utf-8'))
433 self.id_msgdef[i] = msg
434 self.id_names[i] = name
436 # Create function for client side messages.
437 if name in self.services:
438 if 'stream' in self.services[name] and \
439 self.services[name]['stream']:
443 f = self.make_function(msg, i, multipart, do_async)
444 setattr(self._api, name, FuncWrapper(f))
447 'No such message type or failed CRC checksum: %s', n)
449 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
451 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
453 rv = self.transport.connect(name.encode('utf-8'), pfx,
454 msg_handler, rx_qlen)
456 raise VPPIOError(2, 'Connect failed')
457 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
458 self._register_functions(do_async=do_async)
460 # Initialise control ping
461 crc = self.messages['control_ping'].crc
462 self.control_ping_index = self.transport.get_msg_index(
463 ('control_ping' + '_' + crc[2:]).encode('utf-8'))
464 self.control_ping_msgdef = self.messages['control_ping']
465 if self.async_thread:
466 self.event_thread = threading.Thread(
467 target=self.thread_msg_handler)
468 self.event_thread.daemon = True
469 self.event_thread.start()
472 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
475 name - the name of the client.
476 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
477 do_async - if true, messages are sent without waiting for a reply
478 rx_qlen - the length of the VPP message receive queue between
481 msg_handler = self.transport.get_callback(do_async)
482 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
485 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
486 """Attach to VPP in synchronous mode. Application must poll for events.
488 name - the name of the client.
489 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
490 rx_qlen - the length of the VPP message receive queue between
494 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
def disconnect(self):
    """Detach from VPP.

    Disconnects the transport, then queues the sentinel string that
    thread_msg_handler() compares incoming queue items against.

    Returns the value produced by the transport's disconnect() call.
    """
    rv = self.transport.disconnect()
    # Sentinel for the event-thread loop reading message_queue.
    self.message_queue.put("terminate event thread")
    # Hand the transport's result back instead of discarding it.
    return rv
503 def msg_handler_sync(self, msg):
504 """Process an incoming message from VPP in sync mode.
506 The message may be a reply or it may be an async notification.
508 r = self.decode_incoming_msg(msg)
512 # If we have a context, then use the context to find any
513 # request waiting for a reply
515 if hasattr(r, 'context') and r.context > 0:
519 # No context -> async notification that we feed to the callback
520 self.message_queue.put_nowait(r)
522 raise VPPIOError(2, 'RPC reply message received in event handler')
524 def has_context(self, msg):
528 header = VPPType('header_with_context', [['u16', 'msgid'],
529 ['u32', 'client_index'],
532 (i, ci, context), size = header.unpack(msg, 0)
533 if self.id_names[i] == 'rx_thread_exit':
537 # Decode message and returns a tuple.
539 msgobj = self.id_msgdef[i]
540 if 'context' in msgobj.field_by_name and context >= 0:
544 def decode_incoming_msg(self, msg, no_type_conversion=False):
546 self.logger.warning('vpp_api.read failed')
549 (i, ci), size = self.header.unpack(msg, 0)
550 if self.id_names[i] == 'rx_thread_exit':
554 # Decode message and returns a tuple.
556 msgobj = self.id_msgdef[i]
558 raise VPPIOError(2, 'Reply message undefined')
560 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
563 def msg_handler_async(self, msg):
564 """Process a message from VPP in async mode.
566 In async mode, all messages are returned to the callback.
568 r = self.decode_incoming_msg(msg)
572 msgname = type(r).__name__
574 if self.event_callback:
575 self.event_callback(msgname, r)
577 def _control_ping(self, context):
578 """Send a ping command."""
579 self._call_vpp_async(self.control_ping_index,
580 self.control_ping_msgdef,
def validate_args(self, msg, kwargs):
    """Reject keyword arguments that are not fields of msg.

    msg    - message definition exposing field_by_name and name
    kwargs - dict of argument names to values supplied by the caller

    Raises VPPValueError listing every argument name that is not a key
    of msg.field_by_name. Returns None when all names are known.
    """
    unknown = set(kwargs) - set(msg.field_by_name)
    # Only complain when there really is an unexpected name; an empty
    # difference means every argument matched a message field.
    if unknown:
        raise VPPValueError('Invalid argument {} to {}'
                            .format(list(unknown), msg.name))
589 def _call_vpp(self, i, msgdef, multipart, **kwargs):
590 """Given a message, send the message and await a reply.
592 msgdef - the message packing definition
593 i - the message type index
594 multipart - True if the message returns multiple
596 context - context number - chosen at random if not
598 The remainder of the kwargs are the arguments to the API call.
600 The return value is the message or message array containing
601 the response. It will raise an IOError exception if there was
602 no response within the timeout window.
605 if 'context' not in kwargs:
606 context = self.get_context()
607 kwargs['context'] = context
609 context = kwargs['context']
610 kwargs['_vl_msg_id'] = i
612 no_type_conversion = kwargs.pop('_no_type_conversion', False)
615 if self.transport.socket_index:
616 kwargs['client_index'] = self.transport.socket_index
617 except AttributeError:
619 self.validate_args(msgdef, kwargs)
621 logging.debug(call_logger(msgdef, kwargs))
623 b = msgdef.pack(kwargs)
624 self.transport.suspend()
626 self.transport.write(b)
629 # Send a ping after the request - we use its response
630 # to detect that we have seen all results.
631 self._control_ping(context)
633 # Block until we get a reply.
636 msg = self.transport.read()
638 raise VPPIOError(2, 'VPP API client: read failed')
639 r = self.decode_incoming_msg(msg, no_type_conversion)
640 msgname = type(r).__name__
641 if context not in r or r.context == 0 or context != r.context:
642 # Message being queued
643 self.message_queue.put_nowait(r)
649 if msgname == 'control_ping_reply':
654 self.transport.resume()
656 logger.debug(return_logger(rl))
659 def _call_vpp_async(self, i, msg, **kwargs):
660 """Given a message, send the message without awaiting a reply.
662 msg - the message packing definition
663 i - the message type index
664 context - context number - chosen at random if not
666 The remainder of the kwargs are the arguments to the API call.
668 if 'context' not in kwargs:
669 context = self.get_context()
670 kwargs['context'] = context
672 context = kwargs['context']
674 if self.transport.socket_index:
675 kwargs['client_index'] = self.transport.socket_index
676 except AttributeError:
677 kwargs['client_index'] = 0
678 kwargs['_vl_msg_id'] = i
681 self.transport.write(b)
def register_event_callback(self, callback):
    """Install the handler for asynchronous messages.

    In sync mode the handler receives async notifications only; replies
    to requests are not delivered to it. In async mode it receives all
    messages.

    callback is a fn(msg_type_name, msg_type) invoked when a message
    comes in. While it is executing:
      (a) you are in a background thread, so you may wish to guard your
          data structures with threading.Lock;
      (b) message processing from VPP stops, so a slow handler may
          provoke reply timeouts or cause VPP to fill the RX buffer.

    Passing None disables the callback.
    """
    self.event_callback = callback
701 def thread_msg_handler(self):
702 """Python thread calling the user registered message handler.
704 This is to emulate the old style event callback scheme. Modern
705 clients should provide their own thread to poll the event
709 r = self.message_queue.get()
710 if r == "terminate event thread":
712 msgname = type(r).__name__
713 if self.event_callback:
714 self.event_callback(msgname, r)
717 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4