+ @classmethod
+ def process_json_file(self, apidef_file):
+ api = json.load(apidef_file)
+ return self._process_json(api)
+
+ @classmethod
+ def process_json_str(self, json_str):
+ api = json.loads(json_str)
+ return self._process_json(api)
+
+ @staticmethod
+ def _process_json(api): # -> Tuple[Dict, Dict]
+ types = {}
+ services = {}
+ messages = {}
+ try:
+ for t in api['enums']:
+ t[0] = 'vl_api_' + t[0] + '_t'
+ types[t[0]] = {'type': 'enum', 'data': t}
+ except KeyError:
+ pass
+ try:
+ for t in api['enumflags']:
+ t[0] = 'vl_api_' + t[0] + '_t'
+ types[t[0]] = {'type': 'enum', 'data': t}
+ except KeyError:
+ pass
+ try:
+ for t in api['unions']:
+ t[0] = 'vl_api_' + t[0] + '_t'
+ types[t[0]] = {'type': 'union', 'data': t}
+ except KeyError:
+ pass
+
+ try:
+ for t in api['types']:
+ t[0] = 'vl_api_' + t[0] + '_t'
+ types[t[0]] = {'type': 'type', 'data': t}
+ except KeyError:
+ pass
+
+ try:
+ for t, v in api['aliases'].items():
+ types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
+ except KeyError:
+ pass
+
+ try:
+ services.update(api['services'])
+ except KeyError:
+ pass
+
+ i = 0
+ while True:
+ unresolved = {}
+ for k, v in types.items():
+ t = v['data']
+ if not vpp_get_type(k):
+ if v['type'] == 'enum':
+ try:
+ VPPEnumType(t[0], t[1:])
+ except ValueError:
+ unresolved[k] = v
+ if not vpp_get_type(k):
+ if v['type'] == 'enumflag':
+ try:
+ VPPEnumFlagType(t[0], t[1:])
+ except ValueError:
+ unresolved[k] = v
+ elif v['type'] == 'union':
+ try:
+ VPPUnionType(t[0], t[1:])
+ except ValueError:
+ unresolved[k] = v
+ elif v['type'] == 'type':
+ try:
+ VPPType(t[0], t[1:])
+ except ValueError:
+ unresolved[k] = v
+ elif v['type'] == 'alias':
+ try:
+ VPPTypeAlias(k, t)
+ except ValueError:
+ unresolved[k] = v
+ if len(unresolved) == 0:
+ break
+ if i > 3:
+ raise VPPValueError('Unresolved type definitions {}'
+ .format(unresolved))
+ types = unresolved
+ i += 1
+ try:
+ for m in api['messages']:
+ try:
+ messages[m[0]] = VPPMessage(m[0], m[1:])
+ except VPPNotImplementedError:
+ ### OLE FIXME
+ logger.error('Not implemented error for {}'.format(m[0]))
+ except KeyError:
+ pass
+ return messages, services
+
+
+class VPPApiClient:
+ """VPP interface.
+
+ This class provides the APIs to VPP. The APIs are loaded
+ from provided .api.json files and makes functions accordingly.
+ These functions are documented in the VPP .api files, as they
+ are dynamically created.
+
+ Additionally, VPP can send callback messages; this class
+ provides a means to register a callback function to receive
+ these messages in a background thread.
+ """
+ apidir = None
+ VPPApiError = VPPApiError
+ VPPRuntimeError = VPPRuntimeError
+ VPPValueError = VPPValueError
+ VPPNotImplementedError = VPPNotImplementedError
+ VPPIOError = VPPIOError
+
+
+ def __init__(self, *, apifiles=None, testmode=False, async_thread=True,
+ logger=None, loglevel=None,
+ read_timeout=5, use_socket=True,
+ server_address='/run/vpp/api.sock'):
+ """Create a VPP API object.
+
+ apifiles is a list of files containing API
+ descriptions that will be loaded - methods will be
+ dynamically created reflecting these APIs. If not
+ provided this will load the API files from VPP's
+ default install location.
+
+ logger, if supplied, is the logging logger object to log to.
+ loglevel, if supplied, is the log level this logger is set
+ to report at (from the loglevels in the logging module).
+ """
+ if logger is None:
+ logger = logging.getLogger(
+ "{}.{}".format(__name__, self.__class__.__name__))
+ if loglevel is not None:
+ logger.setLevel(loglevel)
+ self.logger = logger
+
+ self.messages = {}
+ self.services = {}
+ self.id_names = []
+ self.id_msgdef = []
+ self.header = VPPType('header', [['u16', 'msgid'],
+ ['u32', 'client_index']])
+ self.apifiles = []
+ self.event_callback = None
+ self.message_queue = queue.Queue()
+ self.read_timeout = read_timeout
+ self.async_thread = async_thread
+ self.event_thread = None
+ self.testmode = testmode
+ self.use_socket = use_socket
+ self.server_address = server_address
+ self._apifiles = apifiles
+ self.stats = {}
+
+ if use_socket:
+ from . vpp_transport_socket import VppTransport
+ else:
+ from . vpp_transport_shmem import VppTransport
+
+ if not apifiles:
+ # Pick up API definitions from default directory
+ try:
+ apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
+ except (RuntimeError, VPPApiError):
+ # In test mode we don't care that we can't find the API files
+ if testmode:
+ apifiles = []
+ else:
+ raise VPPRuntimeError
+
+ for file in apifiles:
+ with open(file) as apidef_file:
+ m, s = VPPApiJSONFiles.process_json_file(apidef_file)
+ self.messages.update(m)
+ self.services.update(s)
+
+ self.apifiles = apifiles
+
+ # Basic sanity check
+ if len(self.messages) == 0 and not testmode:
+ raise VPPValueError(1, 'Missing JSON message definitions')
+ if not(verify_enum_hint(VppEnum.vl_api_address_family_t)):
+ raise VPPRuntimeError("Invalid address family hints. "
+ "Cannot continue.")
+
+ self.transport = VppTransport(self, read_timeout=read_timeout,
+ server_address=server_address)
+ # Make sure we allow VPP to clean up the message rings.
+ atexit.register(vpp_atexit, weakref.ref(self))
+
+ add_convenience_methods()
+
+ def get_function(self, name):
+ return getattr(self._api, name)
+
+ class ContextId:
+ """Multiprocessing-safe provider of unique context IDs."""
+ def __init__(self):
+ self.context = mp.Value(ctypes.c_uint, 0)
+ self.lock = mp.Lock()
+
+ def __call__(self):
+ """Get a new unique (or, at least, not recently used) context."""
+ with self.lock:
+ self.context.value += 1
+ return self.context.value
+ get_context = ContextId()
+
+ def get_type(self, name):
+ return vpp_get_type(name)
+