3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32 logger = logging.getLogger(__name__)
34 if sys.version[0] == '2':
40 class VppEnumType(type):
41 def __getattr__(cls, name):
42 t = vpp_get_type(name)
47 # class VppEnum(metaclass=VppEnumType):
class VppEnum(VppEnumType('_VppEnumBase', (object,), {})):
    """Namespace class whose missing attributes are resolved by VppEnumType.

    Attribute lookups on this class that are not found the normal way
    fall through to the ``__getattr__`` of the VppEnumType metaclass.

    The metaclass is attached by inheriting from a throw-away base class
    created by calling VppEnumType directly.  A ``__metaclass__`` class
    attribute is only honoured by Python 2 and is silently ignored by
    Python 3, whereas a metaclass inherited from a base class takes
    effect on both interpreters.
    """
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client when the interpreter exits.

    vpp_weakref - a callable (a weak reference to the client object)
    that returns the client, or a falsy value once the client no longer
    exists.

    Nothing is done when the client is gone or its transport reports
    that it is not connected.
    """
    client = vpp_weakref()
    if not client:
        # The referent has already been collected; nothing to clean up.
        return
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
60 if sys.version[0] == '2':
68 def call_logger(msgdef, kwargs):
69 s = 'Calling {}('.format(msgdef.name)
70 for k, v in kwargs.items():
71 s += '{}:{} '.format(k, v)
77 s = 'Return from {}'.format(r)
81 class VppApiDynamicMethodHolder(object):
85 class FuncWrapper(object):
86 def __init__(self, func):
88 self.__name__ = func.__name__
90 def __call__(self, **kwargs):
91 return self._func(**kwargs)
94 class VPPApiError(Exception):
98 class VPPNotImplementedError(NotImplementedError):
102 class VPPIOError(IOError):
106 class VPPRuntimeError(RuntimeError):
110 class VPPValueError(ValueError):
117 This class provides the APIs to VPP. The APIs are loaded
118 from the provided .api.json files and functions are created accordingly.
119 These functions are documented in the VPP .api files, as they
120 are dynamically created.
122 Additionally, VPP can send callback messages; this class
123 provides a means to register a callback function to receive
124 these messages in a background thread.
126 VPPApiError = VPPApiError
127 VPPRuntimeError = VPPRuntimeError
128 VPPValueError = VPPValueError
129 VPPNotImplementedError = VPPNotImplementedError
130 VPPIOError = VPPIOError
132 def process_json_file(self, apidef_file):
133 api = json.load(apidef_file)
135 for t in api['enums']:
136 t[0] = 'vl_api_' + t[0] + '_t'
137 types[t[0]] = {'type': 'enum', 'data': t}
138 for t in api['unions']:
139 t[0] = 'vl_api_' + t[0] + '_t'
140 types[t[0]] = {'type': 'union', 'data': t}
141 for t in api['types']:
142 t[0] = 'vl_api_' + t[0] + '_t'
143 types[t[0]] = {'type': 'type', 'data': t}
144 for t, v in api['aliases'].items():
145 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
146 self.services.update(api['services'])
151 for k, v in types.items():
153 if not vpp_get_type(k):
154 if v['type'] == 'enum':
156 VPPEnumType(t[0], t[1:])
159 elif v['type'] == 'union':
161 VPPUnionType(t[0], t[1:])
164 elif v['type'] == 'type':
169 elif v['type'] == 'alias':
174 if len(unresolved) == 0:
177 raise VPPValueError('Unresolved type definitions {}'
182 for m in api['messages']:
184 self.messages[m[0]] = VPPMessage(m[0], m[1:])
185 except VPPNotImplementedError:
186 self.logger.error('Not implemented error for {}'.format(m[0]))
188 def __init__(self, apifiles=None, testmode=False, async_thread=True,
189 logger=None, loglevel=None,
190 read_timeout=5, use_socket=False,
191 server_address='/run/vpp-api.sock'):
192 """Create a VPP API object.
194 apifiles is a list of files containing API
195 descriptions that will be loaded - methods will be
196 dynamically created reflecting these APIs. If not
197 provided this will load the API files from VPP's
198 default install location.
200 logger, if supplied, is the logging logger object to log to.
201 loglevel, if supplied, is the log level this logger is set
202 to report at (from the loglevels in the logging module).
205 logger = logging.getLogger(__name__)
206 if loglevel is not None:
207 logger.setLevel(loglevel)
214 self.header = VPPType('header', [['u16', 'msgid'],
215 ['u32', 'client_index']])
217 self.event_callback = None
218 self.message_queue = queue.Queue()
219 self.read_timeout = read_timeout
220 self.async_thread = async_thread
223 from . vpp_transport_socket import VppTransport
225 from . vpp_transport_shmem import VppTransport
228 # Pick up API definitions from default directory
230 apifiles = self.find_api_files()
232 # In test mode we don't care that we can't find the API files
236 raise VPPRuntimeError
238 for file in apifiles:
239 with open(file) as apidef_file:
240 self.process_json_file(apidef_file)
242 self.apifiles = apifiles
245 if len(self.messages) == 0 and not testmode:
246 raise VPPValueError(1, 'Missing JSON message definitions')
248 self.transport = VppTransport(self, read_timeout=read_timeout,
249 server_address=server_address)
250 # Make sure we allow VPP to clean up the message rings.
251 atexit.register(vpp_atexit, weakref.ref(self))
253 class ContextId(object):
254 """Thread-safe provider of unique context IDs."""
257 self.lock = threading.Lock()
260 """Get a new unique (or, at least, not recently used) context."""
264 get_context = ContextId()
def get_type(self, name):
    """Return whatever vpp_get_type() yields for the given type name.

    name - the name of the type to look up.

    This is a thin convenience wrapper; the lookup itself is done by
    vpp_get_type() imported from the vpp_serializer module.
    """
    found = vpp_get_type(name)
    return found
270 def find_api_dir(cls):
271 """Attempt to find the best directory in which API definition
272 files may reside. If the value VPP_API_DIR exists in the environment
273 then it is first on the search list. If we're inside a recognized
274 location in a VPP source tree (src/scripts and src/vpp-api/python)
275 then entries from there to the likely locations in build-root are
276 added. Finally the location used by system packages is added.
278 :returns: A single directory name, or None if no such directory
283 if 'VPP_API_DIR' in os.environ:
284 dirs.append(os.environ['VPP_API_DIR'])
286 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
287 # in which case, plot a course to likely places in the src tree
288 import __main__ as main
289 if hasattr(main, '__file__'):
290 # get the path of the calling script
291 localdir = os.path.dirname(os.path.realpath(main.__file__))
293 # use cwd if there is no calling script
294 localdir = os.getcwd()
295 localdir_s = localdir.split(os.path.sep)
298 """Match dir against right-hand components of the script dir"""
299 d = dir.split('/') # param 'dir' assumes a / separator
301 return len(localdir_s) > length and localdir_s[-length:] == d
303 def sdir(srcdir, variant):
304 """Build a path from srcdir to the staged API files of
305 'variant' (typically '' or '_debug')"""
306 # Since 'core' and 'plugin' files are staged
307 # in separate directories, we target the parent dir.
308 return os.path.sep.join((
311 'install-vpp%s-native' % variant,
319 if dmatch('src/scripts'):
320 srcdir = os.path.sep.join(localdir_s[:-2])
321 elif dmatch('src/vpp-api/python'):
322 srcdir = os.path.sep.join(localdir_s[:-3])
324 # we're apparently running tests
325 srcdir = os.path.sep.join(localdir_s[:-1])
328 # we're in the source tree, try both the debug and release
330 dirs.append(sdir(srcdir, '_debug'))
331 dirs.append(sdir(srcdir, ''))
333 # Test for staged copies of the scripts
334 # For these, since we explicitly know if we're running a debug versus
335 # release variant, target only the relevant directory
336 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
337 srcdir = os.path.sep.join(localdir_s[:-4])
338 dirs.append(sdir(srcdir, '_debug'))
339 if dmatch('build-root/install-vpp-native/vpp/bin'):
340 srcdir = os.path.sep.join(localdir_s[:-4])
341 dirs.append(sdir(srcdir, ''))
343 # finally, try the location system packages typically install into
344 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
346 # check the directories for existence; first one wins
348 if os.path.isdir(dir):
354 def find_api_files(cls, api_dir=None, patterns='*'):
355 """Find API definition files from the given directory tree with the
356 given pattern. If no directory is given then find_api_dir() is used
357 to locate one. If no pattern is given then all definition files found
358 in the directory tree are used.
360 :param api_dir: A directory tree in which to locate API definition
361 files; subdirectories are descended into.
362 If this is None then find_api_dir() is called to discover it.
363 :param patterns: A list of patterns to use in each visited directory
364 when looking for files.
365 This can be a list/tuple object or a comma-separated string of
366 patterns. Each value in the list will have leading/trailing
368 The pattern specifies the first part of the filename, '.api.json'
370 The results are de-duplicated, thus overlapping patterns are fine.
371 If this is None it defaults to '*' meaning "all API files".
372 :returns: A list of file paths for the API files found.
375 api_dir = cls.find_api_dir()
377 raise VPPApiError("api_dir cannot be located")
379 if isinstance(patterns, list) or isinstance(patterns, tuple):
380 patterns = [p.strip() + '.api.json' for p in patterns]
382 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
385 for root, dirnames, files in os.walk(api_dir):
386 # iterate all given patterns and de-dup the result
387 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
388 for filename in files:
389 api_files.append(os.path.join(root, filename))
395 if not hasattr(self, "_api"):
396 raise VPPApiError("Not connected, api definitions not available")
399 def make_function(self, msg, i, multipart, do_async):
402 return self._call_vpp_async(i, msg, **kwargs)
405 return self._call_vpp(i, msg, multipart, **kwargs)
407 f.__name__ = str(msg.name)
408 f.__doc__ = ", ".join(["%s %s" %
409 (msg.fieldtypes[j], k)
410 for j, k in enumerate(msg.fields)])
413 def _register_functions(self, do_async=False):
414 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
415 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
416 self._api = VppApiDynamicMethodHolder()
417 for name, msg in vpp_iterator(self.messages):
418 n = name + '_' + msg.crc[2:]
419 i = self.transport.get_msg_index(n.encode())
421 self.id_msgdef[i] = msg
422 self.id_names[i] = name
424 # Create function for client side messages.
425 if name in self.services:
426 if 'stream' in self.services[name] and \
427 self.services[name]['stream']:
431 f = self.make_function(msg, i, multipart, do_async)
432 setattr(self._api, name, FuncWrapper(f))
435 'No such message type or failed CRC checksum: %s', n)
437 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
439 pfx = chroot_prefix.encode() if chroot_prefix else None
441 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
443 raise VPPIOError(2, 'Connect failed')
444 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
445 self._register_functions(do_async=do_async)
447 # Initialise control ping
448 crc = self.messages['control_ping'].crc
449 self.control_ping_index = self.transport.get_msg_index(
450 ('control_ping' + '_' + crc[2:]).encode())
451 self.control_ping_msgdef = self.messages['control_ping']
452 if self.async_thread:
453 self.event_thread = threading.Thread(
454 target=self.thread_msg_handler)
455 self.event_thread.daemon = True
456 self.event_thread.start()
459 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
462 name - the name of the client.
463 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
464 do_async - if true, messages are sent without waiting for a reply
465 rx_qlen - the length of the VPP message receive queue between
468 msg_handler = self.transport.get_callback(do_async)
469 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
472 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
473 """Attach to VPP in synchronous mode. Application must poll for events.
475 name - the name of the client.
476 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
477 rx_qlen - the length of the VPP message receive queue between
481 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
484 def disconnect(self):
485 """Detach from VPP."""
486 rv = self.transport.disconnect()
487 self.message_queue.put("terminate event thread")
490 def msg_handler_sync(self, msg):
491 """Process an incoming message from VPP in sync mode.
493 The message may be a reply or it may be an async notification.
495 r = self.decode_incoming_msg(msg)
499 # If we have a context, then use the context to find any
500 # request waiting for a reply
502 if hasattr(r, 'context') and r.context > 0:
506 # No context -> async notification that we feed to the callback
507 self.message_queue.put_nowait(r)
509 raise VPPIOError(2, 'RPC reply message received in event handler')
511 def has_context(self, msg):
515 header = VPPType('header_with_context', [['u16', 'msgid'],
516 ['u32', 'client_index'],
519 (i, ci, context), size = header.unpack(msg, 0)
520 if self.id_names[i] == 'rx_thread_exit':
524 # Decode the message and return a tuple.
526 msgobj = self.id_msgdef[i]
527 if 'context' in msgobj.field_by_name and context >= 0:
531 def decode_incoming_msg(self, msg, no_type_conversion=False):
533 self.logger.warning('vpp_api.read failed')
536 (i, ci), size = self.header.unpack(msg, 0)
537 if self.id_names[i] == 'rx_thread_exit':
541 # Decode the message and return a tuple.
543 msgobj = self.id_msgdef[i]
545 raise VPPIOError(2, 'Reply message undefined')
547 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
550 def msg_handler_async(self, msg):
551 """Process a message from VPP in async mode.
553 In async mode, all messages are returned to the callback.
555 r = self.decode_incoming_msg(msg)
559 msgname = type(r).__name__
561 if self.event_callback:
562 self.event_callback(msgname, r)
564 def _control_ping(self, context):
565 """Send a ping command."""
566 self._call_vpp_async(self.control_ping_index,
567 self.control_ping_msgdef,
570 def validate_args(self, msg, kwargs):
571 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
573 raise VPPValueError('Invalid argument {} to {}'
574 .format(list(d), msg.name))
576 def _call_vpp(self, i, msgdef, multipart, **kwargs):
577 """Given a message, send the message and await a reply.
579 msgdef - the message packing definition
580 i - the message type index
581 multipart - True if the message returns multiple
583 context - context number - chosen at random if not
585 The remainder of the kwargs are the arguments to the API call.
587 The return value is the message or message array containing
588 the response. It will raise an IOError exception if there was
589 no response within the timeout window.
592 if 'context' not in kwargs:
593 context = self.get_context()
594 kwargs['context'] = context
596 context = kwargs['context']
597 kwargs['_vl_msg_id'] = i
599 no_type_conversion = kwargs.pop('_no_type_conversion', False)
602 if self.transport.socket_index:
603 kwargs['client_index'] = self.transport.socket_index
604 except AttributeError:
606 self.validate_args(msgdef, kwargs)
608 logging.debug(call_logger(msgdef, kwargs))
610 b = msgdef.pack(kwargs)
611 self.transport.suspend()
613 self.transport.write(b)
616 # Send a ping after the request - we use its response
617 # to detect that we have seen all results.
618 self._control_ping(context)
620 # Block until we get a reply.
623 msg = self.transport.read()
625 raise VPPIOError(2, 'VPP API client: read failed')
626 r = self.decode_incoming_msg(msg, no_type_conversion)
627 msgname = type(r).__name__
628 if context not in r or r.context == 0 or context != r.context:
629 # Message being queued
630 self.message_queue.put_nowait(r)
636 if msgname == 'control_ping_reply':
641 self.transport.resume()
643 logger.debug(return_logger(rl))
646 def _call_vpp_async(self, i, msg, **kwargs):
647 """Given a message, send the message without waiting for a reply.
649 msg - the message packing definition
650 i - the message type index
651 context - context number - chosen at random if not
653 The remainder of the kwargs are the arguments to the API call.
655 if 'context' not in kwargs:
656 context = self.get_context()
657 kwargs['context'] = context
659 context = kwargs['context']
661 if self.transport.socket_index:
662 kwargs['client_index'] = self.transport.socket_index
663 except AttributeError:
664 kwargs['client_index'] = 0
665 kwargs['_vl_msg_id'] = i
668 self.transport.write(b)
670 def register_event_callback(self, callback):
671 """Register a callback for async messages.
673 This will be called for async notifications in sync mode,
674 and all messages in async mode. In sync mode, replies to
675 requests will not come here.
677 callback is a fn(msg_type_name, msg_type) that will be
678 called when a message comes in. While this function is
679 executing, note that (a) you are in a background thread and
680 may wish to use threading.Lock to protect your datastructures,
681 and (b) message processing from VPP will stop (so if you take
682 a long while about it you may provoke reply timeouts or cause
683 VPP to fill the RX buffer). Passing None will disable the
686 self.event_callback = callback
688 def thread_msg_handler(self):
689 """Python thread calling the user registered message handler.
691 This is to emulate the old style event callback scheme. Modern
692 clients should provide their own thread to poll the event
696 r = self.message_queue.get()
697 if r == "terminate event thread":
699 msgname = type(r).__name__
700 if self.event_callback:
701 self.event_callback(msgname, r)
704 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4