3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
32 from . vpp_format import verify_enum_hint
33 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
34 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
36 if sys.version[0] == '2':
41 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
42 'VppEnum', 'VppEnumType',
43 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
47 def metaclass(metaclass):
48 @functools.wraps(metaclass)
50 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
55 class VppEnumType(type):
56 def __getattr__(cls, name):
57 t = vpp_get_type(name)
61 @metaclass(VppEnumType)
62 class VppEnum(object):
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client at interpreter shutdown.

    :param vpp_weakref: zero-argument callable (the client registers a
        ``weakref.ref`` to itself) that returns the client object, or
        None once the client no longer exists.
    """
    client = vpp_weakref()
    if not client:
        # The referenced client is gone; there is nothing to clean up.
        return
    if not client.transport.connected:
        # Already detached from VPP.
        return
    client.logger.debug('Cleaning up VPP on exit')
    client.disconnect()
74 if sys.version[0] == '2':
82 def add_convenience_methods():
83 # provide convenience methods to IP[46]Address.vapi_af
85 if 6 == self._version:
86 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
87 if 4 == self._version:
88 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
89 raise ValueError("Invalid _version.")
91 def _vapi_af_name(self):
92 if 6 == self._version:
94 if 4 == self._version:
96 raise ValueError("Invalid _version.")
98 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
99 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
102 class VppApiDynamicMethodHolder(object):
106 class FuncWrapper(object):
107 def __init__(self, func):
109 self.__name__ = func.__name__
110 self.__doc__ = func.__doc__
112 def __call__(self, **kwargs):
113 return self._func(**kwargs)
116 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
119 class VPPApiError(Exception):
123 class VPPNotImplementedError(NotImplementedError):
127 class VPPIOError(IOError):
131 class VPPRuntimeError(RuntimeError):
135 class VPPValueError(ValueError):
139 class VPPApiJSONFiles(object):
141 def find_api_dir(cls, dirs):
142 """Attempt to find the best directory in which API definition
143 files may reside. If the value VPP_API_DIR exists in the environment
144 then it is first on the search list. If we're inside a recognized
145 location in a VPP source tree (src/scripts and src/vpp-api/python)
146 then entries from there to the likely locations in build-root are
147 added. Finally the location used by system packages is added.
149 :returns: A single directory name, or None if no such directory
153 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
154 # in which case, plot a course to likely places in the src tree
155 import __main__ as main
156 if hasattr(main, '__file__'):
157 # get the path of the calling script
158 localdir = os.path.dirname(os.path.realpath(main.__file__))
160 # use cwd if there is no calling script
161 localdir = os.getcwd()
162 localdir_s = localdir.split(os.path.sep)
165 """Match dir against right-hand components of the script dir"""
166 d = dir.split('/') # param 'dir' assumes a / separator
168 return len(localdir_s) > length and localdir_s[-length:] == d
170 def sdir(srcdir, variant):
171 """Build a path from srcdir to the staged API files of
172 'variant' (typically '' or '_debug')"""
173 # Since 'core' and 'plugin' files are staged
174 # in separate directories, we target the parent dir.
175 return os.path.sep.join((
178 'install-vpp%s-native' % variant,
186 if dmatch('src/scripts'):
187 srcdir = os.path.sep.join(localdir_s[:-2])
188 elif dmatch('src/vpp-api/python'):
189 srcdir = os.path.sep.join(localdir_s[:-3])
191 # we're apparently running tests
192 srcdir = os.path.sep.join(localdir_s[:-1])
195 # we're in the source tree, try both the debug and release
197 dirs.append(sdir(srcdir, '_debug'))
198 dirs.append(sdir(srcdir, ''))
200 # Test for staged copies of the scripts
201 # For these, since we explicitly know if we're running a debug versus
202 # release variant, target only the relevant directory
203 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
204 srcdir = os.path.sep.join(localdir_s[:-4])
205 dirs.append(sdir(srcdir, '_debug'))
206 if dmatch('build-root/install-vpp-native/vpp/bin'):
207 srcdir = os.path.sep.join(localdir_s[:-4])
208 dirs.append(sdir(srcdir, ''))
210 # finally, try the location system packages typically install into
211 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
213 # check the directories for existence; first one wins
215 if os.path.isdir(dir):
221 def find_api_files(cls, api_dir=None, patterns='*'):
222 """Find API definition files from the given directory tree with the
223 given pattern. If no directory is given then find_api_dir() is used
224 to locate one. If no pattern is given then all definition files found
225 in the directory tree are used.
227 :param api_dir: A directory tree in which to locate API definition
228 files; subdirectories are descended into.
229 If this is None then find_api_dir() is called to discover it.
230 :param patterns: A list of patterns to use in each visited directory
231 when looking for files.
232 This can be a list/tuple object or a comma-separated string of
233 patterns. Each value in the list will have leading/trailing
235 The pattern specifies the first part of the filename, '.api.json'
237 The results are de-duplicated, thus overlapping patterns are fine.
238 If this is None it defaults to '*' meaning "all API files".
239 :returns: A list of file paths for the API files found.
242 api_dir = cls.find_api_dir([])
244 raise VPPApiError("api_dir cannot be located")
246 if isinstance(patterns, list) or isinstance(patterns, tuple):
247 patterns = [p.strip() + '.api.json' for p in patterns]
249 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
252 for root, dirnames, files in os.walk(api_dir):
253 # iterate all given patterns and de-dup the result
254 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
255 for filename in files:
256 api_files.append(os.path.join(root, filename))
261 def process_json_file(self, apidef_file):
262 api = json.load(apidef_file)
266 for t in api['enums']:
267 t[0] = 'vl_api_' + t[0] + '_t'
268 types[t[0]] = {'type': 'enum', 'data': t}
269 for t in api['unions']:
270 t[0] = 'vl_api_' + t[0] + '_t'
271 types[t[0]] = {'type': 'union', 'data': t}
272 for t in api['types']:
273 t[0] = 'vl_api_' + t[0] + '_t'
274 types[t[0]] = {'type': 'type', 'data': t}
275 for t, v in api['aliases'].items():
276 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
277 services.update(api['services'])
282 for k, v in types.items():
284 if not vpp_get_type(k):
285 if v['type'] == 'enum':
287 VPPEnumType(t[0], t[1:])
290 elif v['type'] == 'union':
292 VPPUnionType(t[0], t[1:])
295 elif v['type'] == 'type':
300 elif v['type'] == 'alias':
305 if len(unresolved) == 0:
308 raise VPPValueError('Unresolved type definitions {}'
313 for m in api['messages']:
315 messages[m[0]] = VPPMessage(m[0], m[1:])
316 except VPPNotImplementedError:
318 self.logger.error('Not implemented error for {}'.format(m[0]))
319 return messages, services
322 class VPPApiClient(object):
325 This class provides the APIs to VPP. The APIs are loaded
326 from provided .api.json files and makes functions accordingly.
327 These functions are documented in the VPP .api files, as they
328 are dynamically created.
330 Additionally, VPP can send callback messages; this class
331 provides a means to register a callback function to receive
332 these messages in a background thread.
335 VPPApiError = VPPApiError
336 VPPRuntimeError = VPPRuntimeError
337 VPPValueError = VPPValueError
338 VPPNotImplementedError = VPPNotImplementedError
339 VPPIOError = VPPIOError
342 def __init__(self, apifiles=None, testmode=False, async_thread=True,
343 logger=None, loglevel=None,
344 read_timeout=5, use_socket=False,
345 server_address='/run/vpp/api.sock'):
346 """Create a VPP API object.
348 apifiles is a list of files containing API
349 descriptions that will be loaded - methods will be
350 dynamically created reflecting these APIs. If not
351 provided this will load the API files from VPP's
352 default install location.
354 logger, if supplied, is the logging logger object to log to.
355 loglevel, if supplied, is the log level this logger is set
356 to report at (from the loglevels in the logging module).
359 logger = logging.getLogger(
360 "{}.{}".format(__name__, self.__class__.__name__))
361 if loglevel is not None:
362 logger.setLevel(loglevel)
369 self.header = VPPType('header', [['u16', 'msgid'],
370 ['u32', 'client_index']])
372 self.event_callback = None
373 self.message_queue = queue.Queue()
374 self.read_timeout = read_timeout
375 self.async_thread = async_thread
376 self.event_thread = None
377 self.testmode = testmode
378 self.use_socket = use_socket
379 self.server_address = server_address
380 self._apifiles = apifiles
384 from . vpp_transport_socket import VppTransport
386 from . vpp_transport_shmem import VppTransport
389 # Pick up API definitions from default directory
391 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
393 # In test mode we don't care that we can't find the API files
397 raise VPPRuntimeError
399 for file in apifiles:
400 with open(file) as apidef_file:
401 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
402 self.messages.update(m)
403 self.services.update(s)
405 self.apifiles = apifiles
408 if len(self.messages) == 0 and not testmode:
409 raise VPPValueError(1, 'Missing JSON message definitions')
410 if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
411 raise VPPRuntimeError("Invalid address family hints. "
414 self.transport = VppTransport(self, read_timeout=read_timeout,
415 server_address=server_address)
416 # Make sure we allow VPP to clean up the message rings.
417 atexit.register(vpp_atexit, weakref.ref(self))
419 add_convenience_methods()
def get_function(self, name):
    """Return the API function called ``name`` from the method holder.

    :param name: attribute name to look up on ``self._api``.
    :returns: whatever attribute ``self._api`` holds under that name.
    :raises AttributeError: if ``self._api`` or the named attribute
        does not exist.
    """
    api_holder = self._api
    return getattr(api_holder, name)
424 class ContextId(object):
425 """Multiprocessing-safe provider of unique context IDs."""
427 self.context = mp.Value(ctypes.c_uint, 0)
428 self.lock = mp.Lock()
431 """Get a new unique (or, at least, not recently used) context."""
433 self.context.value += 1
434 return self.context.value
435 get_context = ContextId()
def get_type(self, name):
    """Return the result of ``vpp_get_type(name)``.

    :param name: type name passed unchanged to ``vpp_get_type``.
    """
    vpp_type = vpp_get_type(name)
    return vpp_type
442 if not hasattr(self, "_api"):
443 raise VPPApiError("Not connected, api definitions not available")
446 def make_function(self, msg, i, multipart, do_async):
449 return self._call_vpp_async(i, msg, **kwargs)
452 return self._call_vpp(i, msg, multipart, **kwargs)
454 f.__name__ = str(msg.name)
455 f.__doc__ = ", ".join(["%s %s" %
456 (msg.fieldtypes[j], k)
457 for j, k in enumerate(msg.fields)])
462 def _register_functions(self, do_async=False):
463 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
464 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
465 self._api = VppApiDynamicMethodHolder()
466 for name, msg in vpp_iterator(self.messages):
467 n = name + '_' + msg.crc[2:]
468 i = self.transport.get_msg_index(n)
470 self.id_msgdef[i] = msg
471 self.id_names[i] = name
473 # Create function for client side messages.
474 if name in self.services:
475 f = self.make_function(msg, i, self.services[name], do_async)
476 setattr(self._api, name, FuncWrapper(f))
479 'No such message type or failed CRC checksum: %s', n)
481 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
483 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
485 rv = self.transport.connect(name, pfx,
486 msg_handler, rx_qlen)
488 raise VPPIOError(2, 'Connect failed')
489 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
490 self._register_functions(do_async=do_async)
492 # Initialise control ping
493 crc = self.messages['control_ping'].crc
494 self.control_ping_index = self.transport.get_msg_index(
495 ('control_ping' + '_' + crc[2:]))
496 self.control_ping_msgdef = self.messages['control_ping']
497 if self.async_thread:
498 self.event_thread = threading.Thread(
499 target=self.thread_msg_handler)
500 self.event_thread.daemon = True
501 self.event_thread.start()
503 self.event_thread = None
506 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
509 name - the name of the client.
510 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
511 do_async - if true, messages are sent without waiting for a reply
512 rx_qlen - the length of the VPP message receive queue between
515 msg_handler = self.transport.get_callback(do_async)
516 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
519 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
520 """Attach to VPP in synchronous mode. Application must poll for events.
522 name - the name of the client.
523 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
524 rx_qlen - the length of the VPP message receive queue between
528 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
531 def disconnect(self):
532 """Detach from VPP."""
533 rv = self.transport.disconnect()
534 if self.event_thread is not None:
535 self.message_queue.put("terminate event thread")
538 def msg_handler_sync(self, msg):
539 """Process an incoming message from VPP in sync mode.
541 The message may be a reply or it may be an async notification.
543 r = self.decode_incoming_msg(msg)
547 # If we have a context, then use the context to find any
548 # request waiting for a reply
550 if hasattr(r, 'context') and r.context > 0:
554 # No context -> async notification that we feed to the callback
555 self.message_queue.put_nowait(r)
557 raise VPPIOError(2, 'RPC reply message received in event handler')
559 def has_context(self, msg):
563 header = VPPType('header_with_context', [['u16', 'msgid'],
564 ['u32', 'client_index'],
567 (i, ci, context), size = header.unpack(msg, 0)
568 if self.id_names[i] == 'rx_thread_exit':
572 # Decode message and returns a tuple.
574 msgobj = self.id_msgdef[i]
575 if 'context' in msgobj.field_by_name and context >= 0:
579 def decode_incoming_msg(self, msg, no_type_conversion=False):
581 self.logger.warning('vpp_api.read failed')
584 (i, ci), size = self.header.unpack(msg, 0)
585 if self.id_names[i] == 'rx_thread_exit':
589 # Decode message and returns a tuple.
591 msgobj = self.id_msgdef[i]
593 raise VPPIOError(2, 'Reply message undefined')
595 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
598 def msg_handler_async(self, msg):
599 """Process a message from VPP in async mode.
601 In async mode, all messages are returned to the callback.
603 r = self.decode_incoming_msg(msg)
607 msgname = type(r).__name__
609 if self.event_callback:
610 self.event_callback(msgname, r)
612 def _control_ping(self, context):
613 """Send a ping command."""
614 self._call_vpp_async(self.control_ping_index,
615 self.control_ping_msgdef,
618 def validate_args(self, msg, kwargs):
619 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
621 raise VPPValueError('Invalid argument {} to {}'
622 .format(list(d), msg.name))
624 def _add_stat(self, name, ms):
625 if not name in self.stats:
626 self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
628 if ms > self.stats[name]['max']:
629 self.stats[name]['max'] = ms
630 self.stats[name]['count'] += 1
631 n = self.stats[name]['count']
632 self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
635 s = '\n=== API PAPI STATISTICS ===\n'
636 s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
637 for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
638 s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
639 n[1]['avg'], n[1]['max'])
642 def _call_vpp(self, i, msgdef, service, **kwargs):
643 """Given a message, send the message and await a reply.
645 msgdef - the message packing definition
646 i - the message type index
647 multipart - True if the message returns multiple
649 context - context number - chosen at random if not
651 The remainder of the kwargs are the arguments to the API call.
653 The return value is the message or message array containing
654 the response. It will raise an IOError exception if there was
655 no response within the timeout window.
658 if 'context' not in kwargs:
659 context = self.get_context()
660 kwargs['context'] = context
662 context = kwargs['context']
663 kwargs['_vl_msg_id'] = i
665 no_type_conversion = kwargs.pop('_no_type_conversion', False)
666 timeout = kwargs.pop('_timeout', None)
669 if self.transport.socket_index:
670 kwargs['client_index'] = self.transport.socket_index
671 except AttributeError:
673 self.validate_args(msgdef, kwargs)
675 s = 'Calling {}({})'.format(msgdef.name,
676 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
679 b = msgdef.pack(kwargs)
680 self.transport.suspend()
682 self.transport.write(b)
684 msgreply = service['reply']
685 stream = True if 'stream' in service else False
687 if 'stream_msg' in service:
688 # New service['reply'] = _reply and service['stream_msg'] = _details
689 stream_message = service['stream_msg']
692 # Old service['reply'] = _details
693 stream_message = msgreply
694 msgreply = 'control_ping_reply'
696 # Send a ping after the request - we use its response
697 # to detect that we have seen all results.
698 self._control_ping(context)
700 # Block until we get a reply.
703 r = self.read_blocking(no_type_conversion, timeout)
705 raise VPPIOError(2, 'VPP API client: read failed')
706 msgname = type(r).__name__
707 if context not in r or r.context == 0 or context != r.context:
708 # Message being queued
709 self.message_queue.put_nowait(r)
711 if msgname != msgreply and (stream and (msgname != stream_message)):
712 print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
716 if msgname == msgreply:
717 if modern: # Return both reply and list
723 self.transport.resume()
725 s = 'Return value: {!r}'.format(r)
730 self._add_stat(msgdef.name, (te - ts) * 1000)
733 def _call_vpp_async(self, i, msg, **kwargs):
734 """Given a message, send the message and return the context.
736 msgdef - the message packing definition
737 i - the message type index
738 context - context number - chosen at random if not
740 The remainder of the kwargs are the arguments to the API call.
742 The reply message(s) will be delivered later to the registered callback.
743 The returned context will help with assigning which call
744 the reply belongs to.
746 if 'context' not in kwargs:
747 context = self.get_context()
748 kwargs['context'] = context
750 context = kwargs['context']
752 if self.transport.socket_index:
753 kwargs['client_index'] = self.transport.socket_index
754 except AttributeError:
755 kwargs['client_index'] = 0
756 kwargs['_vl_msg_id'] = i
759 self.transport.write(b)
762 def read_blocking(self, no_type_conversion=False, timeout=None):
763 """Get next received message from transport within timeout, decoded.
765 Note that notifications have context zero
766 and are not put into receive queue (at least for socket transport),
767 use async_thread with registered callback for processing them.
769 If no message appears in the queue within timeout, return None.
771 Optionally, type conversion can be skipped,
772 as some of conversions are into less precise types.
774 When r is the return value of this, the caller can get message name as:
775 msgname = type(r).__name__
776 and context number (type long) as:
779 :param no_type_conversion: If false, type conversions are applied.
780 :type no_type_conversion: bool
781 :returns: Decoded message, or None if no message (within timeout).
782 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
783 :raises VppTransportShmemIOError if timed out.
785 msg = self.transport.read(timeout=timeout)
788 return self.decode_incoming_msg(msg, no_type_conversion)
790 def register_event_callback(self, callback):
791 """Register a callback for async messages.
793 This will be called for async notifications in sync mode,
794 and all messages in async mode. In sync mode, replies to
795 requests will not come here.
797 callback is a fn(msg_type_name, msg_type) that will be
798 called when a message comes in. While this function is
799 executing, note that (a) you are in a background thread and
800 may wish to use threading.Lock to protect your datastructures,
801 and (b) message processing from VPP will stop (so if you take
802 a long while about it you may provoke reply timeouts or cause
803 VPP to fill the RX buffer). Passing None will disable the
806 self.event_callback = callback
808 def thread_msg_handler(self):
809 """Python thread calling the user registered message handler.
811 This is to emulate the old style event callback scheme. Modern
812 clients should provide their own thread to poll the event
816 r = self.message_queue.get()
817 if r == "terminate event thread":
819 msgname = type(r).__name__
820 if self.event_callback:
821 self.event_callback(msgname, r)
823 def validate_message_table(self, namecrctable):
824 """Take a dictionary of name_crc message names
825 and returns an array of missing messages"""
828 for name_crc in namecrctable:
829 i = self.transport.get_msg_index(name_crc)
831 missing_table.append(name_crc)
def dump_message_table(self):
    """Return the transport's message table (a name_crc dictionary).

    The object held by ``self.transport.message_table`` is returned
    as-is; no copy is made.
    """
    table = self.transport.message_table
    return table
def dump_message_table_filtered(self, msglist):
    """Return the subset of the transport's name_crc message table that
    belongs to the given messages and their replies.

    :param msglist: list of message names; each must be a key of
        ``self.services``, whose 'reply' entry names the reply message.
    :returns: dictionary of the message-table entries whose key starts
        with one of the requested message names or their reply names.
    """
    # Requests first, then their replies, so entries are inserted in the
    # same order as a plain nested loop over both lists would produce.
    wanted = msglist + [self.services[msg]['reply'] for msg in msglist]
    return {
        name_crc: index
        for prefix in wanted
        for name_crc, index in self.transport.message_table.items()
        if name_crc.startswith(prefix)
    }
852 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
853 "logger=%s, read_timeout=%s, use_socket=%s, " \
854 "server_address='%s'>" % (
855 self._apifiles, self.testmode, self.async_thread,
856 self.logger, self.read_timeout, self.use_socket,
859 def details_iter(self, f, **kwargs):
862 kwargs['cursor'] = cursor
863 rv, details = f(**kwargs)
865 # Convert to yield from details when we only support python 3
869 if rv.retval == 0 or rv.retval != -165:
873 # Provide the old name for backward compatibility.
876 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4