3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
29 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
31 from . macaddress import MACAddress, mac_pton, mac_ntop
33 logger = logging.getLogger(__name__)
35 if sys.version[0] == '2':
41 class VppEnumType(type):
42 def __getattr__(cls, name):
43 t = vpp_get_type(name)
48 # class VppEnum(metaclass=VppEnumType):
class VppEnum(VppEnumType('VppEnumBase', (object,), {})):
    """Class whose metaclass is VppEnumType on both Python 2 and Python 3.

    Attribute lookups on VppEnum that are not found normally fall through
    to VppEnumType.__getattr__.

    A ``__metaclass__`` class attribute is only honoured by Python 2;
    Python 3 ignores it, which would leave VppEnumType.__getattr__ unused.
    Deriving from a throw-away base class that VppEnumType itself creates
    attaches the metaclass under both interpreters without needing the
    Python 3 only ``metaclass=`` keyword.
    """
    # Retained so Python 2 era code inspecting the attribute still finds it;
    # it names the same metaclass the base class already supplies.
    __metaclass__ = VppEnumType
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client on shutdown.

    vpp_weakref is called to obtain the client. Nothing happens when the
    call yields a falsy value or when the client's transport reports that
    it is not connected.
    """
    client = vpp_weakref()
    if not client:
        return
    if not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
62 if sys.version[0] == '2':
70 def call_logger(msgdef, kwargs):
71 s = 'Calling {}('.format(msgdef.name)
72 for k, v in kwargs.items():
73 s += '{}:{} '.format(k, v)
79 s = 'Return from {}'.format(r)
83 class VppApiDynamicMethodHolder(object):
class FuncWrapper(object):
    """Callable proxy around a generated API function.

    The proxy mirrors the wrapped function's ``__name__`` and ``__doc__``
    and forwards keyword-argument calls to it.
    """

    def __init__(self, func):
        """Store func and copy its name and docstring onto the proxy."""
        # __call__ dispatches through this attribute, so it must be bound.
        self._func = func
        self.__name__ = func.__name__
        # Without this copy, the docstring assigned to the wrapped function
        # is hidden behind the wrapper class's own docstring.
        self.__doc__ = getattr(func, '__doc__', None)

    def __call__(self, **kwargs):
        """Call the wrapped function with the given keyword arguments."""
        return self._func(**kwargs)
96 class VPPApiError(Exception):
100 class VPPNotImplementedError(NotImplementedError):
104 class VPPIOError(IOError):
108 class VPPRuntimeError(RuntimeError):
112 class VPPValueError(ValueError):
119 This class provides the APIs to VPP. The APIs are loaded
120 from the provided .api.json files, and functions are created accordingly.
121 These functions are documented in the VPP .api files, as they
122 are dynamically created.
124 Additionally, VPP can send callback messages; this class
125 provides a means to register a callback function to receive
126 these messages in a background thread.
128 VPPApiError = VPPApiError
129 VPPRuntimeError = VPPRuntimeError
130 VPPValueError = VPPValueError
131 VPPNotImplementedError = VPPNotImplementedError
132 VPPIOError = VPPIOError
134 def process_json_file(self, apidef_file):
135 api = json.load(apidef_file)
137 for t in api['enums']:
138 t[0] = 'vl_api_' + t[0] + '_t'
139 types[t[0]] = {'type': 'enum', 'data': t}
140 for t in api['unions']:
141 t[0] = 'vl_api_' + t[0] + '_t'
142 types[t[0]] = {'type': 'union', 'data': t}
143 for t in api['types']:
144 t[0] = 'vl_api_' + t[0] + '_t'
145 types[t[0]] = {'type': 'type', 'data': t}
146 for t, v in api['aliases'].items():
147 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
148 self.services.update(api['services'])
153 for k, v in types.items():
155 if not vpp_get_type(k):
156 if v['type'] == 'enum':
158 VPPEnumType(t[0], t[1:])
161 elif v['type'] == 'union':
163 VPPUnionType(t[0], t[1:])
166 elif v['type'] == 'type':
171 elif v['type'] == 'alias':
176 if len(unresolved) == 0:
179 raise VPPValueError('Unresolved type definitions {}'
184 for m in api['messages']:
186 self.messages[m[0]] = VPPMessage(m[0], m[1:])
187 except VPPNotImplementedError:
188 self.logger.error('Not implemented error for {}'.format(m[0]))
190 def __init__(self, apifiles=None, testmode=False, async_thread=True,
191 logger=None, loglevel=None,
192 read_timeout=5, use_socket=False,
193 server_address='/run/vpp-api.sock'):
194 """Create a VPP API object.
196 apifiles is a list of files containing API
197 descriptions that will be loaded - methods will be
198 dynamically created reflecting these APIs. If not
199 provided this will load the API files from VPP's
200 default install location.
202 logger, if supplied, is the logging logger object to log to.
203 loglevel, if supplied, is the log level this logger is set
204 to report at (from the loglevels in the logging module).
207 logger = logging.getLogger(__name__)
208 if loglevel is not None:
209 logger.setLevel(loglevel)
216 self.header = VPPType('header', [['u16', 'msgid'],
217 ['u32', 'client_index']])
219 self.event_callback = None
220 self.message_queue = queue.Queue()
221 self.read_timeout = read_timeout
222 self.async_thread = async_thread
225 from . vpp_transport_socket import VppTransport
227 from . vpp_transport_shmem import VppTransport
230 # Pick up API definitions from default directory
232 apifiles = self.find_api_files()
234 # In test mode we don't care that we can't find the API files
238 raise VPPRuntimeError
240 for file in apifiles:
241 with open(file) as apidef_file:
242 self.process_json_file(apidef_file)
244 self.apifiles = apifiles
247 if len(self.messages) == 0 and not testmode:
248 raise VPPValueError(1, 'Missing JSON message definitions')
250 self.transport = VppTransport(self, read_timeout=read_timeout,
251 server_address=server_address)
252 # Make sure we allow VPP to clean up the message rings.
253 atexit.register(vpp_atexit, weakref.ref(self))
255 class ContextId(object):
256 """Thread-safe provider of unique context IDs."""
259 self.lock = threading.Lock()
262 """Get a new unique (or, at least, not recently used) context."""
266 get_context = ContextId()
def get_type(self, name):
    """Return whatever vpp_get_type yields for the given type name."""
    vpp_type = vpp_get_type(name)
    return vpp_type
272 def find_api_dir(cls):
273 """Attempt to find the best directory in which API definition
274 files may reside. If the value VPP_API_DIR exists in the environment
275 then it is first on the search list. If we're inside a recognized
276 location in a VPP source tree (src/scripts and src/vpp-api/python)
277 then entries from there to the likely locations in build-root are
278 added. Finally the location used by system packages is added.
280 :returns: A single directory name, or None if no such directory
285 if 'VPP_API_DIR' in os.environ:
286 dirs.append(os.environ['VPP_API_DIR'])
288 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
289 # in which case, plot a course to likely places in the src tree
290 import __main__ as main
291 if hasattr(main, '__file__'):
292 # get the path of the calling script
293 localdir = os.path.dirname(os.path.realpath(main.__file__))
295 # use cwd if there is no calling script
296 localdir = os.getcwd()
297 localdir_s = localdir.split(os.path.sep)
300 """Match dir against right-hand components of the script dir"""
301 d = dir.split('/') # param 'dir' assumes a / separator
303 return len(localdir_s) > length and localdir_s[-length:] == d
305 def sdir(srcdir, variant):
306 """Build a path from srcdir to the staged API files of
307 'variant' (typically '' or '_debug')"""
308 # Since 'core' and 'plugin' files are staged
309 # in separate directories, we target the parent dir.
310 return os.path.sep.join((
313 'install-vpp%s-native' % variant,
321 if dmatch('src/scripts'):
322 srcdir = os.path.sep.join(localdir_s[:-2])
323 elif dmatch('src/vpp-api/python'):
324 srcdir = os.path.sep.join(localdir_s[:-3])
326 # we're apparently running tests
327 srcdir = os.path.sep.join(localdir_s[:-1])
330 # we're in the source tree, try both the debug and release
332 dirs.append(sdir(srcdir, '_debug'))
333 dirs.append(sdir(srcdir, ''))
335 # Test for staged copies of the scripts
336 # For these, since we explicitly know if we're running a debug versus
337 # release variant, target only the relevant directory
338 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
339 srcdir = os.path.sep.join(localdir_s[:-4])
340 dirs.append(sdir(srcdir, '_debug'))
341 if dmatch('build-root/install-vpp-native/vpp/bin'):
342 srcdir = os.path.sep.join(localdir_s[:-4])
343 dirs.append(sdir(srcdir, ''))
345 # finally, try the location system packages typically install into
346 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
348 # check the directories for existence; first one wins
350 if os.path.isdir(dir):
356 def find_api_files(cls, api_dir=None, patterns='*'):
357 """Find API definition files from the given directory tree with the
358 given pattern. If no directory is given then find_api_dir() is used
359 to locate one. If no pattern is given then all definition files found
360 in the directory tree are used.
362 :param api_dir: A directory tree in which to locate API definition
363 files; subdirectories are descended into.
364 If this is None then find_api_dir() is called to discover it.
365 :param patterns: A list of patterns to use in each visited directory
366 when looking for files.
367 This can be a list/tuple object or a comma-separated string of
368 patterns. Each value in the list will have leading/trailing
370 The pattern specifies the first part of the filename, '.api.json'
372 The results are de-duplicated, thus overlapping patterns are fine.
373 If this is None it defaults to '*' meaning "all API files".
374 :returns: A list of file paths for the API files found.
377 api_dir = cls.find_api_dir()
379 raise VPPApiError("api_dir cannot be located")
381 if isinstance(patterns, list) or isinstance(patterns, tuple):
382 patterns = [p.strip() + '.api.json' for p in patterns]
384 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
387 for root, dirnames, files in os.walk(api_dir):
388 # iterate all given patterns and de-dup the result
389 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
390 for filename in files:
391 api_files.append(os.path.join(root, filename))
397 if not hasattr(self, "_api"):
398 raise VPPApiError("Not connected, api definitions not available")
401 def make_function(self, msg, i, multipart, do_async):
404 return self._call_vpp_async(i, msg, **kwargs)
407 return self._call_vpp(i, msg, multipart, **kwargs)
409 f.__name__ = str(msg.name)
410 f.__doc__ = ", ".join(["%s %s" %
411 (msg.fieldtypes[j], k)
412 for j, k in enumerate(msg.fields)])
415 def _register_functions(self, do_async=False):
416 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
417 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
418 self._api = VppApiDynamicMethodHolder()
419 for name, msg in vpp_iterator(self.messages):
420 n = name + '_' + msg.crc[2:]
421 i = self.transport.get_msg_index(n.encode())
423 self.id_msgdef[i] = msg
424 self.id_names[i] = name
426 # Create function for client side messages.
427 if name in self.services:
428 if 'stream' in self.services[name] and \
429 self.services[name]['stream']:
433 f = self.make_function(msg, i, multipart, do_async)
434 setattr(self._api, name, FuncWrapper(f))
437 'No such message type or failed CRC checksum: %s', n)
439 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
441 pfx = chroot_prefix.encode() if chroot_prefix else None
443 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
445 raise VPPIOError(2, 'Connect failed')
446 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
447 self._register_functions(do_async=do_async)
449 # Initialise control ping
450 crc = self.messages['control_ping'].crc
451 self.control_ping_index = self.transport.get_msg_index(
452 ('control_ping' + '_' + crc[2:]).encode())
453 self.control_ping_msgdef = self.messages['control_ping']
454 if self.async_thread:
455 self.event_thread = threading.Thread(
456 target=self.thread_msg_handler)
457 self.event_thread.daemon = True
458 self.event_thread.start()
461 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
464 name - the name of the client.
465 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
466 do_async - if true, messages are sent without waiting for a reply
467 rx_qlen - the length of the VPP message receive queue between
470 msg_handler = self.transport.get_callback(do_async)
471 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
474 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
475 """Attach to VPP in synchronous mode. Application must poll for events.
477 name - the name of the client.
478 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
479 rx_qlen - the length of the VPP message receive queue between
483 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
486 def disconnect(self):
487 """Detach from VPP."""
488 rv = self.transport.disconnect()
489 self.message_queue.put("terminate event thread")
492 def msg_handler_sync(self, msg):
493 """Process an incoming message from VPP in sync mode.
495 The message may be a reply or it may be an async notification.
497 r = self.decode_incoming_msg(msg)
501 # If we have a context, then use the context to find any
502 # request waiting for a reply
504 if hasattr(r, 'context') and r.context > 0:
508 # No context -> async notification that we feed to the callback
509 self.message_queue.put_nowait(r)
511 raise VPPIOError(2, 'RPC reply message received in event handler')
513 def has_context(self, msg):
517 header = VPPType('header_with_context', [['u16', 'msgid'],
518 ['u32', 'client_index'],
521 (i, ci, context), size = header.unpack(msg, 0)
522 if self.id_names[i] == 'rx_thread_exit':
526 # Decode the message and return a tuple.
528 msgobj = self.id_msgdef[i]
529 if 'context' in msgobj.field_by_name and context >= 0:
533 def decode_incoming_msg(self, msg, no_type_conversion=False):
535 self.logger.warning('vpp_api.read failed')
538 (i, ci), size = self.header.unpack(msg, 0)
539 if self.id_names[i] == 'rx_thread_exit':
543 # Decode the message and return a tuple.
545 msgobj = self.id_msgdef[i]
547 raise VPPIOError(2, 'Reply message undefined')
549 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
552 def msg_handler_async(self, msg):
553 """Process a message from VPP in async mode.
555 In async mode, all messages are returned to the callback.
557 r = self.decode_incoming_msg(msg)
561 msgname = type(r).__name__
563 if self.event_callback:
564 self.event_callback(msgname, r)
566 def _control_ping(self, context):
567 """Send a ping command."""
568 self._call_vpp_async(self.control_ping_index,
569 self.control_ping_msgdef,
572 def validate_args(self, msg, kwargs):
573 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
575 raise VPPValueError('Invalid argument {} to {}'
576 .format(list(d), msg.name))
578 def _call_vpp(self, i, msgdef, multipart, **kwargs):
579 """Given a message, send the message and await a reply.
581 msgdef - the message packing definition
582 i - the message type index
583 multipart - True if the message returns multiple
585 context - context number - chosen at random if not
587 The remainder of the kwargs are the arguments to the API call.
589 The return value is the message or message array containing
590 the response. It will raise an IOError exception if there was
591 no response within the timeout window.
594 if 'context' not in kwargs:
595 context = self.get_context()
596 kwargs['context'] = context
598 context = kwargs['context']
599 kwargs['_vl_msg_id'] = i
601 no_type_conversion = kwargs.pop('_no_type_conversion', False)
604 if self.transport.socket_index:
605 kwargs['client_index'] = self.transport.socket_index
606 except AttributeError:
608 self.validate_args(msgdef, kwargs)
610 logging.debug(call_logger(msgdef, kwargs))
612 b = msgdef.pack(kwargs)
613 self.transport.suspend()
615 self.transport.write(b)
618 # Send a ping after the request - we use its response
619 # to detect that we have seen all results.
620 self._control_ping(context)
622 # Block until we get a reply.
625 msg = self.transport.read()
627 raise VPPIOError(2, 'VPP API client: read failed')
628 r = self.decode_incoming_msg(msg, no_type_conversion)
629 msgname = type(r).__name__
630 if context not in r or r.context == 0 or context != r.context:
631 # Message being queued
632 self.message_queue.put_nowait(r)
638 if msgname == 'control_ping_reply':
643 self.transport.resume()
645 logger.debug(return_logger(rl))
648 def _call_vpp_async(self, i, msg, **kwargs):
649 """Given a message, send the message without waiting for a reply.
651 msg - the message packing definition
652 i - the message type index
653 context - context number - chosen at random if not
655 The remainder of the kwargs are the arguments to the API call.
657 if 'context' not in kwargs:
658 context = self.get_context()
659 kwargs['context'] = context
661 context = kwargs['context']
663 if self.transport.socket_index:
664 kwargs['client_index'] = self.transport.socket_index
665 except AttributeError:
666 kwargs['client_index'] = 0
667 kwargs['_vl_msg_id'] = i
670 self.transport.write(b)
672 def register_event_callback(self, callback):
673 """Register a callback for async messages.
675 This will be called for async notifications in sync mode,
676 and all messages in async mode. In sync mode, replies to
677 requests will not come here.
679 callback is a fn(msg_type_name, msg_type) that will be
680 called when a message comes in. While this function is
681 executing, note that (a) you are in a background thread and
682 may wish to use threading.Lock to protect your datastructures,
683 and (b) message processing from VPP will stop (so if you take
684 a long while about it you may provoke reply timeouts or cause
685 VPP to fill the RX buffer). Passing None will disable the
688 self.event_callback = callback
690 def thread_msg_handler(self):
691 """Python thread calling the user registered message handler.
693 This is to emulate the old style event callback scheme. Modern
694 clients should provide their own thread to poll the event
698 r = self.message_queue.get()
699 if r == "terminate event thread":
701 msgname = type(r).__name__
702 if self.event_callback:
703 self.event_callback(msgname, r)
706 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4