3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
21 import multiprocessing as mp
31 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
32 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
34 if sys.version[0] == '2':
39 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
40 'VppEnum', 'VppEnumType',
41 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
45 def metaclass(metaclass):
46 @functools.wraps(metaclass)
48 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
53 class VppEnumType(type):
54 def __getattr__(cls, name):
55 t = vpp_get_type(name)
59 @metaclass(VppEnumType)
60 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client on shutdown.

    :param vpp_weakref: Callable (normally a weak reference) that yields
        the client instance, or None once the instance no longer exists.
    """
    client = vpp_weakref()
    # Nothing to clean up when the client is gone or was never connected.
    if not client or not client.transport.connected:
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
72 if sys.version[0] == '2':
80 class VppApiDynamicMethodHolder(object):
84 class FuncWrapper(object):
85 def __init__(self, func):
87 self.__name__ = func.__name__
88 self.__doc__ = func.__doc__
90 def __call__(self, **kwargs):
91 return self._func(**kwargs)
94 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
97 class VPPApiError(Exception):
101 class VPPNotImplementedError(NotImplementedError):
105 class VPPIOError(IOError):
109 class VPPRuntimeError(RuntimeError):
113 class VPPValueError(ValueError):
116 class VPPApiJSONFiles(object):
118 def find_api_dir(cls, dirs):
119 """Attempt to find the best directory in which API definition
120 files may reside. If the value VPP_API_DIR exists in the environment
121 then it is first on the search list. If we're inside a recognized
122 location in a VPP source tree (src/scripts and src/vpp-api/python)
123 then entries from there to the likely locations in build-root are
124 added. Finally the location used by system packages is added.
126 :returns: A single directory name, or None if no such directory
130 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
131 # in which case, plot a course to likely places in the src tree
132 import __main__ as main
133 if hasattr(main, '__file__'):
134 # get the path of the calling script
135 localdir = os.path.dirname(os.path.realpath(main.__file__))
137 # use cwd if there is no calling script
138 localdir = os.getcwd()
139 localdir_s = localdir.split(os.path.sep)
142 """Match dir against right-hand components of the script dir"""
143 d = dir.split('/') # param 'dir' assumes a / separator
145 return len(localdir_s) > length and localdir_s[-length:] == d
147 def sdir(srcdir, variant):
148 """Build a path from srcdir to the staged API files of
149 'variant' (typically '' or '_debug')"""
150 # Since 'core' and 'plugin' files are staged
151 # in separate directories, we target the parent dir.
152 return os.path.sep.join((
155 'install-vpp%s-native' % variant,
163 if dmatch('src/scripts'):
164 srcdir = os.path.sep.join(localdir_s[:-2])
165 elif dmatch('src/vpp-api/python'):
166 srcdir = os.path.sep.join(localdir_s[:-3])
168 # we're apparently running tests
169 srcdir = os.path.sep.join(localdir_s[:-1])
172 # we're in the source tree, try both the debug and release
174 dirs.append(sdir(srcdir, '_debug'))
175 dirs.append(sdir(srcdir, ''))
177 # Test for staged copies of the scripts
178 # For these, since we explicitly know if we're running a debug versus
179 # release variant, target only the relevant directory
180 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
181 srcdir = os.path.sep.join(localdir_s[:-4])
182 dirs.append(sdir(srcdir, '_debug'))
183 if dmatch('build-root/install-vpp-native/vpp/bin'):
184 srcdir = os.path.sep.join(localdir_s[:-4])
185 dirs.append(sdir(srcdir, ''))
187 # finally, try the location system packages typically install into
188 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
190 # check the directories for existence; first one wins
192 if os.path.isdir(dir):
198 def find_api_files(cls, api_dir=None, patterns='*'):
199 """Find API definition files from the given directory tree with the
200 given pattern. If no directory is given then find_api_dir() is used
201 to locate one. If no pattern is given then all definition files found
202 in the directory tree are used.
204 :param api_dir: A directory tree in which to locate API definition
205 files; subdirectories are descended into.
206 If this is None then find_api_dir() is called to discover it.
207 :param patterns: A list of patterns to use in each visited directory
208 when looking for files.
209 This can be a list/tuple object or a comma-separated string of
210 patterns. Each value in the list will have leading/trailing
212 The pattern specifies the first part of the filename, '.api.json'
214 The results are de-duplicated, thus overlapping patterns are fine.
215 If this is None it defaults to '*' meaning "all API files".
216 :returns: A list of file paths for the API files found.
219 api_dir = cls.find_api_dir([])
221 raise VPPApiError("api_dir cannot be located")
223 if isinstance(patterns, list) or isinstance(patterns, tuple):
224 patterns = [p.strip() + '.api.json' for p in patterns]
226 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
229 for root, dirnames, files in os.walk(api_dir):
230 # iterate all given patterns and de-dup the result
231 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
232 for filename in files:
233 api_files.append(os.path.join(root, filename))
238 def process_json_file(self, apidef_file):
239 api = json.load(apidef_file)
243 for t in api['enums']:
244 t[0] = 'vl_api_' + t[0] + '_t'
245 types[t[0]] = {'type': 'enum', 'data': t}
246 for t in api['unions']:
247 t[0] = 'vl_api_' + t[0] + '_t'
248 types[t[0]] = {'type': 'union', 'data': t}
249 for t in api['types']:
250 t[0] = 'vl_api_' + t[0] + '_t'
251 types[t[0]] = {'type': 'type', 'data': t}
252 for t, v in api['aliases'].items():
253 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
254 services.update(api['services'])
259 for k, v in types.items():
261 if not vpp_get_type(k):
262 if v['type'] == 'enum':
264 VPPEnumType(t[0], t[1:])
267 elif v['type'] == 'union':
269 VPPUnionType(t[0], t[1:])
272 elif v['type'] == 'type':
277 elif v['type'] == 'alias':
282 if len(unresolved) == 0:
285 raise VPPValueError('Unresolved type definitions {}'
290 for m in api['messages']:
292 messages[m[0]] = VPPMessage(m[0], m[1:])
293 except VPPNotImplementedError:
295 self.logger.error('Not implemented error for {}'.format(m[0]))
296 return messages, services
298 class VPPApiClient(object):
301 This class provides the APIs to VPP. The APIs are loaded
302 from provided .api.json files, and functions are created accordingly.
303 These functions are documented in the VPP .api files, as they
304 are dynamically created.
306 Additionally, VPP can send callback messages; this class
307 provides a means to register a callback function to receive
308 these messages in a background thread.
311 VPPApiError = VPPApiError
312 VPPRuntimeError = VPPRuntimeError
313 VPPValueError = VPPValueError
314 VPPNotImplementedError = VPPNotImplementedError
315 VPPIOError = VPPIOError
318 def __init__(self, apifiles=None, testmode=False, async_thread=True,
319 logger=None, loglevel=None,
320 read_timeout=5, use_socket=False,
321 server_address='/run/vpp/api.sock'):
322 """Create a VPP API object.
324 apifiles is a list of files containing API
325 descriptions that will be loaded - methods will be
326 dynamically created reflecting these APIs. If not
327 provided this will load the API files from VPP's
328 default install location.
330 logger, if supplied, is the logging logger object to log to.
331 loglevel, if supplied, is the log level this logger is set
332 to report at (from the loglevels in the logging module).
335 logger = logging.getLogger(
336 "{}.{}".format(__name__, self.__class__.__name__))
337 if loglevel is not None:
338 logger.setLevel(loglevel)
345 self.header = VPPType('header', [['u16', 'msgid'],
346 ['u32', 'client_index']])
348 self.event_callback = None
349 self.message_queue = queue.Queue()
350 self.read_timeout = read_timeout
351 self.async_thread = async_thread
352 self.event_thread = None
353 self.testmode = testmode
354 self.use_socket = use_socket
355 self.server_address = server_address
356 self._apifiles = apifiles
360 from . vpp_transport_socket import VppTransport
362 from . vpp_transport_shmem import VppTransport
365 # Pick up API definitions from default directory
367 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
369 # In test mode we don't care that we can't find the API files
373 raise VPPRuntimeError
375 for file in apifiles:
376 with open(file) as apidef_file:
377 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
378 self.messages.update(m)
379 self.services.update(s)
381 self.apifiles = apifiles
384 if len(self.messages) == 0 and not testmode:
385 raise VPPValueError(1, 'Missing JSON message definitions')
387 self.transport = VppTransport(self, read_timeout=read_timeout,
388 server_address=server_address)
389 # Make sure we allow VPP to clean up the message rings.
390 atexit.register(vpp_atexit, weakref.ref(self))
def get_function(self, name):
    """Look up the API attribute called *name* on the method holder.

    :param name: Attribute name to fetch from self._api.
    :returns: Whatever self._api exposes under that name.
    :raises AttributeError: If self._api or the named attribute is missing.
    """
    holder = self._api
    return getattr(holder, name)
396 class ContextId(object):
397 """Multiprocessing-safe provider of unique context IDs."""
399 self.context = mp.Value(ctypes.c_uint, 0)
400 self.lock = mp.Lock()
403 """Get a new unique (or, at least, not recently used) context."""
405 self.context.value += 1
406 return self.context.value
407 get_context = ContextId()
def get_type(self, name):
    """Return the result of vpp_get_type() for *name*.

    :param name: Type name handed to vpp_get_type() unchanged.
    :returns: Whatever vpp_get_type(name) returns.
    """
    found = vpp_get_type(name)
    return found
414 if not hasattr(self, "_api"):
415 raise VPPApiError("Not connected, api definitions not available")
418 def make_function(self, msg, i, multipart, do_async):
421 return self._call_vpp_async(i, msg, **kwargs)
424 return self._call_vpp(i, msg, multipart, **kwargs)
426 f.__name__ = str(msg.name)
427 f.__doc__ = ", ".join(["%s %s" %
428 (msg.fieldtypes[j], k)
429 for j, k in enumerate(msg.fields)])
434 def _register_functions(self, do_async=False):
435 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
436 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
437 self._api = VppApiDynamicMethodHolder()
438 for name, msg in vpp_iterator(self.messages):
439 n = name + '_' + msg.crc[2:]
440 i = self.transport.get_msg_index(n)
442 self.id_msgdef[i] = msg
443 self.id_names[i] = name
445 # Create function for client side messages.
446 if name in self.services:
447 if 'stream' in self.services[name] and \
448 self.services[name]['stream']:
452 f = self.make_function(msg, i, multipart, do_async)
453 setattr(self._api, name, FuncWrapper(f))
456 'No such message type or failed CRC checksum: %s', n)
458 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
460 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
462 rv = self.transport.connect(name, pfx,
463 msg_handler, rx_qlen)
465 raise VPPIOError(2, 'Connect failed')
466 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
467 self._register_functions(do_async=do_async)
469 # Initialise control ping
470 crc = self.messages['control_ping'].crc
471 self.control_ping_index = self.transport.get_msg_index(
472 ('control_ping' + '_' + crc[2:]))
473 self.control_ping_msgdef = self.messages['control_ping']
474 if self.async_thread:
475 self.event_thread = threading.Thread(
476 target=self.thread_msg_handler)
477 self.event_thread.daemon = True
478 self.event_thread.start()
480 self.event_thread = None
483 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
486 name - the name of the client.
487 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
488 do_async - if true, messages are sent without waiting for a reply
489 rx_qlen - the length of the VPP message receive queue between
492 msg_handler = self.transport.get_callback(do_async)
493 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
496 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
497 """Attach to VPP in synchronous mode. Application must poll for events.
499 name - the name of the client.
500 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
501 rx_qlen - the length of the VPP message receive queue between
505 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
508 def disconnect(self):
509 """Detach from VPP."""
510 rv = self.transport.disconnect()
511 if self.event_thread is not None:
512 self.message_queue.put("terminate event thread")
515 def msg_handler_sync(self, msg):
516 """Process an incoming message from VPP in sync mode.
518 The message may be a reply or it may be an async notification.
520 r = self.decode_incoming_msg(msg)
524 # If we have a context, then use the context to find any
525 # request waiting for a reply
527 if hasattr(r, 'context') and r.context > 0:
531 # No context -> async notification that we feed to the callback
532 self.message_queue.put_nowait(r)
534 raise VPPIOError(2, 'RPC reply message received in event handler')
536 def has_context(self, msg):
540 header = VPPType('header_with_context', [['u16', 'msgid'],
541 ['u32', 'client_index'],
544 (i, ci, context), size = header.unpack(msg, 0)
545 if self.id_names[i] == 'rx_thread_exit':
549 # Decode the message and return a tuple.
551 msgobj = self.id_msgdef[i]
552 if 'context' in msgobj.field_by_name and context >= 0:
556 def decode_incoming_msg(self, msg, no_type_conversion=False):
558 self.logger.warning('vpp_api.read failed')
561 (i, ci), size = self.header.unpack(msg, 0)
562 if self.id_names[i] == 'rx_thread_exit':
566 # Decode the message and return a tuple.
568 msgobj = self.id_msgdef[i]
570 raise VPPIOError(2, 'Reply message undefined')
572 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
575 def msg_handler_async(self, msg):
576 """Process a message from VPP in async mode.
578 In async mode, all messages are returned to the callback.
580 r = self.decode_incoming_msg(msg)
584 msgname = type(r).__name__
586 if self.event_callback:
587 self.event_callback(msgname, r)
589 def _control_ping(self, context):
590 """Send a ping command."""
591 self._call_vpp_async(self.control_ping_index,
592 self.control_ping_msgdef,
595 def validate_args(self, msg, kwargs):
596 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
598 raise VPPValueError('Invalid argument {} to {}'
599 .format(list(d), msg.name))
601 def _add_stat(self, name, ms):
602 if not name in self.stats:
603 self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
605 if ms > self.stats[name]['max']:
606 self.stats[name]['max'] = ms
607 self.stats[name]['count'] += 1
608 n = self.stats[name]['count']
609 self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
612 s = '\n=== API PAPI STATISTICS ===\n'
613 s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
614 for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
615 s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
616 n[1]['avg'], n[1]['max'])
619 def _call_vpp(self, i, msgdef, multipart, **kwargs):
620 """Given a message, send the message and await a reply.
622 msgdef - the message packing definition
623 i - the message type index
624 multipart - True if the message returns multiple
626 context - context number - obtained from get_context() if not
628 The remainder of the kwargs are the arguments to the API call.
630 The return value is the message or message array containing
631 the response. It will raise an IOError exception if there was
632 no response within the timeout window.
635 if 'context' not in kwargs:
636 context = self.get_context()
637 kwargs['context'] = context
639 context = kwargs['context']
640 kwargs['_vl_msg_id'] = i
642 no_type_conversion = kwargs.pop('_no_type_conversion', False)
643 timeout = kwargs.pop('_timeout', None)
646 if self.transport.socket_index:
647 kwargs['client_index'] = self.transport.socket_index
648 except AttributeError:
650 self.validate_args(msgdef, kwargs)
652 s = 'Calling {}({})'.format(msgdef.name,
653 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
656 b = msgdef.pack(kwargs)
657 self.transport.suspend()
659 self.transport.write(b)
662 # Send a ping after the request - we use its response
663 # to detect that we have seen all results.
664 self._control_ping(context)
666 # Block until we get a reply.
669 r = self.read_blocking(no_type_conversion, timeout)
671 raise VPPIOError(2, 'VPP API client: read failed')
672 msgname = type(r).__name__
673 if context not in r or r.context == 0 or context != r.context:
674 # Message being queued
675 self.message_queue.put_nowait(r)
681 if msgname == 'control_ping_reply':
686 self.transport.resume()
688 s = 'Return value: {!r}'.format(r)
693 self._add_stat(msgdef.name, (te - ts) * 1000)
696 def _call_vpp_async(self, i, msg, **kwargs):
697 """Given a message, send the message and return the context.
699 msgdef - the message packing definition
700 i - the message type index
701 context - context number - obtained from get_context() if not
703 The remainder of the kwargs are the arguments to the API call.
705 The reply message(s) will be delivered later to the registered callback.
706 The returned context will help with assigning which call
707 the reply belongs to.
709 if 'context' not in kwargs:
710 context = self.get_context()
711 kwargs['context'] = context
713 context = kwargs['context']
715 if self.transport.socket_index:
716 kwargs['client_index'] = self.transport.socket_index
717 except AttributeError:
718 kwargs['client_index'] = 0
719 kwargs['_vl_msg_id'] = i
722 self.transport.write(b)
725 def read_blocking(self, no_type_conversion=False, timeout=None):
726 """Get next received message from transport within timeout, decoded.
728 Note that notifications have context zero
729 and are not put into receive queue (at least for socket transport),
730 use async_thread with registered callback for processing them.
732 If no message appears in the queue within timeout, return None.
734 Optionally, type conversion can be skipped,
735 as some of conversions are into less precise types.
737 When r is the return value of this, the caller can get message name as:
738 msgname = type(r).__name__
739 and context number (type long) as:
742 :param no_type_conversion: If false, type conversions are applied.
743 :type no_type_conversion: bool
744 :returns: Decoded message, or None if no message (within timeout).
745 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
746 :raises VppTransportShmemIOError if timed out.
748 msg = self.transport.read(timeout=timeout)
751 return self.decode_incoming_msg(msg, no_type_conversion)
753 def register_event_callback(self, callback):
754 """Register a callback for async messages.
756 This will be called for async notifications in sync mode,
757 and all messages in async mode. In sync mode, replies to
758 requests will not come here.
760 callback is a fn(msg_type_name, msg_type) that will be
761 called when a message comes in. While this function is
762 executing, note that (a) you are in a background thread and
763 may wish to use threading.Lock to protect your datastructures,
764 and (b) message processing from VPP will stop (so if you take
765 a long while about it you may provoke reply timeouts or cause
766 VPP to fill the RX buffer). Passing None will disable the
769 self.event_callback = callback
771 def thread_msg_handler(self):
772 """Python thread calling the user registered message handler.
774 This is to emulate the old style event callback scheme. Modern
775 clients should provide their own thread to poll the event
779 r = self.message_queue.get()
780 if r == "terminate event thread":
782 msgname = type(r).__name__
783 if self.event_callback:
784 self.event_callback(msgname, r)
786 def validate_message_table(self, namecrctable):
787 """Take a dictionary of name_crc message names
788 and return an array of missing messages"""
791 for name_crc in namecrctable:
792 i = self.transport.get_msg_index(name_crc)
794 missing_table.append(name_crc)
def dump_message_table(self):
    """Return VPP's API message table as held by the transport.

    :returns: self.transport.message_table itself (not a copy); a
        name_crc dictionary.
    """
    transport = self.transport
    return transport.message_table
801 def dump_message_table_filtered(self, msglist):
802 """Return VPPs API message table as name_crc dictionary,
803 filtered by message name list."""
805 replies = [self.services[n]['reply'] for n in msglist]
806 message_table_filtered = {}
807 for name in msglist + replies:
808 for k,v in self.transport.message_table.items():
809 if k.startswith(name):
810 message_table_filtered[k] = v
812 return message_table_filtered
815 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
816 "logger=%s, read_timeout=%s, use_socket=%s, " \
817 "server_address='%s'>" % (
818 self._apifiles, self.testmode, self.async_thread,
819 self.logger, self.read_timeout, self.use_socket,
823 # Provide the old name for backward compatibility.
826 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4