X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvpp-api%2Fpython%2Fvpp_papi%2Fvpp_papi.py;h=3465f503e9e8d2e95c41d6d1b4d3e8f94d1f146e;hb=041372b79;hp=05688cec7319ff86f2992a33402a789bfa102182;hpb=ead1e536d66d83b546528c32e2112085a97c8e13;p=vpp.git diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 05688cec731..3465f503e9e 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -17,9 +17,11 @@ from __future__ import print_function from __future__ import absolute_import import ctypes +import ipaddress import sys import multiprocessing as mp import os +import queue import logging import functools import json @@ -27,16 +29,28 @@ import threading import fnmatch import weakref import atexit -from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType +import time +from . vpp_format import verify_enum_hint +from . vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias -if sys.version[0] == '2': - import Queue as queue -else: - import queue as queue +try: + import VppTransport +except ModuleNotFoundError: + class V: + """placeholder for VppTransport as the implementation is dependent on + VPPAPIClient's initialization values + """ + + VppTransport = V + +from . vpp_transport_socket import VppTransport + +logger = logging.getLogger('vpp_papi') +logger.addHandler(logging.NullHandler()) -__all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder', - 'VppEnum', 'VppEnumType', +__all__ = ('FuncWrapper', 'VppApiDynamicMethodHolder', + 'VppEnum', 'VppEnumType', 'VppEnumFlag', 'VPPIOError', 'VPPRuntimeError', 'VPPValueError', 'VPPApiClient', ) @@ -56,7 +70,12 @@ class VppEnumType(type): @metaclass(VppEnumType) -class VppEnum(object): +class VppEnum: + pass + + +@metaclass(VppEnumType) +class VppEnumFlag: pass @@ -64,23 +83,35 @@ def vpp_atexit(vpp_weakref): """Clean up VPP connection on shutdown.""" vpp_instance = vpp_weakref() if vpp_instance and vpp_instance.transport.connected: - vpp_instance.logger.debug('Cleaning up VPP on exit') + logger.debug('Cleaning up VPP on exit') vpp_instance.disconnect() -if sys.version[0] == '2': - def vpp_iterator(d): - return d.iteritems() -else: - def vpp_iterator(d): - return d.items() +def add_convenience_methods(): + # provide convenience methods to IP[46]Address.vapi_af + def _vapi_af(self): + if 6 == self._version: + return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value + if 4 == self._version: + return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value + raise ValueError("Invalid _version.") + + def _vapi_af_name(self): + if 6 == self._version: + return 'ip6' + if 4 == self._version: + return 'ip4' + raise ValueError("Invalid _version.") + + ipaddress._IPAddressBase.vapi_af = property(_vapi_af) + ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name) -class VppApiDynamicMethodHolder(object): +class VppApiDynamicMethodHolder: pass -class FuncWrapper(object): +class FuncWrapper: def __init__(self, func): self._func = func self.__name__ = func.__name__ @@ -112,7 +143,8 @@ class VPPRuntimeError(RuntimeError): class VPPValueError(ValueError): pass -class VPPApiJSONFiles(object): + +class VPPApiJSONFiles: @classmethod def find_api_dir(cls, dirs): """Attempt to find the best directory in which API definition @@ -194,7 +226,7 @@ class VPPApiJSONFiles(object): return None @classmethod - def find_api_files(cls, api_dir=None, patterns='*'): + def find_api_files(cls, api_dir=None, patterns='*'): # -> list """Find API definition files from the given directory tree with the given pattern. If no directory is given then find_api_dir() is used to locate one. If no pattern is given then all definition files found @@ -236,21 +268,54 @@ class VPPApiJSONFiles(object): @classmethod def process_json_file(self, apidef_file): api = json.load(apidef_file) + return self._process_json(api) + + @classmethod + def process_json_str(self, json_str): + api = json.loads(json_str) + return self._process_json(api) + + @staticmethod + def _process_json(api): # -> Tuple[Dict, Dict] types = {} services = {} messages = {} - for t in api['enums']: - t[0] = 'vl_api_' + t[0] + '_t' - types[t[0]] = {'type': 'enum', 'data': t} - for t in api['unions']: - t[0] = 'vl_api_' + t[0] + '_t' - types[t[0]] = {'type': 'union', 'data': t} - for t in api['types']: - t[0] = 'vl_api_' + t[0] + '_t' - types[t[0]] = {'type': 'type', 'data': t} - for t, v in api['aliases'].items(): - types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v} - services.update(api['services']) + try: + for t in api['enums']: + t[0] = 'vl_api_' + t[0] + '_t' + types[t[0]] = {'type': 'enum', 'data': t} + except KeyError: + pass + try: + for t in api['enumflags']: + t[0] = 'vl_api_' + t[0] + '_t' + types[t[0]] = {'type': 'enum', 'data': t} + except KeyError: + pass + try: + for t in api['unions']: + t[0] = 'vl_api_' + t[0] + '_t' + types[t[0]] = {'type': 'union', 'data': t} + except KeyError: + pass + + try: + for t in api['types']: + t[0] = 'vl_api_' + t[0] + '_t' + types[t[0]] = {'type': 'type', 'data': t} + except KeyError: + pass + + try: + for t, v in api['aliases'].items(): + types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v} + except KeyError: + pass + + try: + services.update(api['services']) + except KeyError: + pass i = 0 while True: @@ -263,6 +328,12 @@ class VPPApiJSONFiles(object): VPPEnumType(t[0], t[1:]) except ValueError: unresolved[k] = v + if not vpp_get_type(k): + if v['type'] == 'enumflag': + try: + VPPEnumFlagType(t[0], t[1:]) + except ValueError: + unresolved[k] = v elif v['type'] == 'union': try: VPPUnionType(t[0], t[1:]) @@ -285,16 +356,19 @@ class VPPApiJSONFiles(object): .format(unresolved)) types = unresolved i += 1 - - for m in api['messages']: - try: - messages[m[0]] = VPPMessage(m[0], m[1:]) - except VPPNotImplementedError: - ### OLE FIXME - self.logger.error('Not implemented error for {}'.format(m[0])) + try: + for m in api['messages']: + try: + messages[m[0]] = VPPMessage(m[0], m[1:]) + except VPPNotImplementedError: + ### OLE FIXME + logger.error('Not implemented error for {}'.format(m[0])) + except KeyError: + pass return messages, services -class VPPApiClient(object): + +class VPPApiClient: """VPP interface. This class provides the APIs to VPP. The APIs are loaded @@ -314,9 +388,9 @@ class VPPApiClient(object): VPPIOError = VPPIOError - def __init__(self, apifiles=None, testmode=False, async_thread=True, + def __init__(self, *, apifiles=None, testmode=False, async_thread=True, logger=None, loglevel=None, - read_timeout=5, use_socket=False, + read_timeout=5, use_socket=True, server_address='/run/vpp/api.sock'): """Create a VPP API object. @@ -350,20 +424,15 @@ class VPPApiClient(object): self.async_thread = async_thread self.event_thread = None self.testmode = testmode - self.use_socket = use_socket self.server_address = server_address self._apifiles = apifiles - - if use_socket: - from . vpp_transport_socket import VppTransport - else: - from . vpp_transport_shmem import VppTransport + self.stats = {} if not apifiles: # Pick up API definitions from default directory try: apifiles = VPPApiJSONFiles.find_api_files(self.apidir) - except RuntimeError: + except (RuntimeError, VPPApiError): # In test mode we don't care that we can't find the API files if testmode: apifiles = [] @@ -381,17 +450,21 @@ class VPPApiClient(object): # Basic sanity check if len(self.messages) == 0 and not testmode: raise VPPValueError(1, 'Missing JSON message definitions') + if not(verify_enum_hint(VppEnum.vl_api_address_family_t)): + raise VPPRuntimeError("Invalid address family hints. " + "Cannot continue.") self.transport = VppTransport(self, read_timeout=read_timeout, server_address=server_address) # Make sure we allow VPP to clean up the message rings. atexit.register(vpp_atexit, weakref.ref(self)) + add_convenience_methods() + def get_function(self, name): return getattr(self._api, name) - - class ContextId(object): + class ContextId: """Multiprocessing-safe provider of unique context IDs.""" def __init__(self): self.context = mp.Value(ctypes.c_uint, 0) @@ -433,7 +506,7 @@ class VPPApiClient(object): self.id_names = [None] * (self.vpp_dictionary_maxid + 1) self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1) self._api = VppApiDynamicMethodHolder() - for name, msg in vpp_iterator(self.messages): + for name, msg in self.messages.items(): n = name + '_' + msg.crc[2:] i = self.transport.get_msg_index(n) if i > 0: @@ -442,12 +515,7 @@ class VPPApiClient(object): # Create function for client side messages. if name in self.services: - if 'stream' in self.services[name] and \ - self.services[name]['stream']: - multipart = True - else: - multipart = False - f = self.make_function(msg, i, multipart, do_async) + f = self.make_function(msg, i, self.services[name], do_async) setattr(self._api, name, FuncWrapper(f)) else: self.logger.debug( @@ -553,7 +621,7 @@ class VPPApiClient(object): def decode_incoming_msg(self, msg, no_type_conversion=False): if not msg: - self.logger.warning('vpp_api.read failed') + logger.warning('vpp_api.read failed') return (i, ci), size = self.header.unpack(msg, 0) @@ -596,7 +664,34 @@ class VPPApiClient(object): raise VPPValueError('Invalid argument {} to {}' .format(list(d), msg.name)) - def _call_vpp(self, i, msgdef, multipart, **kwargs): + def _add_stat(self, name, ms): + if not name in self.stats: + self.stats[name] = {'max': ms, 'count': 1, 'avg': ms} + else: + if ms > self.stats[name]['max']: + self.stats[name]['max'] = ms + self.stats[name]['count'] += 1 + n = self.stats[name]['count'] + self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n + + def get_stats(self): + s = '\n=== API PAPI STATISTICS ===\n' + s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max') + for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True): + s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'], + n[1]['avg'], n[1]['max']) + return s + + def get_field_options(self, msg, fld_name): + # when there is an option, the msgdef has 3 elements. + # ['u32', 'ring_size', {'default': 1024}] + for _def in self.messages[msg].msgdef: + if isinstance(_def, list) and \ + len(_def) == 3 and \ + _def[1] == fld_name: + return _def[2] + + def _call_vpp(self, i, msgdef, service, **kwargs): """Given a message, send the message and await a reply. msgdef - the message packing definition @@ -611,7 +706,7 @@ class VPPApiClient(object): the response. It will raise an IOError exception if there was no response within the timeout window. """ - + ts = time.time() if 'context' not in kwargs: context = self.get_context() kwargs['context'] = context @@ -620,6 +715,7 @@ class VPPApiClient(object): kwargs['_vl_msg_id'] = i no_type_conversion = kwargs.pop('_no_type_conversion', False) + timeout = kwargs.pop('_timeout', None) try: if self.transport.socket_index: @@ -637,15 +733,26 @@ class VPPApiClient(object): self.transport.write(b) - if multipart: - # Send a ping after the request - we use its response - # to detect that we have seen all results. - self._control_ping(context) + msgreply = service['reply'] + stream = True if 'stream' in service else False + if stream: + if 'stream_msg' in service: + # New service['reply'] = _reply and service['stream_message'] = _details + stream_message = service['stream_msg'] + modern =True + else: + # Old service['reply'] = _details + stream_message = msgreply + msgreply = 'control_ping_reply' + modern = False + # Send a ping after the request - we use its response + # to detect that we have seen all results. + self._control_ping(context) # Block until we get a reply. rl = [] while (True): - r = self.read_blocking(no_type_conversion) + r = self.read_blocking(no_type_conversion, timeout) if r is None: raise VPPIOError(2, 'VPP API client: read failed') msgname = type(r).__name__ @@ -653,11 +760,14 @@ class VPPApiClient(object): # Message being queued self.message_queue.put_nowait(r) continue - - if not multipart: + if msgname != msgreply and (stream and (msgname != stream_message)): + print('REPLY MISMATCH', msgreply, msgname, stream_message, stream) + if not stream: rl = r break - if msgname == 'control_ping_reply': + if msgname == msgreply: + if modern: # Return both reply and list + rl = r, rl break rl.append(r) @@ -668,6 +778,8 @@ class VPPApiClient(object): if len(s) > 80: s = s[:80] + "..." self.logger.debug(s) + te = time.time() + self._add_stat(msgdef.name, (te - ts) * 1000) return rl def _call_vpp_async(self, i, msg, **kwargs): @@ -699,10 +811,10 @@ class VPPApiClient(object): self.transport.write(b) return context - def read_blocking(self, no_type_conversion=False): + def read_blocking(self, no_type_conversion=False, timeout=None): """Get next received message from transport within timeout, decoded. - Note that noticifations have context zero + Note that notifications have context zero and are not put into receive queue (at least for socket transport), use async_thread with registered callback for processing them. @@ -720,8 +832,9 @@ class VPPApiClient(object): :type no_type_conversion: bool :returns: Decoded message, or None if no message (within timeout). :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion. + :raises VppTransportShmemIOError if timed out. """ - msg = self.transport.read() + msg = self.transport.read(timeout=timeout) if not msg: return None return self.decode_incoming_msg(msg, no_type_conversion) @@ -759,16 +872,48 @@ class VPPApiClient(object): if self.event_callback: self.event_callback(msgname, r) + def validate_message_table(self, namecrctable): + """Take a dictionary of name_crc message names + and returns an array of missing messages""" + + missing_table = [] + for name_crc in namecrctable: + i = self.transport.get_msg_index(name_crc) + if i <= 0: + missing_table.append(name_crc) + return missing_table + + def dump_message_table(self): + """Return VPPs API message table as name_crc dictionary""" + return self.transport.message_table + + def dump_message_table_filtered(self, msglist): + """Return VPPs API message table as name_crc dictionary, + filtered by message name list.""" + + replies = [self.services[n]['reply'] for n in msglist] + message_table_filtered = {} + for name in msglist + replies: + for k,v in self.transport.message_table.items(): + if k.startswith(name): + message_table_filtered[k] = v + break + return message_table_filtered + def __repr__(self): return "" % ( self._apifiles, self.testmode, self.async_thread, - self.logger, self.read_timeout, self.use_socket, - self.server_address) - + self.logger, self.read_timeout, self.server_address) -# Provide the old name for backward compatibility. -VPP = VPPApiClient - -# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 + def details_iter(self, f, **kwargs): + cursor = 0 + while True: + kwargs['cursor'] = cursor + rv, details = f(**kwargs) + for d in details: + yield d + if rv.retval == 0 or rv.retval != -165: + break + cursor = rv.cursor