3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
33 from . vpp_format import verify_enum_hint
34 from . vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
39 except ModuleNotFoundError:
41 """placeholder for VppTransport as the implementation is dependent on
42 VPPAPIClient's initialization values
47 from . vpp_transport_socket import VppTransport
49 logger = logging.getLogger('vpp_papi')
50 logger.addHandler(logging.NullHandler())
52 __all__ = ('FuncWrapper', 'VppApiDynamicMethodHolder',
53 'VppEnum', 'VppEnumType', 'VppEnumFlag',
54 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
58 def metaclass(metaclass):
59 @functools.wraps(metaclass)
61 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
66 class VppEnumType(type):
67 def __getattr__(cls, name):
68 t = vpp_get_type(name)
72 @metaclass(VppEnumType)
77 @metaclass(VppEnumType)
82 def vpp_atexit(vpp_weakref):
83 """Clean up VPP connection on shutdown."""
84 vpp_instance = vpp_weakref()
85 if vpp_instance and vpp_instance.transport.connected:
86 logger.debug('Cleaning up VPP on exit')
87 vpp_instance.disconnect()
90 def add_convenience_methods():
91 # provide convenience methods to IP[46]Address.vapi_af
93 if 6 == self._version:
94 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
95 if 4 == self._version:
96 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
97 raise ValueError("Invalid _version.")
99 def _vapi_af_name(self):
100 if 6 == self._version:
102 if 4 == self._version:
104 raise ValueError("Invalid _version.")
106 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
107 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
110 class VppApiDynamicMethodHolder:
115 def __init__(self, func):
117 self.__name__ = func.__name__
118 self.__doc__ = func.__doc__
120 def __call__(self, **kwargs):
121 return self._func(**kwargs)
124 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
127 class VPPApiError(Exception):
131 class VPPNotImplementedError(NotImplementedError):
135 class VPPIOError(IOError):
139 class VPPRuntimeError(RuntimeError):
143 class VPPValueError(ValueError):
147 class VPPApiJSONFiles:
149 def find_api_dir(cls, dirs):
150 """Attempt to find the best directory in which API definition
151 files may reside. If the value VPP_API_DIR exists in the environment
152 then it is first on the search list. If we're inside a recognized
153 location in a VPP source tree (src/scripts and src/vpp-api/python)
154 then entries from there to the likely locations in build-root are
155 added. Finally the location used by system packages is added.
157 :returns: A single directory name, or None if no such directory
161 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
162 # in which case, plot a course to likely places in the src tree
163 import __main__ as main
164 if hasattr(main, '__file__'):
165 # get the path of the calling script
166 localdir = os.path.dirname(os.path.realpath(main.__file__))
168 # use cwd if there is no calling script
169 localdir = os.getcwd()
170 localdir_s = localdir.split(os.path.sep)
173 """Match dir against right-hand components of the script dir"""
174 d = dir.split('/') # param 'dir' assumes a / separator
176 return len(localdir_s) > length and localdir_s[-length:] == d
178 def sdir(srcdir, variant):
179 """Build a path from srcdir to the staged API files of
180 'variant' (typically '' or '_debug')"""
181 # Since 'core' and 'plugin' files are staged
182 # in separate directories, we target the parent dir.
183 return os.path.sep.join((
186 'install-vpp%s-native' % variant,
194 if dmatch('src/scripts'):
195 srcdir = os.path.sep.join(localdir_s[:-2])
196 elif dmatch('src/vpp-api/python'):
197 srcdir = os.path.sep.join(localdir_s[:-3])
199 # we're apparently running tests
200 srcdir = os.path.sep.join(localdir_s[:-1])
203 # we're in the source tree, try both the debug and release
205 dirs.append(sdir(srcdir, '_debug'))
206 dirs.append(sdir(srcdir, ''))
208 # Test for staged copies of the scripts
209 # For these, since we explicitly know if we're running a debug versus
210 # release variant, target only the relevant directory
211 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
212 srcdir = os.path.sep.join(localdir_s[:-4])
213 dirs.append(sdir(srcdir, '_debug'))
214 if dmatch('build-root/install-vpp-native/vpp/bin'):
215 srcdir = os.path.sep.join(localdir_s[:-4])
216 dirs.append(sdir(srcdir, ''))
218 # finally, try the location system packages typically install into
219 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
221 # check the directories for existence; first one wins
223 if os.path.isdir(dir):
229 def find_api_files(cls, api_dir=None, patterns='*'): # -> list
230 """Find API definition files from the given directory tree with the
231 given pattern. If no directory is given then find_api_dir() is used
232 to locate one. If no pattern is given then all definition files found
233 in the directory tree are used.
235 :param api_dir: A directory tree in which to locate API definition
236 files; subdirectories are descended into.
237 If this is None then find_api_dir() is called to discover it.
238 :param patterns: A list of patterns to use in each visited directory
239 when looking for files.
240 This can be a list/tuple object or a comma-separated string of
241 patterns. Each value in the list will have leading/trialing
243 The pattern specifies the first part of the filename, '.api.json'
245 The results are de-duplicated, thus overlapping patterns are fine.
246 If this is None it defaults to '*' meaning "all API files".
247 :returns: A list of file paths for the API files found.
250 api_dir = cls.find_api_dir([])
252 raise VPPApiError("api_dir cannot be located")
254 if isinstance(patterns, list) or isinstance(patterns, tuple):
255 patterns = [p.strip() + '.api.json' for p in patterns]
257 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
260 for root, dirnames, files in os.walk(api_dir):
261 # iterate all given patterns and de-dup the result
262 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
263 for filename in files:
264 api_files.append(os.path.join(root, filename))
269 def process_json_file(self, apidef_file):
270 api = json.load(apidef_file)
271 return self._process_json(api)
274 def process_json_str(self, json_str):
275 api = json.loads(json_str)
276 return self._process_json(api)
279 def _process_json(api): # -> Tuple[Dict, Dict]
284 for t in api['enums']:
285 t[0] = 'vl_api_' + t[0] + '_t'
286 types[t[0]] = {'type': 'enum', 'data': t}
290 for t in api['enumflags']:
291 t[0] = 'vl_api_' + t[0] + '_t'
292 types[t[0]] = {'type': 'enum', 'data': t}
296 for t in api['unions']:
297 t[0] = 'vl_api_' + t[0] + '_t'
298 types[t[0]] = {'type': 'union', 'data': t}
303 for t in api['types']:
304 t[0] = 'vl_api_' + t[0] + '_t'
305 types[t[0]] = {'type': 'type', 'data': t}
310 for t, v in api['aliases'].items():
311 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
316 services.update(api['services'])
323 for k, v in types.items():
325 if not vpp_get_type(k):
326 if v['type'] == 'enum':
328 VPPEnumType(t[0], t[1:])
331 if not vpp_get_type(k):
332 if v['type'] == 'enumflag':
334 VPPEnumFlagType(t[0], t[1:])
337 elif v['type'] == 'union':
339 VPPUnionType(t[0], t[1:])
342 elif v['type'] == 'type':
347 elif v['type'] == 'alias':
352 if len(unresolved) == 0:
355 raise VPPValueError('Unresolved type definitions {}'
360 for m in api['messages']:
362 messages[m[0]] = VPPMessage(m[0], m[1:])
363 except VPPNotImplementedError:
365 logger.error('Not implemented error for {}'.format(m[0]))
368 return messages, services
374 This class provides the APIs to VPP. The APIs are loaded
375 from provided .api.json files and makes functions accordingly.
376 These functions are documented in the VPP .api files, as they
377 are dynamically created.
379 Additionally, VPP can send callback messages; this class
380 provides a means to register a callback function to receive
381 these messages in a background thread.
384 VPPApiError = VPPApiError
385 VPPRuntimeError = VPPRuntimeError
386 VPPValueError = VPPValueError
387 VPPNotImplementedError = VPPNotImplementedError
388 VPPIOError = VPPIOError
391 def __init__(self, *, apifiles=None, testmode=False, async_thread=True,
392 logger=None, loglevel=None,
393 read_timeout=5, use_socket=True,
394 server_address='/run/vpp/api.sock'):
395 """Create a VPP API object.
397 apifiles is a list of files containing API
398 descriptions that will be loaded - methods will be
399 dynamically created reflecting these APIs. If not
400 provided this will load the API files from VPP's
401 default install location.
403 logger, if supplied, is the logging logger object to log to.
404 loglevel, if supplied, is the log level this logger is set
405 to report at (from the loglevels in the logging module).
408 logger = logging.getLogger(
409 "{}.{}".format(__name__, self.__class__.__name__))
410 if loglevel is not None:
411 logger.setLevel(loglevel)
418 self.header = VPPType('header', [['u16', 'msgid'],
419 ['u32', 'client_index']])
421 self.event_callback = None
422 self.message_queue = queue.Queue()
423 self.read_timeout = read_timeout
424 self.async_thread = async_thread
425 self.event_thread = None
426 self.testmode = testmode
427 self.server_address = server_address
428 self._apifiles = apifiles
432 # Pick up API definitions from default directory
434 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
435 except (RuntimeError, VPPApiError):
436 # In test mode we don't care that we can't find the API files
440 raise VPPRuntimeError
442 for file in apifiles:
443 with open(file) as apidef_file:
444 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
445 self.messages.update(m)
446 self.services.update(s)
448 self.apifiles = apifiles
451 if len(self.messages) == 0 and not testmode:
452 raise VPPValueError(1, 'Missing JSON message definitions')
453 if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
454 raise VPPRuntimeError("Invalid address family hints. "
457 self.transport = VppTransport(self, read_timeout=read_timeout,
458 server_address=server_address)
459 # Make sure we allow VPP to clean up the message rings.
460 atexit.register(vpp_atexit, weakref.ref(self))
462 add_convenience_methods()
464 def get_function(self, name):
465 return getattr(self._api, name)
468 """Multiprocessing-safe provider of unique context IDs."""
470 self.context = mp.Value(ctypes.c_uint, 0)
471 self.lock = mp.Lock()
474 """Get a new unique (or, at least, not recently used) context."""
476 self.context.value += 1
477 return self.context.value
478 get_context = ContextId()
480 def get_type(self, name):
481 return vpp_get_type(name)
485 if not hasattr(self, "_api"):
486 raise VPPApiError("Not connected, api definitions not available")
489 def make_function(self, msg, i, multipart, do_async):
492 return self._call_vpp_async(i, msg, **kwargs)
495 return self._call_vpp(i, msg, multipart, **kwargs)
497 f.__name__ = str(msg.name)
498 f.__doc__ = ", ".join(["%s %s" %
499 (msg.fieldtypes[j], k)
500 for j, k in enumerate(msg.fields)])
505 def _register_functions(self, do_async=False):
506 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
507 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
508 self._api = VppApiDynamicMethodHolder()
509 for name, msg in self.messages.items():
510 n = name + '_' + msg.crc[2:]
511 i = self.transport.get_msg_index(n)
513 self.id_msgdef[i] = msg
514 self.id_names[i] = name
516 # Create function for client side messages.
517 if name in self.services:
518 f = self.make_function(msg, i, self.services[name], do_async)
519 setattr(self._api, name, FuncWrapper(f))
522 'No such message type or failed CRC checksum: %s', n)
524 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
526 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
528 rv = self.transport.connect(name, pfx,
529 msg_handler, rx_qlen)
531 raise VPPIOError(2, 'Connect failed')
532 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
533 self._register_functions(do_async=do_async)
535 # Initialise control ping
536 crc = self.messages['control_ping'].crc
537 self.control_ping_index = self.transport.get_msg_index(
538 ('control_ping' + '_' + crc[2:]))
539 self.control_ping_msgdef = self.messages['control_ping']
540 if self.async_thread:
541 self.event_thread = threading.Thread(
542 target=self.thread_msg_handler)
543 self.event_thread.daemon = True
544 self.event_thread.start()
546 self.event_thread = None
549 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
552 name - the name of the client.
553 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
554 do_async - if true, messages are sent without waiting for a reply
555 rx_qlen - the length of the VPP message receive queue between
558 msg_handler = self.transport.get_callback(do_async)
559 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
562 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
563 """Attach to VPP in synchronous mode. Application must poll for events.
565 name - the name of the client.
566 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
567 rx_qlen - the length of the VPP message receive queue between
571 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
574 def disconnect(self):
575 """Detach from VPP."""
576 rv = self.transport.disconnect()
577 if self.event_thread is not None:
578 self.message_queue.put("terminate event thread")
581 def msg_handler_sync(self, msg):
582 """Process an incoming message from VPP in sync mode.
584 The message may be a reply or it may be an async notification.
586 r = self.decode_incoming_msg(msg)
590 # If we have a context, then use the context to find any
591 # request waiting for a reply
593 if hasattr(r, 'context') and r.context > 0:
597 # No context -> async notification that we feed to the callback
598 self.message_queue.put_nowait(r)
600 raise VPPIOError(2, 'RPC reply message received in event handler')
602 def has_context(self, msg):
606 header = VPPType('header_with_context', [['u16', 'msgid'],
607 ['u32', 'client_index'],
610 (i, ci, context), size = header.unpack(msg, 0)
611 if self.id_names[i] == 'rx_thread_exit':
615 # Decode message and returns a tuple.
617 msgobj = self.id_msgdef[i]
618 if 'context' in msgobj.field_by_name and context >= 0:
622 def decode_incoming_msg(self, msg, no_type_conversion=False):
624 logger.warning('vpp_api.read failed')
627 (i, ci), size = self.header.unpack(msg, 0)
628 if self.id_names[i] == 'rx_thread_exit':
632 # Decode message and returns a tuple.
634 msgobj = self.id_msgdef[i]
636 raise VPPIOError(2, 'Reply message undefined')
638 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
641 def msg_handler_async(self, msg):
642 """Process a message from VPP in async mode.
644 In async mode, all messages are returned to the callback.
646 r = self.decode_incoming_msg(msg)
650 msgname = type(r).__name__
652 if self.event_callback:
653 self.event_callback(msgname, r)
655 def _control_ping(self, context):
656 """Send a ping command."""
657 self._call_vpp_async(self.control_ping_index,
658 self.control_ping_msgdef,
661 def validate_args(self, msg, kwargs):
662 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
664 raise VPPValueError('Invalid argument {} to {}'
665 .format(list(d), msg.name))
667 def _add_stat(self, name, ms):
668 if not name in self.stats:
669 self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
671 if ms > self.stats[name]['max']:
672 self.stats[name]['max'] = ms
673 self.stats[name]['count'] += 1
674 n = self.stats[name]['count']
675 self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
678 s = '\n=== API PAPI STATISTICS ===\n'
679 s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
680 for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
681 s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
682 n[1]['avg'], n[1]['max'])
685 def get_field_options(self, msg, fld_name):
686 # when there is an option, the msgdef has 3 elements.
687 # ['u32', 'ring_size', {'default': 1024}]
688 for _def in self.messages[msg].msgdef:
689 if isinstance(_def, list) and \
694 def _call_vpp(self, i, msgdef, service, **kwargs):
695 """Given a message, send the message and await a reply.
697 msgdef - the message packing definition
698 i - the message type index
699 multipart - True if the message returns multiple
701 context - context number - chosen at random if not
703 The remainder of the kwargs are the arguments to the API call.
705 The return value is the message or message array containing
706 the response. It will raise an IOError exception if there was
707 no response within the timeout window.
710 if 'context' not in kwargs:
711 context = self.get_context()
712 kwargs['context'] = context
714 context = kwargs['context']
715 kwargs['_vl_msg_id'] = i
717 no_type_conversion = kwargs.pop('_no_type_conversion', False)
718 timeout = kwargs.pop('_timeout', None)
721 if self.transport.socket_index:
722 kwargs['client_index'] = self.transport.socket_index
723 except AttributeError:
725 self.validate_args(msgdef, kwargs)
727 s = 'Calling {}({})'.format(msgdef.name,
728 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
731 b = msgdef.pack(kwargs)
732 self.transport.suspend()
734 self.transport.write(b)
736 msgreply = service['reply']
737 stream = True if 'stream' in service else False
739 if 'stream_msg' in service:
740 # New service['reply'] = _reply and service['stream_message'] = _details
741 stream_message = service['stream_msg']
744 # Old service['reply'] = _details
745 stream_message = msgreply
746 msgreply = 'control_ping_reply'
748 # Send a ping after the request - we use its response
749 # to detect that we have seen all results.
750 self._control_ping(context)
752 # Block until we get a reply.
755 r = self.read_blocking(no_type_conversion, timeout)
757 raise VPPIOError(2, 'VPP API client: read failed')
758 msgname = type(r).__name__
759 if context not in r or r.context == 0 or context != r.context:
760 # Message being queued
761 self.message_queue.put_nowait(r)
763 if msgname != msgreply and (stream and (msgname != stream_message)):
764 print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
768 if msgname == msgreply:
769 if modern: # Return both reply and list
775 self.transport.resume()
777 s = 'Return value: {!r}'.format(r)
782 self._add_stat(msgdef.name, (te - ts) * 1000)
785 def _call_vpp_async(self, i, msg, **kwargs):
786 """Given a message, send the message and return the context.
788 msgdef - the message packing definition
789 i - the message type index
790 context - context number - chosen at random if not
792 The remainder of the kwargs are the arguments to the API call.
794 The reply message(s) will be delivered later to the registered callback.
795 The returned context will help with assigning which call
796 the reply belongs to.
798 if 'context' not in kwargs:
799 context = self.get_context()
800 kwargs['context'] = context
802 context = kwargs['context']
804 if self.transport.socket_index:
805 kwargs['client_index'] = self.transport.socket_index
806 except AttributeError:
807 kwargs['client_index'] = 0
808 kwargs['_vl_msg_id'] = i
811 self.transport.write(b)
814 def read_blocking(self, no_type_conversion=False, timeout=None):
815 """Get next received message from transport within timeout, decoded.
817 Note that notifications have context zero
818 and are not put into receive queue (at least for socket transport),
819 use async_thread with registered callback for processing them.
821 If no message appears in the queue within timeout, return None.
823 Optionally, type conversion can be skipped,
824 as some of conversions are into less precise types.
826 When r is the return value of this, the caller can get message name as:
827 msgname = type(r).__name__
828 and context number (type long) as:
831 :param no_type_conversion: If false, type conversions are applied.
832 :type no_type_conversion: bool
833 :returns: Decoded message, or None if no message (within timeout).
834 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
835 :raises VppTransportShmemIOError if timed out.
837 msg = self.transport.read(timeout=timeout)
840 return self.decode_incoming_msg(msg, no_type_conversion)
842 def register_event_callback(self, callback):
843 """Register a callback for async messages.
845 This will be called for async notifications in sync mode,
846 and all messages in async mode. In sync mode, replies to
847 requests will not come here.
849 callback is a fn(msg_type_name, msg_type) that will be
850 called when a message comes in. While this function is
851 executing, note that (a) you are in a background thread and
852 may wish to use threading.Lock to protect your datastructures,
853 and (b) message processing from VPP will stop (so if you take
854 a long while about it you may provoke reply timeouts or cause
855 VPP to fill the RX buffer). Passing None will disable the
858 self.event_callback = callback
860 def thread_msg_handler(self):
861 """Python thread calling the user registered message handler.
863 This is to emulate the old style event callback scheme. Modern
864 clients should provide their own thread to poll the event
868 r = self.message_queue.get()
869 if r == "terminate event thread":
871 msgname = type(r).__name__
872 if self.event_callback:
873 self.event_callback(msgname, r)
875 def validate_message_table(self, namecrctable):
876 """Take a dictionary of name_crc message names
877 and returns an array of missing messages"""
880 for name_crc in namecrctable:
881 i = self.transport.get_msg_index(name_crc)
883 missing_table.append(name_crc)
886 def dump_message_table(self):
887 """Return VPPs API message table as name_crc dictionary"""
888 return self.transport.message_table
890 def dump_message_table_filtered(self, msglist):
891 """Return VPPs API message table as name_crc dictionary,
892 filtered by message name list."""
894 replies = [self.services[n]['reply'] for n in msglist]
895 message_table_filtered = {}
896 for name in msglist + replies:
897 for k,v in self.transport.message_table.items():
898 if k.startswith(name):
899 message_table_filtered[k] = v
901 return message_table_filtered
904 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
905 "logger=%s, read_timeout=%s, " \
906 "server_address='%s'>" % (
907 self._apifiles, self.testmode, self.async_thread,
908 self.logger, self.read_timeout, self.server_address)
910 def details_iter(self, f, **kwargs):
913 kwargs['cursor'] = cursor
914 rv, details = f(**kwargs)
917 if rv.retval == 0 or rv.retval != -165: