import logging
import collections
import struct
+import functools
import json
import threading
import fnmatch
import atexit
from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
+from . macaddress import MACAddress, mac_pton, mac_ntop
+
+logger = logging.getLogger(__name__)
if sys.version[0] == '2':
import Queue as queue
import queue as queue
+def metaclass(metaclass):
+ @functools.wraps(metaclass)
+ def wrapper(cls):
+ return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
+
+ return wrapper
+
+
class VppEnumType(type):
def __getattr__(cls, name):
t = vpp_get_type(name)
return t.enum
-# Python3
-# class VppEnum(metaclass=VppEnumType):
-# pass
+@metaclass(VppEnumType)
class VppEnum(object):
- __metaclass__ = VppEnumType
+ pass
def vpp_atexit(vpp_weakref):
vpp_instance.logger.debug('Cleaning up VPP on exit')
vpp_instance.disconnect()
+
if sys.version[0] == '2':
def vpp_iterator(d):
return d.iteritems()
return d.items()
+def call_logger(msgdef, kwargs):
+ s = 'Calling {}('.format(msgdef.name)
+ for k, v in kwargs.items():
+ s += '{}:{} '.format(k, v)
+ s += ')'
+ return s
+
+
+def return_logger(r):
+ s = 'Return from {}'.format(r)
+ return s
+
+
class VppApiDynamicMethodHolder(object):
pass
def __init__(self, func):
self._func = func
self.__name__ = func.__name__
+ self.__doc__ = func.__doc__
def __call__(self, **kwargs):
return self._func(**kwargs)
pass
-class VPP(object):
+class VPPApiClient(object):
"""VPP interface.
This class provides the APIs to VPP. The APIs are loaded
provides a means to register a callback function to receive
these messages in a background thread.
"""
+ apidir = None
VPPApiError = VPPApiError
VPPRuntimeError = VPPRuntimeError
VPPValueError = VPPValueError
self.message_queue = queue.Queue()
self.read_timeout = read_timeout
self.async_thread = async_thread
+ self.event_thread = None
if use_socket:
from . vpp_transport_socket import VppTransport
:returns: A single directory name, or None if no such directory
could be found.
"""
- dirs = []
-
- if 'VPP_API_DIR' in os.environ:
- dirs.append(os.environ['VPP_API_DIR'])
+ dirs = [cls.apidir] if cls.apidir else []
# perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
# in which case, plot a course to likely places in the src tree
# finally, try the location system packages typically install into
dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
- # check the directories for existance; first one wins
+ # check the directories for existence; first one wins
for dir in dirs:
if os.path.isdir(dir):
return dir
f.__doc__ = ", ".join(["%s %s" %
(msg.fieldtypes[j], k)
for j, k in enumerate(msg.fields)])
+ f.msg = msg
+
return f
def _register_functions(self, do_async=False):
self._api = VppApiDynamicMethodHolder()
for name, msg in vpp_iterator(self.messages):
n = name + '_' + msg.crc[2:]
- i = self.transport.get_msg_index(n.encode())
+ i = self.transport.get_msg_index(n.encode('utf-8'))
if i > 0:
self.id_msgdef[i] = msg
self.id_names[i] = name
def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
do_async):
- pfx = chroot_prefix.encode() if chroot_prefix else None
+ pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
- rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
+ rv = self.transport.connect(name.encode('utf-8'), pfx,
+ msg_handler, rx_qlen)
if rv != 0:
raise VPPIOError(2, 'Connect failed')
self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
# Initialise control ping
crc = self.messages['control_ping'].crc
self.control_ping_index = self.transport.get_msg_index(
- ('control_ping' + '_' + crc[2:]).encode())
+ ('control_ping' + '_' + crc[2:]).encode('utf-8'))
self.control_ping_msgdef = self.messages['control_ping']
if self.async_thread:
self.event_thread = threading.Thread(
target=self.thread_msg_handler)
self.event_thread.daemon = True
self.event_thread.start()
+ else:
+ self.event_thread = None
return rv
def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
def disconnect(self):
"""Detach from VPP."""
rv = self.transport.disconnect()
- self.message_queue.put("terminate event thread")
+ if self.event_thread is not None:
+ self.message_queue.put("terminate event thread")
return rv
def msg_handler_sync(self, msg):
else:
raise VPPIOError(2, 'RPC reply message received in event handler')
+ def has_context(self, msg):
+ if len(msg) < 10:
+ return False
+
+ header = VPPType('header_with_context', [['u16', 'msgid'],
+ ['u32', 'client_index'],
+ ['u32', 'context']])
+
+ (i, ci, context), size = header.unpack(msg, 0)
+ if self.id_names[i] == 'rx_thread_exit':
+ return
+
+ #
+ # Decode message and returns a tuple.
+ #
+ msgobj = self.id_msgdef[i]
+ if 'context' in msgobj.field_by_name and context >= 0:
+ return True
+ return False
+
def decode_incoming_msg(self, msg, no_type_conversion=False):
if not msg:
self.logger.warning('vpp_api.read failed')
return
+
(i, ci), size = self.header.unpack(msg, 0)
if self.id_names[i] == 'rx_thread_exit':
return
raise VPPValueError('Invalid argument {} to {}'
.format(list(d), msg.name))
- def _call_vpp(self, i, msg, multipart, **kwargs):
+ def _call_vpp(self, i, msgdef, multipart, **kwargs):
"""Given a message, send the message and await a reply.
msgdef - the message packing definition
kwargs['client_index'] = self.transport.socket_index
except AttributeError:
pass
- self.validate_args(msg, kwargs)
- b = msg.pack(kwargs)
+ self.validate_args(msgdef, kwargs)
+
+ logging.debug(call_logger(msgdef, kwargs))
+
+ b = msgdef.pack(kwargs)
self.transport.suspend()
self.transport.write(b)
self.transport.resume()
+ logger.debug(return_logger(rl))
return rl
def _call_vpp_async(self, i, msg, **kwargs):
if self.event_callback:
self.event_callback(msgname, r)
+# Provide the old name for backward compatibility.
+VPP = VPPApiClient
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4