3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
39 except ModuleNotFoundError:
42 """placeholder for VppTransport as the implementation is dependent on
43 VPPAPIClient's initialization values
48 from .vpp_transport_socket import VppTransport
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
55 "VppApiDynamicMethodHolder",
66 def metaclass(metaclass):
67 @functools.wraps(metaclass)
69 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
74 class VppEnumType(type):
75 def __getattr__(cls, name):
76 t = vpp_get_type(name)
80 @metaclass(VppEnumType)
85 @metaclass(VppEnumType)
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client at interpreter shutdown.

    :param vpp_weakref: Zero-argument callable (a weak reference) that
        returns the client object, or a falsy value when there is none.
    :returns: None.
    """
    client = vpp_weakref()
    # Bail out when there is no client left, or it is not connected.
    if not client or not client.transport.connected:
        return
    logger.debug("Cleaning up VPP on exit")
    client.disconnect()
98 def add_convenience_methods():
99 # provide convenience methods to IP[46]Address.vapi_af
101 if 6 == self._version:
102 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103 if 4 == self._version:
104 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105 raise ValueError("Invalid _version.")
107 def _vapi_af_name(self):
108 if 6 == self._version:
110 if 4 == self._version:
112 raise ValueError("Invalid _version.")
114 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
118 class VppApiDynamicMethodHolder:
123 def __init__(self, func):
125 self.__name__ = func.__name__
126 self.__doc__ = func.__doc__
128 def __call__(self, **kwargs):
129 return self._func(**kwargs)
132 return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
135 class VPPApiError(Exception):
139 class VPPNotImplementedError(NotImplementedError):
143 class VPPIOError(IOError):
147 class VPPRuntimeError(RuntimeError):
151 class VPPValueError(ValueError):
155 class VPPApiJSONFiles:
157 def find_api_dir(cls, dirs=[]):
158 """Attempt to find the best directory in which API definition
159 files may reside. If the value VPP_API_DIR exists in the environment
160 then it is first on the search list. If we're inside a recognized
161 location in a VPP source tree (src/scripts and src/vpp-api/python)
162 then entries from there to the likely locations in build-root are
163 added. Finally the location used by system packages is added.
165 :returns: A single directory name, or None if no such directory
169 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170 # in which case, plot a course to likely places in the src tree
171 import __main__ as main
173 if os.getenv("VPP_API_DIR"):
174 dirs.append(os.getenv("VPP_API_DIR"))
176 if hasattr(main, "__file__"):
177 # get the path of the calling script
178 localdir = os.path.dirname(os.path.realpath(main.__file__))
180 # use cwd if there is no calling script
181 localdir = os.getcwd()
182 localdir_s = localdir.split(os.path.sep)
185 """Match dir against right-hand components of the script dir"""
186 d = dir.split("/") # param 'dir' assumes a / separator
188 return len(localdir_s) > length and localdir_s[-length:] == d
190 def sdir(srcdir, variant):
191 """Build a path from srcdir to the staged API files of
192 'variant' (typically '' or '_debug')"""
193 # Since 'core' and 'plugin' files are staged
194 # in separate directories, we target the parent dir.
195 return os.path.sep.join(
199 "install-vpp%s-native" % variant,
208 if dmatch("src/scripts"):
209 srcdir = os.path.sep.join(localdir_s[:-2])
210 elif dmatch("src/vpp-api/python"):
211 srcdir = os.path.sep.join(localdir_s[:-3])
213 # we're apparently running tests
214 srcdir = os.path.sep.join(localdir_s[:-1])
217 # we're in the source tree, try both the debug and release
219 dirs.append(sdir(srcdir, "_debug"))
220 dirs.append(sdir(srcdir, ""))
222 # Test for staged copies of the scripts
223 # For these, since we explicitly know if we're running a debug versus
224 # release variant, target only the relevant directory
225 if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
226 srcdir = os.path.sep.join(localdir_s[:-4])
227 dirs.append(sdir(srcdir, "_debug"))
228 if dmatch("build-root/install-vpp-native/vpp/bin"):
229 srcdir = os.path.sep.join(localdir_s[:-4])
230 dirs.append(sdir(srcdir, ""))
232 # finally, try the location system packages typically install into
233 dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
235 # check the directories for existence; first one wins
237 if os.path.isdir(dir):
243 def find_api_files(cls, api_dir=None, patterns="*"): # -> list
244 """Find API definition files from the given directory tree with the
245 given pattern. If no directory is given then find_api_dir() is used
246 to locate one. If no pattern is given then all definition files found
247 in the directory tree are used.
249 :param api_dir: A directory tree in which to locate API definition
250 files; subdirectories are descended into.
251 If this is None then find_api_dir() is called to discover it.
252 :param patterns: A list of patterns to use in each visited directory
253 when looking for files.
254 This can be a list/tuple object or a comma-separated string of
255 patterns. Each value in the list will have leading/trialing
257 The pattern specifies the first part of the filename, '.api.json'
259 The results are de-duplicated, thus overlapping patterns are fine.
260 If this is None it defaults to '*' meaning "all API files".
261 :returns: A list of file paths for the API files found.
264 api_dir = cls.find_api_dir([])
266 raise VPPApiError("api_dir cannot be located")
268 if isinstance(patterns, list) or isinstance(patterns, tuple):
269 patterns = [p.strip() + ".api.json" for p in patterns]
271 patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
274 for root, dirnames, files in os.walk(api_dir):
275 # iterate all given patterns and de-dup the result
276 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
277 for filename in files:
278 api_files.append(os.path.join(root, filename))
def process_json_file(self, apidef_file):
    """Decode an open JSON API definition and process it.

    :param apidef_file: File-like object that ``json.load`` can read.
    :returns: Whatever ``_process_json`` returns for the decoded object.
    """
    # NOTE(review): elsewhere in the file this is invoked on the class
    # itself, so a binding decorator presumably sits above the def - confirm.
    return self._process_json(json.load(apidef_file))
def process_json_str(self, json_str):
    """Decode a JSON API definition held in a string and process it.

    :param json_str: JSON text accepted by ``json.loads``.
    :returns: Whatever ``_process_json`` returns for the decoded object.
    """
    return self._process_json(json.loads(json_str))
293 def _process_json(api): # -> Tuple[Dict, Dict]
298 for t in api["enums"]:
299 t[0] = "vl_api_" + t[0] + "_t"
300 types[t[0]] = {"type": "enum", "data": t}
304 for t in api["enumflags"]:
305 t[0] = "vl_api_" + t[0] + "_t"
306 types[t[0]] = {"type": "enum", "data": t}
310 for t in api["unions"]:
311 t[0] = "vl_api_" + t[0] + "_t"
312 types[t[0]] = {"type": "union", "data": t}
317 for t in api["types"]:
318 t[0] = "vl_api_" + t[0] + "_t"
319 types[t[0]] = {"type": "type", "data": t}
324 for t, v in api["aliases"].items():
325 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
330 services.update(api["services"])
337 for k, v in types.items():
339 if not vpp_get_type(k):
340 if v["type"] == "enum":
342 VPPEnumType(t[0], t[1:])
345 if not vpp_get_type(k):
346 if v["type"] == "enumflag":
348 VPPEnumFlagType(t[0], t[1:])
351 elif v["type"] == "union":
353 VPPUnionType(t[0], t[1:])
356 elif v["type"] == "type":
361 elif v["type"] == "alias":
366 if len(unresolved) == 0:
369 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
373 for m in api["messages"]:
375 messages[m[0]] = VPPMessage(m[0], m[1:])
376 except VPPNotImplementedError:
378 logger.error("Not implemented error for {}".format(m[0]))
381 return messages, services
387 This class provides the APIs to VPP. The APIs are loaded
388 from provided .api.json files and makes functions accordingly.
389 These functions are documented in the VPP .api files, as they
390 are dynamically created.
392 Additionally, VPP can send callback messages; this class
393 provides a means to register a callback function to receive
394 these messages in a background thread.
398 VPPApiError = VPPApiError
399 VPPRuntimeError = VPPRuntimeError
400 VPPValueError = VPPValueError
401 VPPNotImplementedError = VPPNotImplementedError
402 VPPIOError = VPPIOError
414 server_address="/run/vpp/api.sock",
416 """Create a VPP API object.
418 apifiles is a list of files containing API
419 descriptions that will be loaded - methods will be
420 dynamically created reflecting these APIs. If not
421 provided this will load the API files from VPP's
422 default install location.
424 logger, if supplied, is the logging logger object to log to.
425 loglevel, if supplied, is the log level this logger is set
426 to report at (from the loglevels in the logging module).
429 logger = logging.getLogger(
430 "{}.{}".format(__name__, self.__class__.__name__)
432 if loglevel is not None:
433 logger.setLevel(loglevel)
440 self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
442 self.event_callback = None
443 self.message_queue = queue.Queue()
444 self.read_timeout = read_timeout
445 self.async_thread = async_thread
446 self.event_thread = None
447 self.testmode = testmode
448 self.server_address = server_address
449 self._apifiles = apifiles
453 # Pick up API definitions from default directory
455 if isinstance(self.apidir, list):
457 for d in self.apidir:
458 apifiles += VPPApiJSONFiles.find_api_files(d)
460 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
461 except (RuntimeError, VPPApiError):
462 # In test mode we don't care that we can't find the API files
466 raise VPPRuntimeError
468 for file in apifiles:
469 with open(file) as apidef_file:
470 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
471 self.messages.update(m)
472 self.services.update(s)
474 self.apifiles = apifiles
477 if len(self.messages) == 0 and not testmode:
478 raise VPPValueError(1, "Missing JSON message definitions")
479 if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
480 raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
482 self.transport = VppTransport(
483 self, read_timeout=read_timeout, server_address=server_address
485 # Make sure we allow VPP to clean up the message rings.
486 atexit.register(vpp_atexit, weakref.ref(self))
488 add_convenience_methods()
def get_function(self, name):
    """Return the attribute called ``name`` from the ``_api`` holder.

    :param name: Attribute name to look up on ``self._api``.
    :raises AttributeError: If ``self._api`` is not set or has no
        attribute of that name.
    """
    holder = self._api
    return getattr(holder, name)
494 """Multiprocessing-safe provider of unique context IDs."""
497 self.context = mp.Value(ctypes.c_uint, 0)
498 self.lock = mp.Lock()
501 """Get a new unique (or, at least, not recently used) context."""
503 self.context.value += 1
504 return self.context.value
506 get_context = ContextId()
def get_type(self, name):
    """Return whatever ``vpp_get_type`` yields for ``name``.

    :param name: Type name passed through unchanged to ``vpp_get_type``.
    """
    resolved = vpp_get_type(name)
    return resolved
513 if not hasattr(self, "_api"):
514 raise VPPApiError("Not connected, api definitions not available")
517 def make_function(self, msg, i, multipart, do_async):
521 return self._call_vpp_async(i, msg, **kwargs)
526 return self._call_vpp(i, msg, multipart, **kwargs)
528 f.__name__ = str(msg.name)
529 f.__doc__ = ", ".join(
530 ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
536 def make_pack_function(self, msg, i, multipart):
538 return self._call_vpp_pack(i, msg, **kwargs)
543 def _register_functions(self, do_async=False):
544 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
545 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
546 self._api = VppApiDynamicMethodHolder()
547 for name, msg in self.messages.items():
548 n = name + "_" + msg.crc[2:]
549 i = self.transport.get_msg_index(n)
551 self.id_msgdef[i] = msg
552 self.id_names[i] = name
554 # Create function for client side messages.
555 if name in self.services:
556 f = self.make_function(msg, i, self.services[name], do_async)
557 f_pack = self.make_pack_function(msg, i, self.services[name])
558 setattr(self._api, name, FuncWrapper(f))
559 setattr(self._api, name + "_pack", FuncWrapper(f_pack))
561 self.logger.debug("No such message type or failed CRC checksum: %s", n)
563 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
564 pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
566 rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
568 raise VPPIOError(2, "Connect failed")
569 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
570 self._register_functions(do_async=do_async)
572 # Initialise control ping
573 crc = self.messages["control_ping"].crc
574 self.control_ping_index = self.transport.get_msg_index(
575 ("control_ping" + "_" + crc[2:])
577 self.control_ping_msgdef = self.messages["control_ping"]
578 if self.async_thread:
579 self.event_thread = threading.Thread(target=self.thread_msg_handler)
580 self.event_thread.daemon = True
581 self.event_thread.start()
583 self.event_thread = None
586 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
589 name - the name of the client.
590 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
591 do_async - if true, messages are sent without waiting for a reply
592 rx_qlen - the length of the VPP message receive queue between
595 msg_handler = self.transport.get_callback(do_async)
596 return self.connect_internal(
597 name, msg_handler, chroot_prefix, rx_qlen, do_async
600 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
601 """Attach to VPP in synchronous mode. Application must poll for events.
603 name - the name of the client.
604 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
605 rx_qlen - the length of the VPP message receive queue between
609 return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
611 def disconnect(self):
612 """Detach from VPP."""
613 rv = self.transport.disconnect()
614 if self.event_thread is not None:
615 self.message_queue.put("terminate event thread")
618 def msg_handler_sync(self, msg):
619 """Process an incoming message from VPP in sync mode.
621 The message may be a reply or it may be an async notification.
623 r = self.decode_incoming_msg(msg)
627 # If we have a context, then use the context to find any
628 # request waiting for a reply
630 if hasattr(r, "context") and r.context > 0:
634 # No context -> async notification that we feed to the callback
635 self.message_queue.put_nowait(r)
637 raise VPPIOError(2, "RPC reply message received in event handler")
639 def has_context(self, msg):
644 "header_with_context",
645 [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
648 (i, ci, context), size = header.unpack(msg, 0)
649 if self.id_names[i] == "rx_thread_exit":
653 # Decode message and returns a tuple.
655 msgobj = self.id_msgdef[i]
656 if "context" in msgobj.field_by_name and context >= 0:
660 def decode_incoming_msg(self, msg, no_type_conversion=False):
662 logger.warning("vpp_api.read failed")
665 (i, ci), size = self.header.unpack(msg, 0)
666 if self.id_names[i] == "rx_thread_exit":
670 # Decode message and returns a tuple.
672 msgobj = self.id_msgdef[i]
674 raise VPPIOError(2, "Reply message undefined")
676 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
679 def msg_handler_async(self, msg):
680 """Process a message from VPP in async mode.
682 In async mode, all messages are returned to the callback.
684 r = self.decode_incoming_msg(msg)
688 msgname = type(r).__name__
690 if self.event_callback:
691 self.event_callback(msgname, r)
693 def _control_ping(self, context):
694 """Send a ping command."""
695 self._call_vpp_async(
696 self.control_ping_index, self.control_ping_msgdef, context=context
699 def validate_args(self, msg, kwargs):
700 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
702 raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
704 def _add_stat(self, name, ms):
705 if not name in self.stats:
706 self.stats[name] = {"max": ms, "count": 1, "avg": ms}
708 if ms > self.stats[name]["max"]:
709 self.stats[name]["max"] = ms
710 self.stats[name]["count"] += 1
711 n = self.stats[name]["count"]
712 self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
715 s = "\n=== API PAPI STATISTICS ===\n"
716 s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
717 for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
718 s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
719 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
723 def get_field_options(self, msg, fld_name):
724 # when there is an option, the msgdef has 3 elements.
725 # ['u32', 'ring_size', {'default': 1024}]
726 for _def in self.messages[msg].msgdef:
727 if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
730 def _call_vpp(self, i, msgdef, service, **kwargs):
731 """Given a message, send the message and await a reply.
733 msgdef - the message packing definition
734 i - the message type index
735 multipart - True if the message returns multiple
737 context - context number - chosen at random if not
739 The remainder of the kwargs are the arguments to the API call.
741 The return value is the message or message array containing
742 the response. It will raise an IOError exception if there was
743 no response within the timeout window.
746 if "context" not in kwargs:
747 context = self.get_context()
748 kwargs["context"] = context
750 context = kwargs["context"]
751 kwargs["_vl_msg_id"] = i
753 no_type_conversion = kwargs.pop("_no_type_conversion", False)
754 timeout = kwargs.pop("_timeout", None)
757 if self.transport.socket_index:
758 kwargs["client_index"] = self.transport.socket_index
759 except AttributeError:
761 self.validate_args(msgdef, kwargs)
763 s = "Calling {}({})".format(
764 msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
768 b = msgdef.pack(kwargs)
769 self.transport.suspend()
771 self.transport.write(b)
773 msgreply = service["reply"]
774 stream = True if "stream" in service else False
776 if "stream_msg" in service:
777 # New service['reply'] = _reply and service['stream_message'] = _details
778 stream_message = service["stream_msg"]
781 # Old service['reply'] = _details
782 stream_message = msgreply
783 msgreply = "control_ping_reply"
785 # Send a ping after the request - we use its response
786 # to detect that we have seen all results.
787 self._control_ping(context)
789 # Block until we get a reply.
792 r = self.read_blocking(no_type_conversion, timeout)
794 raise VPPIOError(2, "VPP API client: read failed")
795 msgname = type(r).__name__
796 if context not in r or r.context == 0 or context != r.context:
797 # Message being queued
798 self.message_queue.put_nowait(r)
800 if msgname != msgreply and (stream and (msgname != stream_message)):
801 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
805 if msgname == msgreply:
806 if modern: # Return both reply and list
812 self.transport.resume()
814 s = "Return value: {!r}".format(r)
819 self._add_stat(msgdef.name, (te - ts) * 1000)
822 def _call_vpp_async(self, i, msg, **kwargs):
823 """Given a message, send the message and return the context.
825 msgdef - the message packing definition
826 i - the message type index
827 context - context number - chosen at random if not
829 The remainder of the kwargs are the arguments to the API call.
831 The reply message(s) will be delivered later to the registered callback.
832 The returned context will help with assigning which call
833 the reply belongs to.
835 if "context" not in kwargs:
836 context = self.get_context()
837 kwargs["context"] = context
839 context = kwargs["context"]
841 if self.transport.socket_index:
842 kwargs["client_index"] = self.transport.socket_index
843 except AttributeError:
844 kwargs["client_index"] = 0
845 kwargs["_vl_msg_id"] = i
848 self.transport.write(b)
851 def _call_vpp_pack(self, i, msg, **kwargs):
852 """Given a message, return the binary representation."""
853 kwargs["_vl_msg_id"] = i
854 kwargs["client_index"] = 0
855 kwargs["context"] = 0
856 return msg.pack(kwargs)
858 def read_blocking(self, no_type_conversion=False, timeout=None):
859 """Get next received message from transport within timeout, decoded.
861 Note that notifications have context zero
862 and are not put into receive queue (at least for socket transport),
863 use async_thread with registered callback for processing them.
865 If no message appears in the queue within timeout, return None.
867 Optionally, type conversion can be skipped,
868 as some of conversions are into less precise types.
870 When r is the return value of this, the caller can get message name as:
871 msgname = type(r).__name__
872 and context number (type long) as:
875 :param no_type_conversion: If false, type conversions are applied.
876 :type no_type_conversion: bool
877 :returns: Decoded message, or None if no message (within timeout).
878 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
879 :raises VppTransportShmemIOError if timed out.
881 msg = self.transport.read(timeout=timeout)
884 return self.decode_incoming_msg(msg, no_type_conversion)
886 def register_event_callback(self, callback):
887 """Register a callback for async messages.
889 This will be called for async notifications in sync mode,
890 and all messages in async mode. In sync mode, replies to
891 requests will not come here.
893 callback is a fn(msg_type_name, msg_type) that will be
894 called when a message comes in. While this function is
895 executing, note that (a) you are in a background thread and
896 may wish to use threading.Lock to protect your datastructures,
897 and (b) message processing from VPP will stop (so if you take
898 a long while about it you may provoke reply timeouts or cause
899 VPP to fill the RX buffer). Passing None will disable the
902 self.event_callback = callback
904 def thread_msg_handler(self):
905 """Python thread calling the user registered message handler.
907 This is to emulate the old style event callback scheme. Modern
908 clients should provide their own thread to poll the event
912 r = self.message_queue.get()
913 if r == "terminate event thread":
915 msgname = type(r).__name__
916 if self.event_callback:
917 self.event_callback(msgname, r)
919 def validate_message_table(self, namecrctable):
920 """Take a dictionary of name_crc message names
921 and returns an array of missing messages"""
924 for name_crc in namecrctable:
925 i = self.transport.get_msg_index(name_crc)
927 missing_table.append(name_crc)
def dump_message_table(self):
    """Return the transport's API message table (name_crc dictionary).

    The transport's own object is returned as-is; no copy is made here.
    """
    transport = self.transport
    return transport.message_table
934 def dump_message_table_filtered(self, msglist):
935 """Return VPPs API message table as name_crc dictionary,
936 filtered by message name list."""
938 replies = [self.services[n]["reply"] for n in msglist]
939 message_table_filtered = {}
940 for name in msglist + replies:
941 for k, v in self.transport.message_table.items():
942 if k.startswith(name):
943 message_table_filtered[k] = v
945 return message_table_filtered
949 "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
950 "logger=%s, read_timeout=%s, "
951 "server_address='%s'>"
962 def details_iter(self, f, **kwargs):
965 kwargs["cursor"] = cursor
966 rv, details = f(**kwargs)
969 if rv.retval == 0 or rv.retval != -165: