3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
33 from . vpp_format import verify_enum_hint
34 from . vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
39 except ModuleNotFoundError:
41 """placeholder for VppTransport as the implementation is dependent on
42 VPPAPIClient's initialization values
47 logger = logging.getLogger('vpp_papi')
48 logger.addHandler(logging.NullHandler())
50 __all__ = ('FuncWrapper', 'VppApiDynamicMethodHolder',
51 'VppEnum', 'VppEnumType', 'VppEnumFlag',
52 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
56 def metaclass(metaclass):
57 @functools.wraps(metaclass)
59 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
64 class VppEnumType(type):
65 def __getattr__(cls, name):
66 t = vpp_get_type(name)
70 @metaclass(VppEnumType)
75 @metaclass(VppEnumType)
80 def vpp_atexit(vpp_weakref):
81 """Clean up VPP connection on shutdown."""
82 vpp_instance = vpp_weakref()
83 if vpp_instance and vpp_instance.transport.connected:
84 logger.debug('Cleaning up VPP on exit')
85 vpp_instance.disconnect()
88 def add_convenience_methods():
89 # provide convenience methods to IP[46]Address.vapi_af
91 if 6 == self._version:
92 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
93 if 4 == self._version:
94 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
95 raise ValueError("Invalid _version.")
97 def _vapi_af_name(self):
98 if 6 == self._version:
100 if 4 == self._version:
102 raise ValueError("Invalid _version.")
104 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
105 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
108 class VppApiDynamicMethodHolder:
113 def __init__(self, func):
115 self.__name__ = func.__name__
116 self.__doc__ = func.__doc__
118 def __call__(self, **kwargs):
119 return self._func(**kwargs)
122 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
125 class VPPApiError(Exception):
129 class VPPNotImplementedError(NotImplementedError):
133 class VPPIOError(IOError):
137 class VPPRuntimeError(RuntimeError):
141 class VPPValueError(ValueError):
145 class VPPApiJSONFiles:
147 def find_api_dir(cls, dirs):
148 """Attempt to find the best directory in which API definition
149 files may reside. If the value VPP_API_DIR exists in the environment
150 then it is first on the search list. If we're inside a recognized
151 location in a VPP source tree (src/scripts and src/vpp-api/python)
152 then entries from there to the likely locations in build-root are
153 added. Finally the location used by system packages is added.
155 :returns: A single directory name, or None if no such directory
159 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
160 # in which case, plot a course to likely places in the src tree
161 import __main__ as main
162 if hasattr(main, '__file__'):
163 # get the path of the calling script
164 localdir = os.path.dirname(os.path.realpath(main.__file__))
166 # use cwd if there is no calling script
167 localdir = os.getcwd()
168 localdir_s = localdir.split(os.path.sep)
171 """Match dir against right-hand components of the script dir"""
172 d = dir.split('/') # param 'dir' assumes a / separator
174 return len(localdir_s) > length and localdir_s[-length:] == d
176 def sdir(srcdir, variant):
177 """Build a path from srcdir to the staged API files of
178 'variant' (typically '' or '_debug')"""
179 # Since 'core' and 'plugin' files are staged
180 # in separate directories, we target the parent dir.
181 return os.path.sep.join((
184 'install-vpp%s-native' % variant,
192 if dmatch('src/scripts'):
193 srcdir = os.path.sep.join(localdir_s[:-2])
194 elif dmatch('src/vpp-api/python'):
195 srcdir = os.path.sep.join(localdir_s[:-3])
197 # we're apparently running tests
198 srcdir = os.path.sep.join(localdir_s[:-1])
201 # we're in the source tree, try both the debug and release
203 dirs.append(sdir(srcdir, '_debug'))
204 dirs.append(sdir(srcdir, ''))
206 # Test for staged copies of the scripts
207 # For these, since we explicitly know if we're running a debug versus
208 # release variant, target only the relevant directory
209 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
210 srcdir = os.path.sep.join(localdir_s[:-4])
211 dirs.append(sdir(srcdir, '_debug'))
212 if dmatch('build-root/install-vpp-native/vpp/bin'):
213 srcdir = os.path.sep.join(localdir_s[:-4])
214 dirs.append(sdir(srcdir, ''))
216 # finally, try the location system packages typically install into
217 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
219 # check the directories for existence; first one wins
221 if os.path.isdir(dir):
227 def find_api_files(cls, api_dir=None, patterns='*'): # -> list
228 """Find API definition files from the given directory tree with the
229 given pattern. If no directory is given then find_api_dir() is used
230 to locate one. If no pattern is given then all definition files found
231 in the directory tree are used.
233 :param api_dir: A directory tree in which to locate API definition
234 files; subdirectories are descended into.
235 If this is None then find_api_dir() is called to discover it.
236 :param patterns: A list of patterns to use in each visited directory
237 when looking for files.
238 This can be a list/tuple object or a comma-separated string of
239 patterns. Each value in the list will have leading/trialing
241 The pattern specifies the first part of the filename, '.api.json'
243 The results are de-duplicated, thus overlapping patterns are fine.
244 If this is None it defaults to '*' meaning "all API files".
245 :returns: A list of file paths for the API files found.
248 api_dir = cls.find_api_dir([])
250 raise VPPApiError("api_dir cannot be located")
252 if isinstance(patterns, list) or isinstance(patterns, tuple):
253 patterns = [p.strip() + '.api.json' for p in patterns]
255 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
258 for root, dirnames, files in os.walk(api_dir):
259 # iterate all given patterns and de-dup the result
260 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
261 for filename in files:
262 api_files.append(os.path.join(root, filename))
267 def process_json_file(self, apidef_file):
268 api = json.load(apidef_file)
269 return self._process_json(api)
272 def process_json_str(self, json_str):
273 api = json.loads(json_str)
274 return self._process_json(api)
277 def _process_json(api): # -> Tuple[Dict, Dict]
282 for t in api['enums']:
283 t[0] = 'vl_api_' + t[0] + '_t'
284 types[t[0]] = {'type': 'enum', 'data': t}
288 for t in api['enumflags']:
289 t[0] = 'vl_api_' + t[0] + '_t'
290 types[t[0]] = {'type': 'enum', 'data': t}
294 for t in api['unions']:
295 t[0] = 'vl_api_' + t[0] + '_t'
296 types[t[0]] = {'type': 'union', 'data': t}
301 for t in api['types']:
302 t[0] = 'vl_api_' + t[0] + '_t'
303 types[t[0]] = {'type': 'type', 'data': t}
308 for t, v in api['aliases'].items():
309 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
314 services.update(api['services'])
321 for k, v in types.items():
323 if not vpp_get_type(k):
324 if v['type'] == 'enum':
326 VPPEnumType(t[0], t[1:])
329 if not vpp_get_type(k):
330 if v['type'] == 'enumflag':
332 VPPEnumFlagType(t[0], t[1:])
335 elif v['type'] == 'union':
337 VPPUnionType(t[0], t[1:])
340 elif v['type'] == 'type':
345 elif v['type'] == 'alias':
350 if len(unresolved) == 0:
353 raise VPPValueError('Unresolved type definitions {}'
358 for m in api['messages']:
360 messages[m[0]] = VPPMessage(m[0], m[1:])
361 except VPPNotImplementedError:
363 logger.error('Not implemented error for {}'.format(m[0]))
366 return messages, services
372 This class provides the APIs to VPP. The APIs are loaded
373 from provided .api.json files and makes functions accordingly.
374 These functions are documented in the VPP .api files, as they
375 are dynamically created.
377 Additionally, VPP can send callback messages; this class
378 provides a means to register a callback function to receive
379 these messages in a background thread.
382 VPPApiError = VPPApiError
383 VPPRuntimeError = VPPRuntimeError
384 VPPValueError = VPPValueError
385 VPPNotImplementedError = VPPNotImplementedError
386 VPPIOError = VPPIOError
389 def __init__(self, apifiles=None, testmode=False, async_thread=True,
390 logger=None, loglevel=None,
391 read_timeout=5, use_socket=False,
392 server_address='/run/vpp/api.sock'):
393 """Create a VPP API object.
395 apifiles is a list of files containing API
396 descriptions that will be loaded - methods will be
397 dynamically created reflecting these APIs. If not
398 provided this will load the API files from VPP's
399 default install location.
401 logger, if supplied, is the logging logger object to log to.
402 loglevel, if supplied, is the log level this logger is set
403 to report at (from the loglevels in the logging module).
406 logger = logging.getLogger(
407 "{}.{}".format(__name__, self.__class__.__name__))
408 if loglevel is not None:
409 logger.setLevel(loglevel)
416 self.header = VPPType('header', [['u16', 'msgid'],
417 ['u32', 'client_index']])
419 self.event_callback = None
420 self.message_queue = queue.Queue()
421 self.read_timeout = read_timeout
422 self.async_thread = async_thread
423 self.event_thread = None
424 self.testmode = testmode
425 self.use_socket = use_socket
426 self.server_address = server_address
427 self._apifiles = apifiles
431 from . vpp_transport_socket import VppTransport
433 from . vpp_transport_shmem import VppTransport
436 # Pick up API definitions from default directory
438 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
439 except (RuntimeError, VPPApiError):
440 # In test mode we don't care that we can't find the API files
444 raise VPPRuntimeError
446 for file in apifiles:
447 with open(file) as apidef_file:
448 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
449 self.messages.update(m)
450 self.services.update(s)
452 self.apifiles = apifiles
455 if len(self.messages) == 0 and not testmode:
456 raise VPPValueError(1, 'Missing JSON message definitions')
457 if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
458 raise VPPRuntimeError("Invalid address family hints. "
461 self.transport = VppTransport(self, read_timeout=read_timeout,
462 server_address=server_address)
463 # Make sure we allow VPP to clean up the message rings.
464 atexit.register(vpp_atexit, weakref.ref(self))
466 add_convenience_methods()
468 def get_function(self, name):
469 return getattr(self._api, name)
472 """Multiprocessing-safe provider of unique context IDs."""
474 self.context = mp.Value(ctypes.c_uint, 0)
475 self.lock = mp.Lock()
478 """Get a new unique (or, at least, not recently used) context."""
480 self.context.value += 1
481 return self.context.value
482 get_context = ContextId()
484 def get_type(self, name):
485 return vpp_get_type(name)
489 if not hasattr(self, "_api"):
490 raise VPPApiError("Not connected, api definitions not available")
493 def make_function(self, msg, i, multipart, do_async):
496 return self._call_vpp_async(i, msg, **kwargs)
499 return self._call_vpp(i, msg, multipart, **kwargs)
501 f.__name__ = str(msg.name)
502 f.__doc__ = ", ".join(["%s %s" %
503 (msg.fieldtypes[j], k)
504 for j, k in enumerate(msg.fields)])
509 def _register_functions(self, do_async=False):
510 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
511 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
512 self._api = VppApiDynamicMethodHolder()
513 for name, msg in self.messages.items():
514 n = name + '_' + msg.crc[2:]
515 i = self.transport.get_msg_index(n)
517 self.id_msgdef[i] = msg
518 self.id_names[i] = name
520 # Create function for client side messages.
521 if name in self.services:
522 f = self.make_function(msg, i, self.services[name], do_async)
523 setattr(self._api, name, FuncWrapper(f))
526 'No such message type or failed CRC checksum: %s', n)
528 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
530 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
532 rv = self.transport.connect(name, pfx,
533 msg_handler, rx_qlen)
535 raise VPPIOError(2, 'Connect failed')
536 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
537 self._register_functions(do_async=do_async)
539 # Initialise control ping
540 crc = self.messages['control_ping'].crc
541 self.control_ping_index = self.transport.get_msg_index(
542 ('control_ping' + '_' + crc[2:]))
543 self.control_ping_msgdef = self.messages['control_ping']
544 if self.async_thread:
545 self.event_thread = threading.Thread(
546 target=self.thread_msg_handler)
547 self.event_thread.daemon = True
548 self.event_thread.start()
550 self.event_thread = None
553 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
556 name - the name of the client.
557 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
558 do_async - if true, messages are sent without waiting for a reply
559 rx_qlen - the length of the VPP message receive queue between
562 msg_handler = self.transport.get_callback(do_async)
563 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
566 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
567 """Attach to VPP in synchronous mode. Application must poll for events.
569 name - the name of the client.
570 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
571 rx_qlen - the length of the VPP message receive queue between
575 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
578 def disconnect(self):
579 """Detach from VPP."""
580 rv = self.transport.disconnect()
581 if self.event_thread is not None:
582 self.message_queue.put("terminate event thread")
585 def msg_handler_sync(self, msg):
586 """Process an incoming message from VPP in sync mode.
588 The message may be a reply or it may be an async notification.
590 r = self.decode_incoming_msg(msg)
594 # If we have a context, then use the context to find any
595 # request waiting for a reply
597 if hasattr(r, 'context') and r.context > 0:
601 # No context -> async notification that we feed to the callback
602 self.message_queue.put_nowait(r)
604 raise VPPIOError(2, 'RPC reply message received in event handler')
606 def has_context(self, msg):
610 header = VPPType('header_with_context', [['u16', 'msgid'],
611 ['u32', 'client_index'],
614 (i, ci, context), size = header.unpack(msg, 0)
615 if self.id_names[i] == 'rx_thread_exit':
619 # Decode message and returns a tuple.
621 msgobj = self.id_msgdef[i]
622 if 'context' in msgobj.field_by_name and context >= 0:
626 def decode_incoming_msg(self, msg, no_type_conversion=False):
628 logger.warning('vpp_api.read failed')
631 (i, ci), size = self.header.unpack(msg, 0)
632 if self.id_names[i] == 'rx_thread_exit':
636 # Decode message and returns a tuple.
638 msgobj = self.id_msgdef[i]
640 raise VPPIOError(2, 'Reply message undefined')
642 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
645 def msg_handler_async(self, msg):
646 """Process a message from VPP in async mode.
648 In async mode, all messages are returned to the callback.
650 r = self.decode_incoming_msg(msg)
654 msgname = type(r).__name__
656 if self.event_callback:
657 self.event_callback(msgname, r)
659 def _control_ping(self, context):
660 """Send a ping command."""
661 self._call_vpp_async(self.control_ping_index,
662 self.control_ping_msgdef,
665 def validate_args(self, msg, kwargs):
666 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
668 raise VPPValueError('Invalid argument {} to {}'
669 .format(list(d), msg.name))
671 def _add_stat(self, name, ms):
672 if not name in self.stats:
673 self.stats[name] = {'max': ms, 'count': 1, 'avg': ms}
675 if ms > self.stats[name]['max']:
676 self.stats[name]['max'] = ms
677 self.stats[name]['count'] += 1
678 n = self.stats[name]['count']
679 self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n
682 s = '\n=== API PAPI STATISTICS ===\n'
683 s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max')
684 for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True):
685 s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'],
686 n[1]['avg'], n[1]['max'])
689 def get_field_options(self, msg, fld_name):
690 # when there is an option, the msgdef has 3 elements.
691 # ['u32', 'ring_size', {'default': 1024}]
692 for _def in self.messages[msg].msgdef:
693 if isinstance(_def, list) and \
698 def _call_vpp(self, i, msgdef, service, **kwargs):
699 """Given a message, send the message and await a reply.
701 msgdef - the message packing definition
702 i - the message type index
703 multipart - True if the message returns multiple
705 context - context number - chosen at random if not
707 The remainder of the kwargs are the arguments to the API call.
709 The return value is the message or message array containing
710 the response. It will raise an IOError exception if there was
711 no response within the timeout window.
714 if 'context' not in kwargs:
715 context = self.get_context()
716 kwargs['context'] = context
718 context = kwargs['context']
719 kwargs['_vl_msg_id'] = i
721 no_type_conversion = kwargs.pop('_no_type_conversion', False)
722 timeout = kwargs.pop('_timeout', None)
725 if self.transport.socket_index:
726 kwargs['client_index'] = self.transport.socket_index
727 except AttributeError:
729 self.validate_args(msgdef, kwargs)
731 s = 'Calling {}({})'.format(msgdef.name,
732 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
735 b = msgdef.pack(kwargs)
736 self.transport.suspend()
738 self.transport.write(b)
740 msgreply = service['reply']
741 stream = True if 'stream' in service else False
743 if 'stream_msg' in service:
744 # New service['reply'] = _reply and service['stream_message'] = _details
745 stream_message = service['stream_msg']
748 # Old service['reply'] = _details
749 stream_message = msgreply
750 msgreply = 'control_ping_reply'
752 # Send a ping after the request - we use its response
753 # to detect that we have seen all results.
754 self._control_ping(context)
756 # Block until we get a reply.
759 r = self.read_blocking(no_type_conversion, timeout)
761 raise VPPIOError(2, 'VPP API client: read failed')
762 msgname = type(r).__name__
763 if context not in r or r.context == 0 or context != r.context:
764 # Message being queued
765 self.message_queue.put_nowait(r)
767 if msgname != msgreply and (stream and (msgname != stream_message)):
768 print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
772 if msgname == msgreply:
773 if modern: # Return both reply and list
779 self.transport.resume()
781 s = 'Return value: {!r}'.format(r)
786 self._add_stat(msgdef.name, (te - ts) * 1000)
789 def _call_vpp_async(self, i, msg, **kwargs):
790 """Given a message, send the message and return the context.
792 msgdef - the message packing definition
793 i - the message type index
794 context - context number - chosen at random if not
796 The remainder of the kwargs are the arguments to the API call.
798 The reply message(s) will be delivered later to the registered callback.
799 The returned context will help with assigning which call
800 the reply belongs to.
802 if 'context' not in kwargs:
803 context = self.get_context()
804 kwargs['context'] = context
806 context = kwargs['context']
808 if self.transport.socket_index:
809 kwargs['client_index'] = self.transport.socket_index
810 except AttributeError:
811 kwargs['client_index'] = 0
812 kwargs['_vl_msg_id'] = i
815 self.transport.write(b)
818 def read_blocking(self, no_type_conversion=False, timeout=None):
819 """Get next received message from transport within timeout, decoded.
821 Note that notifications have context zero
822 and are not put into receive queue (at least for socket transport),
823 use async_thread with registered callback for processing them.
825 If no message appears in the queue within timeout, return None.
827 Optionally, type conversion can be skipped,
828 as some of conversions are into less precise types.
830 When r is the return value of this, the caller can get message name as:
831 msgname = type(r).__name__
832 and context number (type long) as:
835 :param no_type_conversion: If false, type conversions are applied.
836 :type no_type_conversion: bool
837 :returns: Decoded message, or None if no message (within timeout).
838 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
839 :raises VppTransportShmemIOError if timed out.
841 msg = self.transport.read(timeout=timeout)
844 return self.decode_incoming_msg(msg, no_type_conversion)
846 def register_event_callback(self, callback):
847 """Register a callback for async messages.
849 This will be called for async notifications in sync mode,
850 and all messages in async mode. In sync mode, replies to
851 requests will not come here.
853 callback is a fn(msg_type_name, msg_type) that will be
854 called when a message comes in. While this function is
855 executing, note that (a) you are in a background thread and
856 may wish to use threading.Lock to protect your datastructures,
857 and (b) message processing from VPP will stop (so if you take
858 a long while about it you may provoke reply timeouts or cause
859 VPP to fill the RX buffer). Passing None will disable the
862 self.event_callback = callback
864 def thread_msg_handler(self):
865 """Python thread calling the user registered message handler.
867 This is to emulate the old style event callback scheme. Modern
868 clients should provide their own thread to poll the event
872 r = self.message_queue.get()
873 if r == "terminate event thread":
875 msgname = type(r).__name__
876 if self.event_callback:
877 self.event_callback(msgname, r)
879 def validate_message_table(self, namecrctable):
880 """Take a dictionary of name_crc message names
881 and returns an array of missing messages"""
884 for name_crc in namecrctable:
885 i = self.transport.get_msg_index(name_crc)
887 missing_table.append(name_crc)
890 def dump_message_table(self):
891 """Return VPPs API message table as name_crc dictionary"""
892 return self.transport.message_table
894 def dump_message_table_filtered(self, msglist):
895 """Return VPPs API message table as name_crc dictionary,
896 filtered by message name list."""
898 replies = [self.services[n]['reply'] for n in msglist]
899 message_table_filtered = {}
900 for name in msglist + replies:
901 for k,v in self.transport.message_table.items():
902 if k.startswith(name):
903 message_table_filtered[k] = v
905 return message_table_filtered
908 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
909 "logger=%s, read_timeout=%s, use_socket=%s, " \
910 "server_address='%s'>" % (
911 self._apifiles, self.testmode, self.async_thread,
912 self.logger, self.read_timeout, self.use_socket,
915 def details_iter(self, f, **kwargs):
918 kwargs['cursor'] = cursor
919 rv, details = f(**kwargs)
922 if rv.retval == 0 or rv.retval != -165: