3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
30 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
31 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32 from . macaddress import MACAddress, mac_pton, mac_ntop
34 logger = logging.getLogger(__name__)
36 if sys.version[0] == '2':
42 def metaclass(metaclass):
43 @functools.wraps(metaclass)
45 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
50 class VppEnumType(type):
51 def __getattr__(cls, name):
52 t = vpp_get_type(name)
56 @metaclass(VppEnumType)
57 class VppEnum(object):
61 def vpp_atexit(vpp_weakref):
62 """Clean up VPP connection on shutdown."""
63 vpp_instance = vpp_weakref()
64 if vpp_instance and vpp_instance.transport.connected:
65 vpp_instance.logger.debug('Cleaning up VPP on exit')
66 vpp_instance.disconnect()
69 if sys.version[0] == '2':
77 def call_logger(msgdef, kwargs):
78 s = 'Calling {}('.format(msgdef.name)
79 for k, v in kwargs.items():
80 s += '{}:{} '.format(k, v)
86 s = 'Return from {}'.format(r)
90 class VppApiDynamicMethodHolder(object):
94 class FuncWrapper(object):
95 def __init__(self, func):
97 self.__name__ = func.__name__
98 self.__doc__ = func.__doc__
100 def __call__(self, **kwargs):
101 return self._func(**kwargs)
104 class VPPApiError(Exception):
108 class VPPNotImplementedError(NotImplementedError):
112 class VPPIOError(IOError):
116 class VPPRuntimeError(RuntimeError):
120 class VPPValueError(ValueError):
124 class VPPApiClient(object):
127 This class provides the APIs to VPP. The APIs are loaded
128 from provided .api.json files and makes functions accordingly.
129 These functions are documented in the VPP .api files, as they
130 are dynamically created.
132 Additionally, VPP can send callback messages; this class
133 provides a means to register a callback function to receive
134 these messages in a background thread.
137 VPPApiError = VPPApiError
138 VPPRuntimeError = VPPRuntimeError
139 VPPValueError = VPPValueError
140 VPPNotImplementedError = VPPNotImplementedError
141 VPPIOError = VPPIOError
143 def process_json_file(self, apidef_file):
144 api = json.load(apidef_file)
146 for t in api['enums']:
147 t[0] = 'vl_api_' + t[0] + '_t'
148 types[t[0]] = {'type': 'enum', 'data': t}
149 for t in api['unions']:
150 t[0] = 'vl_api_' + t[0] + '_t'
151 types[t[0]] = {'type': 'union', 'data': t}
152 for t in api['types']:
153 t[0] = 'vl_api_' + t[0] + '_t'
154 types[t[0]] = {'type': 'type', 'data': t}
155 for t, v in api['aliases'].items():
156 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
157 self.services.update(api['services'])
162 for k, v in types.items():
164 if not vpp_get_type(k):
165 if v['type'] == 'enum':
167 VPPEnumType(t[0], t[1:])
170 elif v['type'] == 'union':
172 VPPUnionType(t[0], t[1:])
175 elif v['type'] == 'type':
180 elif v['type'] == 'alias':
185 if len(unresolved) == 0:
188 raise VPPValueError('Unresolved type definitions {}'
193 for m in api['messages']:
195 self.messages[m[0]] = VPPMessage(m[0], m[1:])
196 except VPPNotImplementedError:
197 self.logger.error('Not implemented error for {}'.format(m[0]))
199 def __init__(self, apifiles=None, testmode=False, async_thread=True,
200 logger=None, loglevel=None,
201 read_timeout=5, use_socket=False,
202 server_address='/run/vpp-api.sock'):
203 """Create a VPP API object.
205 apifiles is a list of files containing API
206 descriptions that will be loaded - methods will be
207 dynamically created reflecting these APIs. If not
208 provided this will load the API files from VPP's
209 default install location.
211 logger, if supplied, is the logging logger object to log to.
212 loglevel, if supplied, is the log level this logger is set
213 to report at (from the loglevels in the logging module).
216 logger = logging.getLogger(__name__)
217 if loglevel is not None:
218 logger.setLevel(loglevel)
225 self.header = VPPType('header', [['u16', 'msgid'],
226 ['u32', 'client_index']])
228 self.event_callback = None
229 self.message_queue = queue.Queue()
230 self.read_timeout = read_timeout
231 self.async_thread = async_thread
234 from . vpp_transport_socket import VppTransport
236 from . vpp_transport_shmem import VppTransport
239 # Pick up API definitions from default directory
241 apifiles = self.find_api_files()
243 # In test mode we don't care that we can't find the API files
247 raise VPPRuntimeError
249 for file in apifiles:
250 with open(file) as apidef_file:
251 self.process_json_file(apidef_file)
253 self.apifiles = apifiles
256 if len(self.messages) == 0 and not testmode:
257 raise VPPValueError(1, 'Missing JSON message definitions')
259 self.transport = VppTransport(self, read_timeout=read_timeout,
260 server_address=server_address)
261 # Make sure we allow VPP to clean up the message rings.
262 atexit.register(vpp_atexit, weakref.ref(self))
264 class ContextId(object):
265 """Thread-safe provider of unique context IDs."""
268 self.lock = threading.Lock()
271 """Get a new unique (or, at least, not recently used) context."""
275 get_context = ContextId()
277 def get_type(self, name):
278 return vpp_get_type(name)
281 def find_api_dir(cls):
282 """Attempt to find the best directory in which API definition
283 files may reside. If the value VPP_API_DIR exists in the environment
284 then it is first on the search list. If we're inside a recognized
285 location in a VPP source tree (src/scripts and src/vpp-api/python)
286 then entries from there to the likely locations in build-root are
287 added. Finally the location used by system packages is added.
289 :returns: A single directory name, or None if no such directory
292 dirs = [cls.apidir] if cls.apidir else []
294 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
295 # in which case, plot a course to likely places in the src tree
296 import __main__ as main
297 if hasattr(main, '__file__'):
298 # get the path of the calling script
299 localdir = os.path.dirname(os.path.realpath(main.__file__))
301 # use cwd if there is no calling script
302 localdir = os.getcwd()
303 localdir_s = localdir.split(os.path.sep)
306 """Match dir against right-hand components of the script dir"""
307 d = dir.split('/') # param 'dir' assumes a / separator
309 return len(localdir_s) > length and localdir_s[-length:] == d
311 def sdir(srcdir, variant):
312 """Build a path from srcdir to the staged API files of
313 'variant' (typically '' or '_debug')"""
314 # Since 'core' and 'plugin' files are staged
315 # in separate directories, we target the parent dir.
316 return os.path.sep.join((
319 'install-vpp%s-native' % variant,
327 if dmatch('src/scripts'):
328 srcdir = os.path.sep.join(localdir_s[:-2])
329 elif dmatch('src/vpp-api/python'):
330 srcdir = os.path.sep.join(localdir_s[:-3])
332 # we're apparently running tests
333 srcdir = os.path.sep.join(localdir_s[:-1])
336 # we're in the source tree, try both the debug and release
338 dirs.append(sdir(srcdir, '_debug'))
339 dirs.append(sdir(srcdir, ''))
341 # Test for staged copies of the scripts
342 # For these, since we explicitly know if we're running a debug versus
343 # release variant, target only the relevant directory
344 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
345 srcdir = os.path.sep.join(localdir_s[:-4])
346 dirs.append(sdir(srcdir, '_debug'))
347 if dmatch('build-root/install-vpp-native/vpp/bin'):
348 srcdir = os.path.sep.join(localdir_s[:-4])
349 dirs.append(sdir(srcdir, ''))
351 # finally, try the location system packages typically install into
352 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
354 # check the directories for existence; first one wins
356 if os.path.isdir(dir):
362 def find_api_files(cls, api_dir=None, patterns='*'):
363 """Find API definition files from the given directory tree with the
364 given pattern. If no directory is given then find_api_dir() is used
365 to locate one. If no pattern is given then all definition files found
366 in the directory tree are used.
368 :param api_dir: A directory tree in which to locate API definition
369 files; subdirectories are descended into.
370 If this is None then find_api_dir() is called to discover it.
371 :param patterns: A list of patterns to use in each visited directory
372 when looking for files.
373 This can be a list/tuple object or a comma-separated string of
374 patterns. Each value in the list will have leading/trialing
376 The pattern specifies the first part of the filename, '.api.json'
378 The results are de-duplicated, thus overlapping patterns are fine.
379 If this is None it defaults to '*' meaning "all API files".
380 :returns: A list of file paths for the API files found.
383 api_dir = cls.find_api_dir()
385 raise VPPApiError("api_dir cannot be located")
387 if isinstance(patterns, list) or isinstance(patterns, tuple):
388 patterns = [p.strip() + '.api.json' for p in patterns]
390 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
393 for root, dirnames, files in os.walk(api_dir):
394 # iterate all given patterns and de-dup the result
395 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
396 for filename in files:
397 api_files.append(os.path.join(root, filename))
403 if not hasattr(self, "_api"):
404 raise VPPApiError("Not connected, api definitions not available")
407 def make_function(self, msg, i, multipart, do_async):
410 return self._call_vpp_async(i, msg, **kwargs)
413 return self._call_vpp(i, msg, multipart, **kwargs)
415 f.__name__ = str(msg.name)
416 f.__doc__ = ", ".join(["%s %s" %
417 (msg.fieldtypes[j], k)
418 for j, k in enumerate(msg.fields)])
423 def _register_functions(self, do_async=False):
424 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
425 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
426 self._api = VppApiDynamicMethodHolder()
427 for name, msg in vpp_iterator(self.messages):
428 n = name + '_' + msg.crc[2:]
429 i = self.transport.get_msg_index(n.encode('utf-8'))
431 self.id_msgdef[i] = msg
432 self.id_names[i] = name
434 # Create function for client side messages.
435 if name in self.services:
436 if 'stream' in self.services[name] and \
437 self.services[name]['stream']:
441 f = self.make_function(msg, i, multipart, do_async)
442 setattr(self._api, name, FuncWrapper(f))
445 'No such message type or failed CRC checksum: %s', n)
447 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
449 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
451 rv = self.transport.connect(name.encode('utf-8'), pfx,
452 msg_handler, rx_qlen)
454 raise VPPIOError(2, 'Connect failed')
455 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
456 self._register_functions(do_async=do_async)
458 # Initialise control ping
459 crc = self.messages['control_ping'].crc
460 self.control_ping_index = self.transport.get_msg_index(
461 ('control_ping' + '_' + crc[2:]).encode('utf-8'))
462 self.control_ping_msgdef = self.messages['control_ping']
463 if self.async_thread:
464 self.event_thread = threading.Thread(
465 target=self.thread_msg_handler)
466 self.event_thread.daemon = True
467 self.event_thread.start()
469 self.event_thread = None
472 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
475 name - the name of the client.
476 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
477 do_async - if true, messages are sent without waiting for a reply
478 rx_qlen - the length of the VPP message receive queue between
481 msg_handler = self.transport.get_callback(do_async)
482 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
485 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
486 """Attach to VPP in synchronous mode. Application must poll for events.
488 name - the name of the client.
489 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
490 rx_qlen - the length of the VPP message receive queue between
494 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
497 def disconnect(self):
498 """Detach from VPP."""
499 rv = self.transport.disconnect()
500 if self.event_thread is not None:
501 self.message_queue.put("terminate event thread")
504 def msg_handler_sync(self, msg):
505 """Process an incoming message from VPP in sync mode.
507 The message may be a reply or it may be an async notification.
509 r = self.decode_incoming_msg(msg)
513 # If we have a context, then use the context to find any
514 # request waiting for a reply
516 if hasattr(r, 'context') and r.context > 0:
520 # No context -> async notification that we feed to the callback
521 self.message_queue.put_nowait(r)
523 raise VPPIOError(2, 'RPC reply message received in event handler')
525 def has_context(self, msg):
529 header = VPPType('header_with_context', [['u16', 'msgid'],
530 ['u32', 'client_index'],
533 (i, ci, context), size = header.unpack(msg, 0)
534 if self.id_names[i] == 'rx_thread_exit':
538 # Decode message and returns a tuple.
540 msgobj = self.id_msgdef[i]
541 if 'context' in msgobj.field_by_name and context >= 0:
545 def decode_incoming_msg(self, msg, no_type_conversion=False):
547 self.logger.warning('vpp_api.read failed')
550 (i, ci), size = self.header.unpack(msg, 0)
551 if self.id_names[i] == 'rx_thread_exit':
555 # Decode message and returns a tuple.
557 msgobj = self.id_msgdef[i]
559 raise VPPIOError(2, 'Reply message undefined')
561 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
564 def msg_handler_async(self, msg):
565 """Process a message from VPP in async mode.
567 In async mode, all messages are returned to the callback.
569 r = self.decode_incoming_msg(msg)
573 msgname = type(r).__name__
575 if self.event_callback:
576 self.event_callback(msgname, r)
578 def _control_ping(self, context):
579 """Send a ping command."""
580 self._call_vpp_async(self.control_ping_index,
581 self.control_ping_msgdef,
584 def validate_args(self, msg, kwargs):
585 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
587 raise VPPValueError('Invalid argument {} to {}'
588 .format(list(d), msg.name))
590 def _call_vpp(self, i, msgdef, multipart, **kwargs):
591 """Given a message, send the message and await a reply.
593 msgdef - the message packing definition
594 i - the message type index
595 multipart - True if the message returns multiple
597 context - context number - chosen at random if not
599 The remainder of the kwargs are the arguments to the API call.
601 The return value is the message or message array containing
602 the response. It will raise an IOError exception if there was
603 no response within the timeout window.
606 if 'context' not in kwargs:
607 context = self.get_context()
608 kwargs['context'] = context
610 context = kwargs['context']
611 kwargs['_vl_msg_id'] = i
613 no_type_conversion = kwargs.pop('_no_type_conversion', False)
616 if self.transport.socket_index:
617 kwargs['client_index'] = self.transport.socket_index
618 except AttributeError:
620 self.validate_args(msgdef, kwargs)
622 logging.debug(call_logger(msgdef, kwargs))
624 b = msgdef.pack(kwargs)
625 self.transport.suspend()
627 self.transport.write(b)
630 # Send a ping after the request - we use its response
631 # to detect that we have seen all results.
632 self._control_ping(context)
634 # Block until we get a reply.
637 msg = self.transport.read()
639 raise VPPIOError(2, 'VPP API client: read failed')
640 r = self.decode_incoming_msg(msg, no_type_conversion)
641 msgname = type(r).__name__
642 if context not in r or r.context == 0 or context != r.context:
643 # Message being queued
644 self.message_queue.put_nowait(r)
650 if msgname == 'control_ping_reply':
655 self.transport.resume()
657 logger.debug(return_logger(rl))
660 def _call_vpp_async(self, i, msg, **kwargs):
661 """Given a message, send the message and await a reply.
663 msgdef - the message packing definition
664 i - the message type index
665 context - context number - chosen at random if not
667 The remainder of the kwargs are the arguments to the API call.
669 if 'context' not in kwargs:
670 context = self.get_context()
671 kwargs['context'] = context
673 context = kwargs['context']
675 if self.transport.socket_index:
676 kwargs['client_index'] = self.transport.socket_index
677 except AttributeError:
678 kwargs['client_index'] = 0
679 kwargs['_vl_msg_id'] = i
682 self.transport.write(b)
684 def register_event_callback(self, callback):
685 """Register a callback for async messages.
687 This will be called for async notifications in sync mode,
688 and all messages in async mode. In sync mode, replies to
689 requests will not come here.
691 callback is a fn(msg_type_name, msg_type) that will be
692 called when a message comes in. While this function is
693 executing, note that (a) you are in a background thread and
694 may wish to use threading.Lock to protect your datastructures,
695 and (b) message processing from VPP will stop (so if you take
696 a long while about it you may provoke reply timeouts or cause
697 VPP to fill the RX buffer). Passing None will disable the
700 self.event_callback = callback
702 def thread_msg_handler(self):
703 """Python thread calling the user registered message handler.
705 This is to emulate the old style event callback scheme. Modern
706 clients should provide their own thread to poll the event
710 r = self.message_queue.get()
711 if r == "terminate event thread":
713 msgname = type(r).__name__
714 if self.event_callback:
715 self.event_callback(msgname, r)
717 # Provide the old name for backward compatibility.
720 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4