import sys
import multiprocessing as mp
import os
+import queue
import logging
import functools
import json
logger = logging.getLogger('vpp_papi')
logger.addHandler(logging.NullHandler())
-if sys.version[0] == '2':
- import Queue as queue
-else:
- import queue as queue
-
__all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
'VppEnum', 'VppEnumType',
'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
@metaclass(VppEnumType)
-class VppEnum(object):
+class VppEnum:
pass
vpp_instance.disconnect()
-if sys.version[0] == '2':
- def vpp_iterator(d):
- return d.iteritems()
-else:
- def vpp_iterator(d):
- return d.items()
def add_convenience_methods():
ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
-class VppApiDynamicMethodHolder(object):
+class VppApiDynamicMethodHolder:
pass
-class FuncWrapper(object):
+class FuncWrapper:
def __init__(self, func):
self._func = func
self.__name__ = func.__name__
pass
-class VPPApiJSONFiles(object):
+class VPPApiJSONFiles:
@classmethod
def find_api_dir(cls, dirs):
"""Attempt to find the best directory in which API definition
return messages, services
-class VPPApiClient(object):
+class VPPApiClient:
"""VPP interface.
This class provides the APIs to VPP. The APIs are loaded
def get_function(self, name):
return getattr(self._api, name)
- class ContextId(object):
+ class ContextId:
"""Multiprocessing-safe provider of unique context IDs."""
def __init__(self):
self.context = mp.Value(ctypes.c_uint, 0)
self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
self._api = VppApiDynamicMethodHolder()
- for name, msg in vpp_iterator(self.messages):
+ for name, msg in self.messages.items():
n = name + '_' + msg.crc[2:]
i = self.transport.get_msg_index(n)
if i > 0: