3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
33 from . vpp_format import verify_enum_hint
34 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
35 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
39 except ModuleNotFoundError:
41 """placeholder for VppTransport as the implementation is dependent on
42 VPPAPIClient's initialization values
47 logger = logging.getLogger('vpp_papi')
48 logger.addHandler(logging.NullHandler())
50 __all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
51 'VppEnum', 'VppEnumType',
52 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
56 def metaclass(metaclass):
57 @functools.wraps(metaclass)
59 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
64 class VppEnumType(type):
65 def __getattr__(cls, name):
66 t = vpp_get_type(name)
70 @metaclass(VppEnumType)
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client when the interpreter exits.

    :param vpp_weakref: Zero-argument callable (registered as a weak
        reference to the client) returning the client instance, or a
        falsy value when the client no longer exists.
    """
    client = vpp_weakref()
    # Nothing to clean up if the client is gone or was never connected.
    if not client or not client.transport.connected:
        return
    logger.debug('Cleaning up VPP on exit')
    client.disconnect()
85 def add_convenience_methods():
86 # provide convenience methods to IP[46]Address.vapi_af
88 if 6 == self._version:
89 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
90 if 4 == self._version:
91 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
92 raise ValueError("Invalid _version.")
94 def _vapi_af_name(self):
95 if 6 == self._version:
97 if 4 == self._version:
99 raise ValueError("Invalid _version.")
101 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
102 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
105 class VppApiDynamicMethodHolder:
110 def __init__(self, func):
112 self.__name__ = func.__name__
113 self.__doc__ = func.__doc__
115 def __call__(self, **kwargs):
116 return self._func(**kwargs)
119 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
122 class VPPApiError(Exception):
126 class VPPNotImplementedError(NotImplementedError):
130 class VPPIOError(IOError):
134 class VPPRuntimeError(RuntimeError):
138 class VPPValueError(ValueError):
142 class VPPApiJSONFiles:
144 def find_api_dir(cls, dirs):
145 """Attempt to find the best directory in which API definition
146 files may reside. If the value VPP_API_DIR exists in the environment
147 then it is first on the search list. If we're inside a recognized
148 location in a VPP source tree (src/scripts and src/vpp-api/python)
149 then entries from there to the likely locations in build-root are
150 added. Finally the location used by system packages is added.
152 :returns: A single directory name, or None if no such directory
156 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
157 # in which case, plot a course to likely places in the src tree
158 import __main__ as main
159 if hasattr(main, '__file__'):
160 # get the path of the calling script
161 localdir = os.path.dirname(os.path.realpath(main.__file__))
163 # use cwd if there is no calling script
164 localdir = os.getcwd()
165 localdir_s = localdir.split(os.path.sep)
168 """Match dir against right-hand components of the script dir"""
169 d = dir.split('/') # param 'dir' assumes a / separator
171 return len(localdir_s) > length and localdir_s[-length:] == d
173 def sdir(srcdir, variant):
174 """Build a path from srcdir to the staged API files of
175 'variant' (typically '' or '_debug')"""
176 # Since 'core' and 'plugin' files are staged
177 # in separate directories, we target the parent dir.
178 return os.path.sep.join((
181 'install-vpp%s-native' % variant,
189 if dmatch('src/scripts'):
190 srcdir = os.path.sep.join(localdir_s[:-2])
191 elif dmatch('src/vpp-api/python'):
192 srcdir = os.path.sep.join(localdir_s[:-3])
194 # we're apparently running tests
195 srcdir = os.path.sep.join(localdir_s[:-1])
198 # we're in the source tree, try both the debug and release
200 dirs.append(sdir(srcdir, '_debug'))
201 dirs.append(sdir(srcdir, ''))
203 # Test for staged copies of the scripts
204 # For these, since we explicitly know if we're running a debug versus
205 # release variant, target only the relevant directory
206 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
207 srcdir = os.path.sep.join(localdir_s[:-4])
208 dirs.append(sdir(srcdir, '_debug'))
209 if dmatch('build-root/install-vpp-native/vpp/bin'):
210 srcdir = os.path.sep.join(localdir_s[:-4])
211 dirs.append(sdir(srcdir, ''))
213 # finally, try the location system packages typically install into
214 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
216 # check the directories for existence; first one wins
218 if os.path.isdir(dir):
224 def find_api_files(cls, api_dir=None, patterns='*'): # -> list
225 """Find API definition files from the given directory tree with the
226 given pattern. If no directory is given then find_api_dir() is used
227 to locate one. If no pattern is given then all definition files found
228 in the directory tree are used.
230 :param api_dir: A directory tree in which to locate API definition
231 files; subdirectories are descended into.
232 If this is None then find_api_dir() is called to discover it.
233 :param patterns: A list of patterns to use in each visited directory
234 when looking for files.
235 This can be a list/tuple object or a comma-separated string of
236 patterns. Each value in the list will have leading/trailing
238 The pattern specifies the first part of the filename, '.api.json'
240 The results are de-duplicated, thus overlapping patterns are fine.
241 If this is None it defaults to '*' meaning "all API files".
242 :returns: A list of file paths for the API files found.
245 api_dir = cls.find_api_dir([])
247 raise VPPApiError("api_dir cannot be located")
249 if isinstance(patterns, list) or isinstance(patterns, tuple):
250 patterns = [p.strip() + '.api.json' for p in patterns]
252 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
255 for root, dirnames, files in os.walk(api_dir):
256 # iterate all given patterns and de-dup the result
257 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
258 for filename in files:
259 api_files.append(os.path.join(root, filename))
def process_json_file(self, apidef_file):
    """Parse an open API definition file and build its definitions.

    :param apidef_file: Readable file object containing the JSON text of
        one API definition.
    :returns: Whatever ``_process_json`` returns for the decoded document.
    """
    return self._process_json(json.load(apidef_file))
def process_json_str(self, json_str):
    """Parse an API definition given as JSON text and build its definitions.

    :param json_str: JSON document describing one API definition.
    :returns: Whatever ``_process_json`` returns for the decoded document.
    """
    return self._process_json(json.loads(json_str))
274 def _process_json(api): # -> Tuple[Dict, Dict]
279 for t in api['enums']:
280 t[0] = 'vl_api_' + t[0] + '_t'
281 types[t[0]] = {'type': 'enum', 'data': t}
286 for t in api['unions']:
287 t[0] = 'vl_api_' + t[0] + '_t'
288 types[t[0]] = {'type': 'union', 'data': t}
293 for t in api['types']:
294 t[0] = 'vl_api_' + t[0] + '_t'
295 types[t[0]] = {'type': 'type', 'data': t}
300 for t, v in api['aliases'].items():
301 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
306 services.update(api['services'])
313 for k, v in types.items():
315 if not vpp_get_type(k):
316 if v['type'] == 'enum':
318 VPPEnumType(t[0], t[1:])
321 elif v['type'] == 'union':
323 VPPUnionType(t[0], t[1:])
326 elif v['type'] == 'type':
331 elif v['type'] == 'alias':
336 if len(unresolved) == 0:
339 raise VPPValueError('Unresolved type definitions {}'
344 for m in api['messages']:
346 messages[m[0]] = VPPMessage(m[0], m[1:])
347 except VPPNotImplementedError:
349 logger.error('Not implemented error for {}'.format(m[0]))
352 return messages, services
358 This class provides the APIs to VPP. The APIs are loaded
359 from provided .api.json files and makes functions accordingly.
360 These functions are documented in the VPP .api files, as they
361 are dynamically created.
363 Additionally, VPP can send callback messages; this class
364 provides a means to register a callback function to receive
365 these messages in a background thread.
368 VPPApiError = VPPApiError
369 VPPRuntimeError = VPPRuntimeError
370 VPPValueError = VPPValueError
371 VPPNotImplementedError = VPPNotImplementedError
372 VPPIOError = VPPIOError
375 def __init__(self, apifiles=None, testmode=False, async_thread=True,
376 logger=None, loglevel=None,
377 read_timeout=5, use_socket=False,
378 server_address='/run/vpp/api.sock'):
379 """Create a VPP API object.
381 apifiles is a list of files containing API
382 descriptions that will be loaded - methods will be
383 dynamically created reflecting these APIs. If not
384 provided this will load the API files from VPP's
385 default install location.
387 logger, if supplied, is the logging logger object to log to.
388 loglevel, if supplied, is the log level this logger is set
389 to report at (from the loglevels in the logging module).
392 logger = logging.getLogger(
393 "{}.{}".format(__name__, self.__class__.__name__))
394 if loglevel is not None:
395 logger.setLevel(loglevel)
402 self.header = VPPType('header', [['u16', 'msgid'],
403 ['u32', 'client_index']])
405 self.event_callback = None
406 self.message_queue = queue.Queue()
407 self.read_timeout = read_timeout
408 self.async_thread = async_thread
409 self.event_thread = None
410 self.testmode = testmode
411 self.use_socket = use_socket
412 self.server_address = server_address
413 self._apifiles = apifiles
417 from . vpp_transport_socket import VppTransport
419 from . vpp_transport_shmem import VppTransport
422 # Pick up API definitions from default directory
424 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
425 except (RuntimeError, VPPApiError):
426 # In test mode we don't care that we can't find the API files
430 raise VPPRuntimeError
432 for file in apifiles:
433 with open(file) as apidef_file:
434 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
435 self.messages.update(m)
436 self.services.update(s)
438 self.apifiles = apifiles
441 if len(self.messages) == 0 and not testmode:
442 raise VPPValueError(1, 'Missing JSON message definitions')
443 if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
444 raise VPPRuntimeError("Invalid address family hints. "
447 self.transport = VppTransport(self, read_timeout=read_timeout,
448 server_address=server_address)
449 # Make sure we allow VPP to clean up the message rings.
450 atexit.register(vpp_atexit, weakref.ref(self))
452 add_convenience_methods()
def get_function(self, name):
    """Return the attribute called *name* from the API method holder.

    :param name: Attribute name to look up on ``self._api``.
    :raises AttributeError: If ``self._api`` has no such attribute.
    """
    api_holder = self._api
    return getattr(api_holder, name)
458 """Multiprocessing-safe provider of unique context IDs."""
460 self.context = mp.Value(ctypes.c_uint, 0)
461 self.lock = mp.Lock()
464 """Get a new unique (or, at least, not recently used) context."""
466 self.context.value += 1
467 return self.context.value
468 get_context = ContextId()
def get_type(self, name):
    """Look up a registered VPP type by name.

    Thin wrapper around ``vpp_get_type`` from the serializer module; the
    result of that call is returned unchanged.

    :param name: Type name to look up.
    """
    found = vpp_get_type(name)
    return found
475 if not hasattr(self, "_api"):
476 raise VPPApiError("Not connected, api definitions not available")
479 def make_function(self, msg, i, multipart, do_async):
482 return self._call_vpp_async(i, msg, **kwargs)
485 return self._call_vpp(i, msg, multipart, **kwargs)
487 f.__name__ = str(msg.name)
488 f.__doc__ = ", ".join(["%s %s" %
489 (msg.fieldtypes[j], k)
490 for j, k in enumerate(msg.fields)])
495 def _register_functions(self, do_async=False):
496 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
497 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
498 self._api = VppApiDynamicMethodHolder()
499 for name, msg in self.messages.items():
500 n = name + '_' + msg.crc[2:]
501 i = self.transport.get_msg_index(n)
503 self.id_msgdef[i] = msg
504 self.id_names[i] = name
506 # Create function for client side messages.
507 if name in self.services:
508 f = self.make_function(msg, i, self.services[name], do_async)
509 setattr(self._api, name, FuncWrapper(f))
512 'No such message type or failed CRC checksum: %s', n)
514 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
516 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
518 rv = self.transport.connect(name, pfx,
519 msg_handler, rx_qlen)
521 raise VPPIOError(2, 'Connect failed')
522 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
523 self._register_functions(do_async=do_async)
525 # Initialise control ping
526 crc = self.messages['control_ping'].crc
527 self.control_ping_index = self.transport.get_msg_index(
528 ('control_ping' + '_' + crc[2:]))
529 self.control_ping_msgdef = self.messages['control_ping']
530 if self.async_thread:
531 self.event_thread = threading.Thread(
532 target=self.thread_msg_handler)
533 self.event_thread.daemon = True
534 self.event_thread.start()
536 self.event_thread = None
539 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
542 name - the name of the client.
543 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
544 do_async - if true, messages are sent without waiting for a reply
545 rx_qlen - the length of the VPP message receive queue between
548 msg_handler = self.transport.get_callback(do_async)
549 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
552 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
553 """Attach to VPP in synchronous mode. Application must poll for events.
555 name - the name of the client.
556 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
557 rx_qlen - the length of the VPP message receive queue between
561 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
564 def disconnect(self):
565 """Detach from VPP."""
566 rv = self.transport.disconnect()
567 if self.event_thread is not None:
568 self.message_queue.put("terminate event thread")
571 def msg_handler_sync(self, msg):
572 """Process an incoming message from VPP in sync mode.
574 The message may be a reply or it may be an async notification.
576 r = self.decode_incoming_msg(msg)
580 # If we have a context, then use the context to find any
581 # request waiting for a reply
583 if hasattr(r, 'context') and r.context > 0:
587 # No context -> async notification that we feed to the callback
588 self.message_queue.put_nowait(r)
590 raise VPPIOError(2, 'RPC reply message received in event handler')
592 def has_context(self, msg):
596 header = VPPType('header_with_context', [['u16', 'msgid'],
597 ['u32', 'client_index'],
600 (i, ci, context), size = header.unpack(msg, 0)
601 if self.id_names[i] == 'rx_thread_exit':
605 # Decode the message and return a tuple.
607 msgobj = self.id_msgdef[i]
608 if 'context' in msgobj.field_by_name and context >= 0:
612 def decode_incoming_msg(self, msg, no_type_conversion=False):
614 logger.warning('vpp_api.read failed')
617 (i, ci), size = self.header.unpack(msg, 0)
618 if self.id_names[i] == 'rx_thread_exit':
622 # Decode the message and return a tuple.
624 msgobj = self.id_msgdef[i]
626 raise VPPIOError(2, 'Reply message undefined')
628 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
631 def msg_handler_async(self, msg):
632 """Process a message from VPP in async mode.
634 In async mode, all messages are returned to the callback.
636 r = self.decode_incoming_msg(msg)
640 msgname = type(r).__name__
642 if self.event_callback:
643 self.event_callback(msgname, r)
645 def _control_ping(self, context):
646 """Send a ping command."""
647 self._call_vpp_async(self.control_ping_index,
648 self.control_ping_msgdef,
651 def validate_args(self, msg, kwargs):
652 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
654 raise VPPValueError('Invalid argument {} to {}'
655 .format(list(d), msg.name))
657 def _add_stat(self, name, ms):
658 if not name in self.stats:
659 self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
661 if ms > self.stats[name]['max']:
662 self.stats[name]['max'] = ms
663 self.stats[name]['count'] += 1
664 n = self.stats[name]['count']
665 self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
668 s = '\n=== API PAPI STATISTICS ===\n'
669 s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
670 for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
671 s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
672 n[1]['avg'], n[1]['max'])
675 def _call_vpp(self, i, msgdef, service, **kwargs):
676 """Given a message, send the message and await a reply.
678 msgdef - the message packing definition
679 i - the message type index
680 multipart - True if the message returns multiple
682 context - context number - chosen at random if not
684 The remainder of the kwargs are the arguments to the API call.
686 The return value is the message or message array containing
687 the response. It will raise an IOError exception if there was
688 no response within the timeout window.
691 if 'context' not in kwargs:
692 context = self.get_context()
693 kwargs['context'] = context
695 context = kwargs['context']
696 kwargs['_vl_msg_id'] = i
698 no_type_conversion = kwargs.pop('_no_type_conversion', False)
699 timeout = kwargs.pop('_timeout', None)
702 if self.transport.socket_index:
703 kwargs['client_index'] = self.transport.socket_index
704 except AttributeError:
706 self.validate_args(msgdef, kwargs)
708 s = 'Calling {}({})'.format(msgdef.name,
709 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
712 b = msgdef.pack(kwargs)
713 self.transport.suspend()
715 self.transport.write(b)
717 msgreply = service['reply']
718 stream = True if 'stream' in service else False
720 if 'stream_msg' in service:
721 # New service['reply'] = _reply and service['stream_message'] = _details
722 stream_message = service['stream_msg']
725 # Old service['reply'] = _details
726 stream_message = msgreply
727 msgreply = 'control_ping_reply'
729 # Send a ping after the request - we use its response
730 # to detect that we have seen all results.
731 self._control_ping(context)
733 # Block until we get a reply.
736 r = self.read_blocking(no_type_conversion, timeout)
738 raise VPPIOError(2, 'VPP API client: read failed')
739 msgname = type(r).__name__
740 if context not in r or r.context == 0 or context != r.context:
741 # Message being queued
742 self.message_queue.put_nowait(r)
744 if msgname != msgreply and (stream and (msgname != stream_message)):
745 print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
749 if msgname == msgreply:
750 if modern: # Return both reply and list
756 self.transport.resume()
758 s = 'Return value: {!r}'.format(r)
763 self._add_stat(msgdef.name, (te - ts) * 1000)
766 def _call_vpp_async(self, i, msg, **kwargs):
767 """Given a message, send the message and return the context.
769 msgdef - the message packing definition
770 i - the message type index
771 context - context number - chosen at random if not
773 The remainder of the kwargs are the arguments to the API call.
775 The reply message(s) will be delivered later to the registered callback.
776 The returned context will help with assigning which call
777 the reply belongs to.
779 if 'context' not in kwargs:
780 context = self.get_context()
781 kwargs['context'] = context
783 context = kwargs['context']
785 if self.transport.socket_index:
786 kwargs['client_index'] = self.transport.socket_index
787 except AttributeError:
788 kwargs['client_index'] = 0
789 kwargs['_vl_msg_id'] = i
792 self.transport.write(b)
795 def read_blocking(self, no_type_conversion=False, timeout=None):
796 """Get next received message from transport within timeout, decoded.
798 Note that notifications have context zero
799 and are not put into receive queue (at least for socket transport),
800 use async_thread with registered callback for processing them.
802 If no message appears in the queue within timeout, return None.
804 Optionally, type conversion can be skipped,
805 as some of conversions are into less precise types.
807 When r is the return value of this, the caller can get message name as:
808 msgname = type(r).__name__
809 and context number (type long) as:
812 :param no_type_conversion: If false, type conversions are applied.
813 :type no_type_conversion: bool
814 :returns: Decoded message, or None if no message (within timeout).
815 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
816 :raises VppTransportShmemIOError if timed out.
818 msg = self.transport.read(timeout=timeout)
821 return self.decode_incoming_msg(msg, no_type_conversion)
823 def register_event_callback(self, callback):
824 """Register a callback for async messages.
826 This will be called for async notifications in sync mode,
827 and all messages in async mode. In sync mode, replies to
828 requests will not come here.
830 callback is a fn(msg_type_name, msg_type) that will be
831 called when a message comes in. While this function is
832 executing, note that (a) you are in a background thread and
833 may wish to use threading.Lock to protect your datastructures,
834 and (b) message processing from VPP will stop (so if you take
835 a long while about it you may provoke reply timeouts or cause
836 VPP to fill the RX buffer). Passing None will disable the
839 self.event_callback = callback
841 def thread_msg_handler(self):
842 """Python thread calling the user registered message handler.
844 This is to emulate the old style event callback scheme. Modern
845 clients should provide their own thread to poll the event
849 r = self.message_queue.get()
850 if r == "terminate event thread":
852 msgname = type(r).__name__
853 if self.event_callback:
854 self.event_callback(msgname, r)
856 def validate_message_table(self, namecrctable):
857 """Take a dictionary of name_crc message names
858 and returns an array of missing messages"""
861 for name_crc in namecrctable:
862 i = self.transport.get_msg_index(name_crc)
864 missing_table.append(name_crc)
def dump_message_table(self):
    """Return VPP's API message table as held by the transport.

    :returns: ``self.transport.message_table`` itself (not a copy).
    """
    transport = self.transport
    return transport.message_table
def dump_message_table_filtered(self, msglist):
    """Return the subset of VPP's API message table for the given messages.

    For every name in *msglist* both the message itself and its reply
    (``self.services[name]['reply']``) are selected.

    :param msglist: Iterable of message names (list, tuple, generator, ...).
        It is consumed exactly once.
    :returns: A new dictionary holding the entries of
        ``self.transport.message_table`` whose key starts with one of the
        selected names.
    :raises KeyError: If a name in *msglist* is not in ``self.services``.
    """
    # Materialise once: the original ``msglist + replies`` only worked for
    # real lists and raised TypeError for tuples or other iterables.
    requests = list(msglist)
    prefixes = requests + [self.services[n]['reply'] for n in requests]

    table = self.transport.message_table
    message_table_filtered = {}
    for prefix in prefixes:
        # Prefix match: the table keys carry a suffix after the message
        # name, so any key beginning with the name is selected.
        message_table_filtered.update(
            (k, v) for k, v in table.items() if k.startswith(prefix))

    return message_table_filtered
885 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
886 "logger=%s, read_timeout=%s, use_socket=%s, " \
887 "server_address='%s'>" % (
888 self._apifiles, self.testmode, self.async_thread,
889 self.logger, self.read_timeout, self.use_socket,
892 def details_iter(self, f, **kwargs):
895 kwargs['cursor'] = cursor
896 rv, details = f(**kwargs)
898 # Convert to yield from details when we only support python 3
902 if rv.retval == 0 or rv.retval != -165:
906 # Provide the old name for backward compatibility.
909 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4