import multiprocessing as mp
import os
import logging
-import collections
-import struct
import functools
import json
import threading
import fnmatch
import weakref
import atexit
-from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
+from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
-from . macaddress import MACAddress, mac_pton, mac_ntop
logger = logging.getLogger(__name__)
else:
import queue as queue
+__all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
+ 'VppEnum', 'VppEnumType',
+ 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
+ 'VPPApiClient', )
+
def metaclass(metaclass):
@functools.wraps(metaclass)
def __call__(self, **kwargs):
return self._func(**kwargs)
+ def __repr__(self):
+ return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
+
class VPPApiError(Exception):
pass
self.read_timeout = read_timeout
self.async_thread = async_thread
self.event_thread = None
+ self.testmode = testmode
+ self.use_socket = use_socket
+ self.server_address = server_address
+ self._apifiles = apifiles
if use_socket:
from . vpp_transport_socket import VppTransport
if self.event_callback:
self.event_callback(msgname, r)
+ def __repr__(self):
+ return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
+ "logger=%s, read_timeout=%s, use_socket=%s, " \
+ "server_address='%s'>" % (
+ self._apifiles, self.testmode, self.async_thread,
+ self.logger, self.read_timeout, self.use_socket,
+ self.server_address)
+
+
# Provide the old name for backward compatibility.
VPP = VPPApiClient