+ @classmethod
+ def process_json_file(self, apidef_file):
+ api = json.load(apidef_file)
+ return self._process_json(api)
+
+ @classmethod
+ def process_json_str(self, json_str):
+ api = json.loads(json_str)
+ return self._process_json(api)
+
+ @classmethod
+ def process_json_array_str(self, json_str):
+ services = {}
+ messages = {}
+
+ apis = json.loads(json_str)
+ for a in apis:
+ m, s = self._process_json(a)
+ messages.update(m)
+ services.update(s)
+ return messages, services
+
+ @staticmethod
+ def _process_json(api): # -> Tuple[Dict, Dict]
+ types = {}
+ services = {}
+ messages = {}
+ try:
+ for t in api["enums"]:
+ t[0] = "vl_api_" + t[0] + "_t"
+ types[t[0]] = {"type": "enum", "data": t}
+ except KeyError:
+ pass
+ try:
+ for t in api["enumflags"]:
+ t[0] = "vl_api_" + t[0] + "_t"
+ types[t[0]] = {"type": "enum", "data": t}
+ except KeyError:
+ pass
+ try:
+ for t in api["unions"]:
+ t[0] = "vl_api_" + t[0] + "_t"
+ types[t[0]] = {"type": "union", "data": t}
+ except KeyError:
+ pass
+
+ try:
+ for t in api["types"]:
+ t[0] = "vl_api_" + t[0] + "_t"
+ types[t[0]] = {"type": "type", "data": t}
+ except KeyError:
+ pass
+
+ try:
+ for t, v in api["aliases"].items():
+ types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
+ except KeyError:
+ pass
+
+ try:
+ services.update(api["services"])
+ except KeyError:
+ pass
+
+ i = 0
+ while True:
+ unresolved = {}
+ for k, v in types.items():
+ t = v["data"]
+ if not vpp_get_type(k):
+ if v["type"] == "enum":
+ try:
+ VPPEnumType(t[0], t[1:])
+ except ValueError:
+ unresolved[k] = v
+ if not vpp_get_type(k):
+ if v["type"] == "enumflag":
+ try:
+ VPPEnumFlagType(t[0], t[1:])
+ except ValueError:
+ unresolved[k] = v
+ elif v["type"] == "union":
+ try:
+ VPPUnionType(t[0], t[1:])
+ except ValueError:
+ unresolved[k] = v
+ elif v["type"] == "type":
+ try:
+ VPPType(t[0], t[1:])
+ except ValueError:
+ unresolved[k] = v
+ elif v["type"] == "alias":
+ try:
+ VPPTypeAlias(k, t)
+ except ValueError:
+ unresolved[k] = v
+ if len(unresolved) == 0:
+ break
+ if i > 3:
+ raise VPPValueError("Unresolved type definitions {}".format(unresolved))
+ types = unresolved
+ i += 1
+ try:
+ for m in api["messages"]:
+ try:
+ messages[m[0]] = VPPMessage(m[0], m[1:])
+ except VPPNotImplementedError:
+ logger.error("Not implemented error for {}".format(m[0]))
+ except KeyError:
+ pass
+ return messages, services
+
+ @staticmethod
+ def load_api(apifiles=None, apidir=None):
+ messages = {}
+ services = {}
+ if not apifiles:
+ # Pick up API definitions from default directory
+ try:
+ if isinstance(apidir, list):
+ apifiles = []
+ for d in apidir:
+ apifiles += VPPApiJSONFiles.find_api_files(d)
+ else:
+ apifiles = VPPApiJSONFiles.find_api_files(apidir)
+ except (RuntimeError, VPPApiError):
+ raise VPPRuntimeError
+
+ for file in apifiles:
+ with open(file) as apidef_file:
+ m, s = VPPApiJSONFiles.process_json_file(apidef_file)
+ messages.update(m)
+ services.update(s)
+
+ return apifiles, messages, services
+
+
+class VPPApiClient:
+ """VPP interface.
+
+ This class provides the APIs to VPP. The APIs are loaded
+ from provided .api.json files and makes functions accordingly.
+ These functions are documented in the VPP .api files, as they
+ are dynamically created.
+
+ Additionally, VPP can send callback messages; this class
+ provides a means to register a callback function to receive
+ these messages in a background thread.
+ """
+
+ VPPApiError = VPPApiError
+ VPPRuntimeError = VPPRuntimeError
+ VPPValueError = VPPValueError
+ VPPNotImplementedError = VPPNotImplementedError
+ VPPIOError = VPPIOError
+
+ def __init__(
+ self,
+ *,
+ apifiles=None,
+ apidir=None,
+ testmode=False,
+ async_thread=True,
+ logger=None,
+ loglevel=None,
+ read_timeout=5,
+ use_socket=True,
+ server_address="/run/vpp/api.sock",
+ bootstrapapi=False,
+ ):
+ """Create a VPP API object.
+
+ apifiles is a list of files containing API
+ descriptions that will be loaded - methods will be
+ dynamically created reflecting these APIs. If not
+ provided this will load the API files from VPP's
+ default install location.
+
+ logger, if supplied, is the logging logger object to log to.
+ loglevel, if supplied, is the log level this logger is set
+ to report at (from the loglevels in the logging module).
+ """
+ if logger is None:
+ logger = logging.getLogger(
+ "{}.{}".format(__name__, self.__class__.__name__)
+ )
+ if loglevel is not None:
+ logger.setLevel(loglevel)
+ self.logger = logger
+
+ self.messages = {}
+ self.services = {}
+ self.id_names = []
+ self.id_msgdef = []
+ self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
+ self.apifiles = []
+ self.apidir = apidir
+ self.event_callback = None
+ self.message_queue = queue.Queue()
+ self.read_timeout = read_timeout
+ self.async_thread = async_thread
+ self.event_thread = None
+ self.testmode = testmode
+ self.server_address = server_address
+ self._apifiles = apifiles
+ self.stats = {}
+ self.bootstrapapi = bootstrapapi
+
+ if not bootstrapapi:
+ if self.apidir is None and hasattr(self.__class__, "apidir"):
+ # Keep supporting the old style of providing apidir.
+ self.apidir = self.__class__.apidir
+ try:
+ self.apifiles, self.messages, self.services = VPPApiJSONFiles.load_api(
+ apifiles, self.apidir
+ )
+ except VPPRuntimeError as e:
+ if testmode:
+ self.apifiles = []
+ else:
+ raise e
+ else:
+ # Bootstrap the API (memclnt.api bundled with VPP PAPI)
+ resource_path = "/".join(("data", "memclnt.api.json"))
+ file_content = pkg_resources.resource_string(__name__, resource_path)
+ self.messages, self.services = VPPApiJSONFiles.process_json_str(
+ file_content
+ )
+
+ # Basic sanity check
+ if len(self.messages) == 0 and not testmode:
+ raise VPPValueError(1, "Missing JSON message definitions")
+ if not bootstrapapi:
+ if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
+ raise VPPRuntimeError(
+ "Invalid address family hints. " "Cannot continue."
+ )
+
+ self.transport = VppTransport(
+ self, read_timeout=read_timeout, server_address=server_address
+ )
+ # Make sure we allow VPP to clean up the message rings.
+ atexit.register(vpp_atexit, weakref.ref(self))
+
+ add_convenience_methods()
+
+ def get_function(self, name):
+ return getattr(self._api, name)
+
+ class ContextId:
+ """Multiprocessing-safe provider of unique context IDs."""
+
+ def __init__(self):
+ self.context = mp.Value(ctypes.c_uint, 0)
+ self.lock = mp.Lock()
+
+ def __call__(self):
+ """Get a new unique (or, at least, not recently used) context."""
+ with self.lock:
+ self.context.value += 1
+ return self.context.value
+
+ get_context = ContextId()
+
+ def get_type(self, name):
+ return vpp_get_type(name)