import logging
import collections
import struct
+import functools
import json
import threading
import fnmatch
import atexit
from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
-from . vpp_format import VPPFormat
+from . macaddress import MACAddress, mac_pton, mac_ntop
+
+logger = logging.getLogger(__name__)
if sys.version[0] == '2':
import Queue as queue
import queue as queue
+def metaclass(metaclass):
+ @functools.wraps(metaclass)
+ def wrapper(cls):
+ return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
+
+ return wrapper
+
+
class VppEnumType(type):
def __getattr__(cls, name):
t = vpp_get_type(name)
return t.enum
-# Python3
-# class VppEnum(metaclass=VppEnumType):
-# pass
+@metaclass(VppEnumType)
class VppEnum(object):
- __metaclass__ = VppEnumType
+ pass
def vpp_atexit(vpp_weakref):
vpp_instance.disconnect()
-def vpp_iterator(d):
- if sys.version[0] == '2':
+if sys.version[0] == '2':
+ def vpp_iterator(d):
return d.iteritems()
- else:
+else:
+ def vpp_iterator(d):
return d.items()
+def call_logger(msgdef, kwargs):
+ s = 'Calling {}('.format(msgdef.name)
+ for k, v in kwargs.items():
+ s += '{}:{} '.format(k, v)
+ s += ')'
+ return s
+
+
+def return_logger(r):
+ s = 'Return from {}'.format(r)
+ return s
+
+
class VppApiDynamicMethodHolder(object):
pass
def __init__(self, func):
self._func = func
self.__name__ = func.__name__
+ self.__doc__ = func.__doc__
def __call__(self, **kwargs):
return self._func(**kwargs)
pass
-class VPP(object):
+class VPPApiClient(object):
"""VPP interface.
This class provides the APIs to VPP. The APIs are loaded
provides a means to register a callback function to receive
these messages in a background thread.
"""
+ apidir = None
VPPApiError = VPPApiError
VPPRuntimeError = VPPRuntimeError
VPPValueError = VPPValueError
types[t[0]] = {'type': 'type', 'data': t}
for t, v in api['aliases'].items():
types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
+ self.services.update(api['services'])
i = 0
while True:
self.logger = logger
self.messages = {}
+ self.services = {}
self.id_names = []
self.id_msgdef = []
self.header = VPPType('header', [['u16', 'msgid'],
self.message_queue = queue.Queue()
self.read_timeout = read_timeout
self.async_thread = async_thread
+ self.event_thread = None
if use_socket:
from . vpp_transport_socket import VppTransport
:returns: A single directory name, or None if no such directory
could be found.
"""
- dirs = []
-
- if 'VPP_API_DIR' in os.environ:
- dirs.append(os.environ['VPP_API_DIR'])
+ dirs = [cls.apidir] if cls.apidir else []
# perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
# in which case, plot a course to likely places in the src tree
# finally, try the location system packages typically install into
dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
- # check the directories for existance; first one wins
+ # check the directories for existence; first one wins
for dir in dirs:
if os.path.isdir(dir):
return dir
f.__doc__ = ", ".join(["%s %s" %
(msg.fieldtypes[j], k)
for j, k in enumerate(msg.fields)])
+ f.msg = msg
+
return f
def _register_functions(self, do_async=False):
self._api = VppApiDynamicMethodHolder()
for name, msg in vpp_iterator(self.messages):
n = name + '_' + msg.crc[2:]
- i = self.transport.get_msg_index(n.encode())
+ i = self.transport.get_msg_index(n.encode('utf-8'))
if i > 0:
self.id_msgdef[i] = msg
self.id_names[i] = name
- # TODO: Fix multipart (use services)
- multipart = True if name.find('_dump') > 0 else False
- f = self.make_function(msg, i, multipart, do_async)
- setattr(self._api, name, FuncWrapper(f))
+
+ # Create function for client side messages.
+ if name in self.services:
+ if 'stream' in self.services[name] and \
+ self.services[name]['stream']:
+ multipart = True
+ else:
+ multipart = False
+ f = self.make_function(msg, i, multipart, do_async)
+ setattr(self._api, name, FuncWrapper(f))
else:
self.logger.debug(
'No such message type or failed CRC checksum: %s', n)
def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
do_async):
- pfx = chroot_prefix.encode() if chroot_prefix else None
+ pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
- rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
+ rv = self.transport.connect(name.encode('utf-8'), pfx,
+ msg_handler, rx_qlen)
if rv != 0:
raise VPPIOError(2, 'Connect failed')
self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
# Initialise control ping
crc = self.messages['control_ping'].crc
self.control_ping_index = self.transport.get_msg_index(
- ('control_ping' + '_' + crc[2:]).encode())
+ ('control_ping' + '_' + crc[2:]).encode('utf-8'))
self.control_ping_msgdef = self.messages['control_ping']
if self.async_thread:
self.event_thread = threading.Thread(
target=self.thread_msg_handler)
self.event_thread.daemon = True
self.event_thread.start()
+ else:
+ self.event_thread = None
return rv
def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
def disconnect(self):
"""Detach from VPP."""
rv = self.transport.disconnect()
- self.message_queue.put("terminate event thread")
+ if self.event_thread is not None:
+ self.message_queue.put("terminate event thread")
return rv
def msg_handler_sync(self, msg):
else:
raise VPPIOError(2, 'RPC reply message received in event handler')
- def decode_incoming_msg(self, msg):
+ def has_context(self, msg):
+ if len(msg) < 10:
+ return False
+
+ header = VPPType('header_with_context', [['u16', 'msgid'],
+ ['u32', 'client_index'],
+ ['u32', 'context']])
+
+ (i, ci, context), size = header.unpack(msg, 0)
+ if self.id_names[i] == 'rx_thread_exit':
+ return
+
+ #
+ # Decode message and returns a tuple.
+ #
+ msgobj = self.id_msgdef[i]
+ if 'context' in msgobj.field_by_name and context >= 0:
+ return True
+ return False
+
+ def decode_incoming_msg(self, msg, no_type_conversion=False):
if not msg:
self.logger.warning('vpp_api.read failed')
return
+
(i, ci), size = self.header.unpack(msg, 0)
if self.id_names[i] == 'rx_thread_exit':
return
if not msgobj:
raise VPPIOError(2, 'Reply message undefined')
- r, size = msgobj.unpack(msg)
+ r, size = msgobj.unpack(msg, ntc=no_type_conversion)
return r
def msg_handler_async(self, msg):
d = set(kwargs.keys()) - set(msg.field_by_name.keys())
if d:
raise VPPValueError('Invalid argument {} to {}'
- .format(list(d), msg.name))
+ .format(list(d), msg.name))
- def _call_vpp(self, i, msg, multipart, **kwargs):
+ def _call_vpp(self, i, msgdef, multipart, **kwargs):
"""Given a message, send the message and await a reply.
msgdef - the message packing definition
context = kwargs['context']
kwargs['_vl_msg_id'] = i
+ no_type_conversion = kwargs.pop('_no_type_conversion', False)
+
try:
if self.transport.socket_index:
kwargs['client_index'] = self.transport.socket_index
except AttributeError:
pass
- self.validate_args(msg, kwargs)
- b = msg.pack(kwargs)
+ self.validate_args(msgdef, kwargs)
+
+ logging.debug(call_logger(msgdef, kwargs))
+
+ b = msgdef.pack(kwargs)
self.transport.suspend()
self.transport.write(b)
msg = self.transport.read()
if not msg:
raise VPPIOError(2, 'VPP API client: read failed')
- r = self.decode_incoming_msg(msg)
+ r = self.decode_incoming_msg(msg, no_type_conversion)
msgname = type(r).__name__
if context not in r or r.context == 0 or context != r.context:
# Message being queued
self.transport.resume()
+ logger.debug(return_logger(rl))
return rl
def _call_vpp_async(self, i, msg, **kwargs):
if self.event_callback:
self.event_callback(msgname, r)
+# Provide the old name for backward compatibility.
+VPP = VPPApiClient
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4