X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fvpp-api%2Fpython%2Fvpp_papi%2Fvpp_papi.py;h=818a55f52f37d721d431aae2a2cc6c415a06f32b;hb=6595ff7f88be45d4c3f4dae09f7253b8b4ed26af;hp=32abe14da949ee231a07bb3167f794413c847fd2;hpb=0bcad32b3870f9998fa1393418081cdda685272f;p=vpp.git diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 32abe14da94..818a55f52f3 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -16,17 +16,18 @@ from __future__ import print_function from __future__ import absolute_import +import ctypes import sys +import multiprocessing as mp import os import logging -import collections -import struct +import functools import json import threading import fnmatch import weakref import atexit -from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes +from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias if sys.version[0] == '2': @@ -34,6 +35,19 @@ if sys.version[0] == '2': else: import queue as queue +__all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder', + 'VppEnum', 'VppEnumType', + 'VPPIOError', 'VPPRuntimeError', 'VPPValueError', + 'VPPApiClient', ) + + +def metaclass(metaclass): + @functools.wraps(metaclass) + def wrapper(cls): + return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy()) + + return wrapper + class VppEnumType(type): def __getattr__(cls, name): @@ -41,11 +55,9 @@ class VppEnumType(type): return t.enum -# Python3 -# class VppEnum(metaclass=VppEnumType): -# pass +@metaclass(VppEnumType) class VppEnum(object): - __metaclass__ = VppEnumType + pass def vpp_atexit(vpp_weakref): @@ -55,6 +67,7 @@ def vpp_atexit(vpp_weakref): vpp_instance.logger.debug('Cleaning up VPP on exit') vpp_instance.disconnect() + if sys.version[0] == '2': def vpp_iterator(d): return d.iteritems() @@ -71,10 +84,14 @@ class FuncWrapper(object): def __init__(self, func): self._func = func self.__name__ = func.__name__ + self.__doc__ = func.__doc__ def __call__(self, **kwargs): return self._func(**kwargs) + def __repr__(self): + return ')>' % (self.__name__, self.__doc__) + class VPPApiError(Exception): pass @@ -96,7 +113,7 @@ class VPPValueError(ValueError): pass -class VPP(object): +class VPPApiClient(object): """VPP interface. This class provides the APIs to VPP. The APIs are loaded @@ -108,6 +125,7 @@ class VPP(object): provides a means to register a callback function to receive these messages in a background thread. """ + apidir = None VPPApiError = VPPApiError VPPRuntimeError = VPPRuntimeError VPPValueError = VPPValueError @@ -173,7 +191,7 @@ class VPP(object): def __init__(self, apifiles=None, testmode=False, async_thread=True, logger=None, loglevel=None, read_timeout=5, use_socket=False, - server_address='/run/vpp-api.sock'): + server_address='/run/vpp/api.sock'): """Create a VPP API object. apifiles is a list of files containing API @@ -187,7 +205,8 @@ class VPP(object): to report at (from the loglevels in the logging module). """ if logger is None: - logger = logging.getLogger(__name__) + logger = logging.getLogger( + "{}.{}".format(__name__, self.__class__.__name__)) if loglevel is not None: logger.setLevel(loglevel) self.logger = logger @@ -203,6 +222,11 @@ class VPP(object): self.message_queue = queue.Queue() self.read_timeout = read_timeout self.async_thread = async_thread + self.event_thread = None + self.testmode = testmode + self.use_socket = use_socket + self.server_address = server_address + self._apifiles = apifiles if use_socket: from . vpp_transport_socket import VppTransport @@ -236,16 +260,16 @@ class VPP(object): atexit.register(vpp_atexit, weakref.ref(self)) class ContextId(object): - """Thread-safe provider of unique context IDs.""" + """Multiprocessing-safe provider of unique context IDs.""" def __init__(self): - self.context = 0 - self.lock = threading.Lock() + self.context = mp.Value(ctypes.c_uint, 0) + self.lock = mp.Lock() def __call__(self): """Get a new unique (or, at least, not recently used) context.""" with self.lock: - self.context += 1 - return self.context + self.context.value += 1 + return self.context.value get_context = ContextId() def get_type(self, name): @@ -263,10 +287,7 @@ class VPP(object): :returns: A single directory name, or None if no such directory could be found. """ - dirs = [] - - if 'VPP_API_DIR' in os.environ: - dirs.append(os.environ['VPP_API_DIR']) + dirs = [cls.apidir] if cls.apidir else [] # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir; # in which case, plot a course to likely places in the src tree @@ -328,7 +349,7 @@ class VPP(object): # finally, try the location system packages typically install into dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api'))) - # check the directories for existance; first one wins + # check the directories for existence; first one wins for dir in dirs: if os.path.isdir(dir): return dir @@ -393,6 +414,8 @@ class VPP(object): f.__doc__ = ", ".join(["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]) + f.msg = msg + return f def _register_functions(self, do_async=False): @@ -401,7 +424,7 @@ class VPP(object): self._api = VppApiDynamicMethodHolder() for name, msg in vpp_iterator(self.messages): n = name + '_' + msg.crc[2:] - i = self.transport.get_msg_index(n.encode()) + i = self.transport.get_msg_index(n.encode('utf-8')) if i > 0: self.id_msgdef[i] = msg self.id_names[i] = name @@ -421,9 +444,10 @@ class VPP(object): def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async): - pfx = chroot_prefix.encode() if chroot_prefix else None + pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None - rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen) + rv = self.transport.connect(name.encode('utf-8'), pfx, + msg_handler, rx_qlen) if rv != 0: raise VPPIOError(2, 'Connect failed') self.vpp_dictionary_maxid = self.transport.msg_table_max_index() @@ -432,13 +456,15 @@ class VPP(object): # Initialise control ping crc = self.messages['control_ping'].crc self.control_ping_index = self.transport.get_msg_index( - ('control_ping' + '_' + crc[2:]).encode()) + ('control_ping' + '_' + crc[2:]).encode('utf-8')) self.control_ping_msgdef = self.messages['control_ping'] if self.async_thread: self.event_thread = threading.Thread( target=self.thread_msg_handler) self.event_thread.daemon = True self.event_thread.start() + else: + self.event_thread = None return rv def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32): @@ -469,7 +495,8 @@ class VPP(object): def disconnect(self): """Detach from VPP.""" rv = self.transport.disconnect() - self.message_queue.put("terminate event thread") + if self.event_thread is not None: + self.message_queue.put("terminate event thread") return rv def msg_handler_sync(self, msg): @@ -493,10 +520,31 @@ class VPP(object): else: raise VPPIOError(2, 'RPC reply message received in event handler') + def has_context(self, msg): + if len(msg) < 10: + return False + + header = VPPType('header_with_context', [['u16', 'msgid'], + ['u32', 'client_index'], + ['u32', 'context']]) + + (i, ci, context), size = header.unpack(msg, 0) + if self.id_names[i] == 'rx_thread_exit': + return + + # + # Decode message and returns a tuple. + # + msgobj = self.id_msgdef[i] + if 'context' in msgobj.field_by_name and context >= 0: + return True + return False + def decode_incoming_msg(self, msg, no_type_conversion=False): if not msg: self.logger.warning('vpp_api.read failed') return + (i, ci), size = self.header.unpack(msg, 0) if self.id_names[i] == 'rx_thread_exit': return @@ -537,7 +585,7 @@ class VPP(object): raise VPPValueError('Invalid argument {} to {}' .format(list(d), msg.name)) - def _call_vpp(self, i, msg, multipart, **kwargs): + def _call_vpp(self, i, msgdef, multipart, **kwargs): """Given a message, send the message and await a reply. msgdef - the message packing definition @@ -567,8 +615,13 @@ class VPP(object): kwargs['client_index'] = self.transport.socket_index except AttributeError: pass - self.validate_args(msg, kwargs) - b = msg.pack(kwargs) + self.validate_args(msgdef, kwargs) + + s = 'Calling {}({})'.format(msgdef.name, + ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()])) + self.logger.debug(s) + + b = msgdef.pack(kwargs) self.transport.suspend() self.transport.write(b) @@ -601,6 +654,7 @@ class VPP(object): self.transport.resume() + self.logger.debug('Return from {!r}'.format(r)) return rl def _call_vpp_async(self, i, msg, **kwargs): @@ -660,5 +714,16 @@ class VPP(object): if self.event_callback: self.event_callback(msgname, r) + def __repr__(self): + return "" % ( + self._apifiles, self.testmode, self.async_thread, + self.logger, self.read_timeout, self.use_socket, + self.server_address) + + +# Provide the old name for backward compatibility. +VPP = VPPApiClient # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4