3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
32 if sys.version[0] == '2':
38 class VppEnumType(type):
39 def __getattr__(cls, name):
40 t = vpp_get_type(name)
45 # class VppEnum(metaclass=VppEnumType):
class VppEnum(VppEnumType('_VppEnumBase', (object,), {})):
    """Namespace class whose missing attributes are resolved by VppEnumType.

    Attribute lookups on the class that are not found normally are handled
    by VppEnumType.__getattr__, which looks the name up via vpp_get_type().

    The base class is created by calling the metaclass directly so that
    VppEnumType is the metaclass on both Python 2 and Python 3.  A bare
    ``__metaclass__`` class attribute is only honoured by Python 2; Python 3
    ignores it, which left the dynamic lookup inactive there.
    """
    # Kept so that code reading VppEnum.__metaclass__ keeps working.
    __metaclass__ = VppEnumType
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP instance at interpreter exit.

    vpp_weakref is a callable (a weak reference) that yields the VPP
    object, or a false value if it is no longer available.  Nothing is
    done unless the object exists and its transport reports being
    connected.
    """
    vpp = vpp_weakref()
    if not vpp:
        # The referent is gone (or falsy); nothing to clean up.
        return
    if not vpp.transport.connected:
        return
    vpp.logger.debug('Cleaning up VPP on exit')
    vpp.disconnect()
58 if sys.version[0] == '2':
66 class VppApiDynamicMethodHolder(object):
70 class FuncWrapper(object):
71 def __init__(self, func):
73 self.__name__ = func.__name__
75 def __call__(self, **kwargs):
76 return self._func(**kwargs)
79 class VPPApiError(Exception):
83 class VPPNotImplementedError(NotImplementedError):
87 class VPPIOError(IOError):
91 class VPPRuntimeError(RuntimeError):
95 class VPPValueError(ValueError):
102 This class provides the APIs to VPP. The APIs are loaded
103 from provided .api.json files and makes functions accordingly.
104 These functions are documented in the VPP .api files, as they
105 are dynamically created.
107 Additionally, VPP can send callback messages; this class
108 provides a means to register a callback function to receive
109 these messages in a background thread.
111 VPPApiError = VPPApiError
112 VPPRuntimeError = VPPRuntimeError
113 VPPValueError = VPPValueError
114 VPPNotImplementedError = VPPNotImplementedError
115 VPPIOError = VPPIOError
117 def process_json_file(self, apidef_file):
118 api = json.load(apidef_file)
120 for t in api['enums']:
121 t[0] = 'vl_api_' + t[0] + '_t'
122 types[t[0]] = {'type': 'enum', 'data': t}
123 for t in api['unions']:
124 t[0] = 'vl_api_' + t[0] + '_t'
125 types[t[0]] = {'type': 'union', 'data': t}
126 for t in api['types']:
127 t[0] = 'vl_api_' + t[0] + '_t'
128 types[t[0]] = {'type': 'type', 'data': t}
129 for t, v in api['aliases'].items():
130 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
131 self.services.update(api['services'])
136 for k, v in types.items():
138 if not vpp_get_type(k):
139 if v['type'] == 'enum':
141 VPPEnumType(t[0], t[1:])
144 elif v['type'] == 'union':
146 VPPUnionType(t[0], t[1:])
149 elif v['type'] == 'type':
154 elif v['type'] == 'alias':
159 if len(unresolved) == 0:
162 raise VPPValueError('Unresolved type definitions {}'
167 for m in api['messages']:
169 self.messages[m[0]] = VPPMessage(m[0], m[1:])
170 except VPPNotImplementedError:
171 self.logger.error('Not implemented error for {}'.format(m[0]))
173 def __init__(self, apifiles=None, testmode=False, async_thread=True,
174 logger=None, loglevel=None,
175 read_timeout=5, use_socket=False,
176 server_address='/run/vpp-api.sock'):
177 """Create a VPP API object.
179 apifiles is a list of files containing API
180 descriptions that will be loaded - methods will be
181 dynamically created reflecting these APIs. If not
182 provided this will load the API files from VPP's
183 default install location.
185 logger, if supplied, is the logging logger object to log to.
186 loglevel, if supplied, is the log level this logger is set
187 to report at (from the loglevels in the logging module).
190 logger = logging.getLogger(__name__)
191 if loglevel is not None:
192 logger.setLevel(loglevel)
199 self.header = VPPType('header', [['u16', 'msgid'],
200 ['u32', 'client_index']])
202 self.event_callback = None
203 self.message_queue = queue.Queue()
204 self.read_timeout = read_timeout
205 self.async_thread = async_thread
208 from . vpp_transport_socket import VppTransport
210 from . vpp_transport_shmem import VppTransport
213 # Pick up API definitions from default directory
215 apifiles = self.find_api_files()
217 # In test mode we don't care that we can't find the API files
221 raise VPPRuntimeError
223 for file in apifiles:
224 with open(file) as apidef_file:
225 self.process_json_file(apidef_file)
227 self.apifiles = apifiles
230 if len(self.messages) == 0 and not testmode:
231 raise VPPValueError(1, 'Missing JSON message definitions')
233 self.transport = VppTransport(self, read_timeout=read_timeout,
234 server_address=server_address)
235 # Make sure we allow VPP to clean up the message rings.
236 atexit.register(vpp_atexit, weakref.ref(self))
238 class ContextId(object):
239 """Thread-safe provider of unique context IDs."""
242 self.lock = threading.Lock()
245 """Get a new unique (or, at least, not recently used) context."""
249 get_context = ContextId()
def get_type(self, name):
    """Look up a type by name.

    Delegates to vpp_get_type() and hands back its result unchanged.
    """
    resolved = vpp_get_type(name)
    return resolved
255 def find_api_dir(cls):
256 """Attempt to find the best directory in which API definition
257 files may reside. If the value VPP_API_DIR exists in the environment
258 then it is first on the search list. If we're inside a recognized
259 location in a VPP source tree (src/scripts and src/vpp-api/python)
260 then entries from there to the likely locations in build-root are
261 added. Finally the location used by system packages is added.
263 :returns: A single directory name, or None if no such directory
268 if 'VPP_API_DIR' in os.environ:
269 dirs.append(os.environ['VPP_API_DIR'])
271 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
272 # in which case, plot a course to likely places in the src tree
273 import __main__ as main
274 if hasattr(main, '__file__'):
275 # get the path of the calling script
276 localdir = os.path.dirname(os.path.realpath(main.__file__))
278 # use cwd if there is no calling script
279 localdir = os.getcwd()
280 localdir_s = localdir.split(os.path.sep)
283 """Match dir against right-hand components of the script dir"""
284 d = dir.split('/') # param 'dir' assumes a / separator
286 return len(localdir_s) > length and localdir_s[-length:] == d
288 def sdir(srcdir, variant):
289 """Build a path from srcdir to the staged API files of
290 'variant' (typically '' or '_debug')"""
291 # Since 'core' and 'plugin' files are staged
292 # in separate directories, we target the parent dir.
293 return os.path.sep.join((
296 'install-vpp%s-native' % variant,
304 if dmatch('src/scripts'):
305 srcdir = os.path.sep.join(localdir_s[:-2])
306 elif dmatch('src/vpp-api/python'):
307 srcdir = os.path.sep.join(localdir_s[:-3])
309 # we're apparently running tests
310 srcdir = os.path.sep.join(localdir_s[:-1])
313 # we're in the source tree, try both the debug and release
315 dirs.append(sdir(srcdir, '_debug'))
316 dirs.append(sdir(srcdir, ''))
318 # Test for staged copies of the scripts
319 # For these, since we explicitly know if we're running a debug versus
320 # release variant, target only the relevant directory
321 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
322 srcdir = os.path.sep.join(localdir_s[:-4])
323 dirs.append(sdir(srcdir, '_debug'))
324 if dmatch('build-root/install-vpp-native/vpp/bin'):
325 srcdir = os.path.sep.join(localdir_s[:-4])
326 dirs.append(sdir(srcdir, ''))
328 # finally, try the location system packages typically install into
329 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
331 # check the directories for existence; first one wins
333 if os.path.isdir(dir):
339 def find_api_files(cls, api_dir=None, patterns='*'):
340 """Find API definition files from the given directory tree with the
341 given pattern. If no directory is given then find_api_dir() is used
342 to locate one. If no pattern is given then all definition files found
343 in the directory tree are used.
345 :param api_dir: A directory tree in which to locate API definition
346 files; subdirectories are descended into.
347 If this is None then find_api_dir() is called to discover it.
348 :param patterns: A list of patterns to use in each visited directory
349 when looking for files.
350 This can be a list/tuple object or a comma-separated string of
351 patterns. Each value in the list will have leading/trailing
353 The pattern specifies the first part of the filename, '.api.json'
355 The results are de-duplicated, thus overlapping patterns are fine.
356 If this is None it defaults to '*' meaning "all API files".
357 :returns: A list of file paths for the API files found.
360 api_dir = cls.find_api_dir()
362 raise VPPApiError("api_dir cannot be located")
364 if isinstance(patterns, list) or isinstance(patterns, tuple):
365 patterns = [p.strip() + '.api.json' for p in patterns]
367 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
370 for root, dirnames, files in os.walk(api_dir):
371 # iterate all given patterns and de-dup the result
372 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
373 for filename in files:
374 api_files.append(os.path.join(root, filename))
380 if not hasattr(self, "_api"):
381 raise VPPApiError("Not connected, api definitions not available")
384 def make_function(self, msg, i, multipart, do_async):
387 return self._call_vpp_async(i, msg, **kwargs)
390 return self._call_vpp(i, msg, multipart, **kwargs)
392 f.__name__ = str(msg.name)
393 f.__doc__ = ", ".join(["%s %s" %
394 (msg.fieldtypes[j], k)
395 for j, k in enumerate(msg.fields)])
398 def _register_functions(self, do_async=False):
399 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
400 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
401 self._api = VppApiDynamicMethodHolder()
402 for name, msg in vpp_iterator(self.messages):
403 n = name + '_' + msg.crc[2:]
404 i = self.transport.get_msg_index(n.encode())
406 self.id_msgdef[i] = msg
407 self.id_names[i] = name
409 # Create function for client side messages.
410 if name in self.services:
411 if 'stream' in self.services[name] and \
412 self.services[name]['stream']:
416 f = self.make_function(msg, i, multipart, do_async)
417 setattr(self._api, name, FuncWrapper(f))
420 'No such message type or failed CRC checksum: %s', n)
422 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
424 pfx = chroot_prefix.encode() if chroot_prefix else None
426 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
428 raise VPPIOError(2, 'Connect failed')
429 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
430 self._register_functions(do_async=do_async)
432 # Initialise control ping
433 crc = self.messages['control_ping'].crc
434 self.control_ping_index = self.transport.get_msg_index(
435 ('control_ping' + '_' + crc[2:]).encode())
436 self.control_ping_msgdef = self.messages['control_ping']
437 if self.async_thread:
438 self.event_thread = threading.Thread(
439 target=self.thread_msg_handler)
440 self.event_thread.daemon = True
441 self.event_thread.start()
444 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
447 name - the name of the client.
448 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
449 do_async - if true, messages are sent without waiting for a reply
450 rx_qlen - the length of the VPP message receive queue between
453 msg_handler = self.transport.get_callback(do_async)
454 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
457 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
458 """Attach to VPP in synchronous mode. Application must poll for events.
460 name - the name of the client.
461 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
462 rx_qlen - the length of the VPP message receive queue between
466 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
469 def disconnect(self):
470 """Detach from VPP."""
471 rv = self.transport.disconnect()
472 self.message_queue.put("terminate event thread")
475 def msg_handler_sync(self, msg):
476 """Process an incoming message from VPP in sync mode.
478 The message may be a reply or it may be an async notification.
480 r = self.decode_incoming_msg(msg)
484 # If we have a context, then use the context to find any
485 # request waiting for a reply
487 if hasattr(r, 'context') and r.context > 0:
491 # No context -> async notification that we feed to the callback
492 self.message_queue.put_nowait(r)
494 raise VPPIOError(2, 'RPC reply message received in event handler')
496 def decode_incoming_msg(self, msg, no_type_conversion=False):
498 self.logger.warning('vpp_api.read failed')
500 (i, ci), size = self.header.unpack(msg, 0)
501 if self.id_names[i] == 'rx_thread_exit':
505 # Decode message and returns a tuple.
507 msgobj = self.id_msgdef[i]
509 raise VPPIOError(2, 'Reply message undefined')
511 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
514 def msg_handler_async(self, msg):
515 """Process a message from VPP in async mode.
517 In async mode, all messages are returned to the callback.
519 r = self.decode_incoming_msg(msg)
523 msgname = type(r).__name__
525 if self.event_callback:
526 self.event_callback(msgname, r)
528 def _control_ping(self, context):
529 """Send a ping command."""
530 self._call_vpp_async(self.control_ping_index,
531 self.control_ping_msgdef,
534 def validate_args(self, msg, kwargs):
535 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
537 raise VPPValueError('Invalid argument {} to {}'
538 .format(list(d), msg.name))
540 def _call_vpp(self, i, msg, multipart, **kwargs):
541 """Given a message, send the message and await a reply.
543 msgdef - the message packing definition
544 i - the message type index
545 multipart - True if the message returns multiple
547 context - context number - chosen at random if not
549 The remainder of the kwargs are the arguments to the API call.
551 The return value is the message or message array containing
552 the response. It will raise an IOError exception if there was
553 no response within the timeout window.
556 if 'context' not in kwargs:
557 context = self.get_context()
558 kwargs['context'] = context
560 context = kwargs['context']
561 kwargs['_vl_msg_id'] = i
563 no_type_conversion = kwargs.pop('_no_type_conversion', False)
566 if self.transport.socket_index:
567 kwargs['client_index'] = self.transport.socket_index
568 except AttributeError:
570 self.validate_args(msg, kwargs)
572 self.transport.suspend()
574 self.transport.write(b)
577 # Send a ping after the request - we use its response
578 # to detect that we have seen all results.
579 self._control_ping(context)
581 # Block until we get a reply.
584 msg = self.transport.read()
586 raise VPPIOError(2, 'VPP API client: read failed')
587 r = self.decode_incoming_msg(msg, no_type_conversion)
588 msgname = type(r).__name__
589 if context not in r or r.context == 0 or context != r.context:
590 # Message being queued
591 self.message_queue.put_nowait(r)
597 if msgname == 'control_ping_reply':
602 self.transport.resume()
606 def _call_vpp_async(self, i, msg, **kwargs):
607 """Given a message, send the message without awaiting a reply.
609 msgdef - the message packing definition
610 i - the message type index
611 context - context number - chosen at random if not
613 The remainder of the kwargs are the arguments to the API call.
615 if 'context' not in kwargs:
616 context = self.get_context()
617 kwargs['context'] = context
619 context = kwargs['context']
621 if self.transport.socket_index:
622 kwargs['client_index'] = self.transport.socket_index
623 except AttributeError:
624 kwargs['client_index'] = 0
625 kwargs['_vl_msg_id'] = i
628 self.transport.write(b)
630 def register_event_callback(self, callback):
631 """Register a callback for async messages.
633 This will be called for async notifications in sync mode,
634 and all messages in async mode. In sync mode, replies to
635 requests will not come here.
637 callback is a fn(msg_type_name, msg_type) that will be
638 called when a message comes in. While this function is
639 executing, note that (a) you are in a background thread and
640 may wish to use threading.Lock to protect your datastructures,
641 and (b) message processing from VPP will stop (so if you take
642 a long while about it you may provoke reply timeouts or cause
643 VPP to fill the RX buffer). Passing None will disable the
646 self.event_callback = callback
648 def thread_msg_handler(self):
649 """Python thread calling the user registered message handler.
651 This is to emulate the old style event callback scheme. Modern
652 clients should provide their own thread to poll the event
656 r = self.message_queue.get()
657 if r == "terminate event thread":
659 msgname = type(r).__name__
660 if self.event_callback:
661 self.event_callback(msgname, r)
664 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4