3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
39 except ModuleNotFoundError:
42 """placeholder for VppTransport as the implementation is dependent on
43 VPPAPIClient's initialization values
48 from .vpp_transport_socket import VppTransport
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
55 "VppApiDynamicMethodHolder",
66 def metaclass(metaclass):
67 @functools.wraps(metaclass)
69 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
74 class VppEnumType(type):
75 def __getattr__(cls, name):
76 t = vpp_get_type(name)
80 @metaclass(VppEnumType)
85 @metaclass(VppEnumType)
90 def vpp_atexit(vpp_weakref):
91 """Clean up VPP connection on shutdown."""
92 vpp_instance = vpp_weakref()
93 if vpp_instance and vpp_instance.transport.connected:
94 logger.debug("Cleaning up VPP on exit")
95 vpp_instance.disconnect()
98 def add_convenience_methods():
99 # provide convenience methods to IP[46]Address.vapi_af
101 if 6 == self._version:
102 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103 if 4 == self._version:
104 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105 raise ValueError("Invalid _version.")
107 def _vapi_af_name(self):
108 if 6 == self._version:
110 if 4 == self._version:
112 raise ValueError("Invalid _version.")
114 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
118 class VppApiDynamicMethodHolder:
123 def __init__(self, func):
125 self.__name__ = func.__name__
126 self.__doc__ = func.__doc__
128 def __call__(self, **kwargs):
129 return self._func(**kwargs)
132 return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
135 class VPPApiError(Exception):
139 class VPPNotImplementedError(NotImplementedError):
143 class VPPIOError(IOError):
147 class VPPRuntimeError(RuntimeError):
151 class VPPValueError(ValueError):
155 class VPPApiJSONFiles:
157 def find_api_dir(cls, dirs=[]):
158 """Attempt to find the best directory in which API definition
159 files may reside. If the value VPP_API_DIR exists in the environment
160 then it is first on the search list. If we're inside a recognized
161 location in a VPP source tree (src/scripts and src/vpp-api/python)
162 then entries from there to the likely locations in build-root are
163 added. Finally the location used by system packages is added.
165 :returns: A single directory name, or None if no such directory
169 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170 # in which case, plot a course to likely places in the src tree
171 import __main__ as main
173 if os.getenv("VPP_API_DIR"):
174 dirs.append(os.getenv("VPP_API_DIR"))
176 if hasattr(main, "__file__"):
177 # get the path of the calling script
178 localdir = os.path.dirname(os.path.realpath(main.__file__))
180 # use cwd if there is no calling script
181 localdir = os.getcwd()
182 localdir_s = localdir.split(os.path.sep)
185 """Match dir against right-hand components of the script dir"""
186 d = dir.split("/") # param 'dir' assumes a / separator
188 return len(localdir_s) > length and localdir_s[-length:] == d
190 def sdir(srcdir, variant):
191 """Build a path from srcdir to the staged API files of
192 'variant' (typically '' or '_debug')"""
193 # Since 'core' and 'plugin' files are staged
194 # in separate directories, we target the parent dir.
195 return os.path.sep.join(
199 "install-vpp%s-native" % variant,
208 if dmatch("src/scripts"):
209 srcdir = os.path.sep.join(localdir_s[:-2])
210 elif dmatch("src/vpp-api/python"):
211 srcdir = os.path.sep.join(localdir_s[:-3])
213 # we're apparently running tests
214 srcdir = os.path.sep.join(localdir_s[:-1])
217 # we're in the source tree, try both the debug and release
219 dirs.append(sdir(srcdir, "_debug"))
220 dirs.append(sdir(srcdir, ""))
222 # Test for staged copies of the scripts
223 # For these, since we explicitly know if we're running a debug versus
224 # release variant, target only the relevant directory
225 if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
226 srcdir = os.path.sep.join(localdir_s[:-4])
227 dirs.append(sdir(srcdir, "_debug"))
228 if dmatch("build-root/install-vpp-native/vpp/bin"):
229 srcdir = os.path.sep.join(localdir_s[:-4])
230 dirs.append(sdir(srcdir, ""))
232 # finally, try the location system packages typically install into
233 dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
235 # check the directories for existence; first one wins
237 if os.path.isdir(dir):
243 def find_api_files(cls, api_dir=None, patterns="*"): # -> list
244 """Find API definition files from the given directory tree with the
245 given pattern. If no directory is given then find_api_dir() is used
246 to locate one. If no pattern is given then all definition files found
247 in the directory tree are used.
249 :param api_dir: A directory tree in which to locate API definition
250 files; subdirectories are descended into.
251 If this is None then find_api_dir() is called to discover it.
252 :param patterns: A list of patterns to use in each visited directory
253 when looking for files.
254 This can be a list/tuple object or a comma-separated string of
255 patterns. Each value in the list will have leading/trialing
257 The pattern specifies the first part of the filename, '.api.json'
259 The results are de-duplicated, thus overlapping patterns are fine.
260 If this is None it defaults to '*' meaning "all API files".
261 :returns: A list of file paths for the API files found.
264 api_dir = cls.find_api_dir([])
266 raise VPPApiError("api_dir cannot be located")
268 if isinstance(patterns, list) or isinstance(patterns, tuple):
269 patterns = [p.strip() + ".api.json" for p in patterns]
271 patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
274 for root, dirnames, files in os.walk(api_dir):
275 # iterate all given patterns and de-dup the result
276 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
277 for filename in files:
278 api_files.append(os.path.join(root, filename))
283 def process_json_file(self, apidef_file):
284 return self._process_json(apidef_file.read())
287 def process_json_str(self, json_str):
288 return self._process_json(json_str)
291 def _process_json(json_str): # -> Tuple[Dict, Dict]
292 api = json.loads(json_str)
297 for t in api["enums"]:
298 t[0] = "vl_api_" + t[0] + "_t"
299 types[t[0]] = {"type": "enum", "data": t}
303 for t in api["enumflags"]:
304 t[0] = "vl_api_" + t[0] + "_t"
305 types[t[0]] = {"type": "enum", "data": t}
309 for t in api["unions"]:
310 t[0] = "vl_api_" + t[0] + "_t"
311 types[t[0]] = {"type": "union", "data": t}
316 for t in api["types"]:
317 t[0] = "vl_api_" + t[0] + "_t"
318 types[t[0]] = {"type": "type", "data": t}
323 for t, v in api["aliases"].items():
324 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
329 services.update(api["services"])
336 for k, v in types.items():
338 if not vpp_get_type(k):
339 if v["type"] == "enum":
341 VPPEnumType(t[0], t[1:])
344 if not vpp_get_type(k):
345 if v["type"] == "enumflag":
347 VPPEnumFlagType(t[0], t[1:])
350 elif v["type"] == "union":
352 VPPUnionType(t[0], t[1:])
355 elif v["type"] == "type":
360 elif v["type"] == "alias":
365 if len(unresolved) == 0:
368 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
372 for m in api["messages"]:
374 messages[m[0]] = VPPMessage(m[0], m[1:])
375 except VPPNotImplementedError:
377 logger.error("Not implemented error for {}".format(m[0]))
380 return messages, services
383 def load_api(apifiles=None, apidir=None):
387 # Pick up API definitions from default directory
389 if isinstance(apidir, list):
392 apifiles += VPPApiJSONFiles.find_api_files(d)
394 apifiles = VPPApiJSONFiles.find_api_files(apidir)
395 except (RuntimeError, VPPApiError):
396 raise VPPRuntimeError
398 for file in apifiles:
399 with open(file) as apidef_file:
400 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
404 return apifiles, messages, services
410 This class provides the APIs to VPP. The APIs are loaded
411 from provided .api.json files and makes functions accordingly.
412 These functions are documented in the VPP .api files, as they
413 are dynamically created.
415 Additionally, VPP can send callback messages; this class
416 provides a means to register a callback function to receive
417 these messages in a background thread.
420 VPPApiError = VPPApiError
421 VPPRuntimeError = VPPRuntimeError
422 VPPValueError = VPPValueError
423 VPPNotImplementedError = VPPNotImplementedError
424 VPPIOError = VPPIOError
437 server_address="/run/vpp/api.sock",
439 """Create a VPP API object.
441 apifiles is a list of files containing API
442 descriptions that will be loaded - methods will be
443 dynamically created reflecting these APIs. If not
444 provided this will load the API files from VPP's
445 default install location.
447 logger, if supplied, is the logging logger object to log to.
448 loglevel, if supplied, is the log level this logger is set
449 to report at (from the loglevels in the logging module).
452 logger = logging.getLogger(
453 "{}.{}".format(__name__, self.__class__.__name__)
455 if loglevel is not None:
456 logger.setLevel(loglevel)
463 self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
466 self.event_callback = None
467 self.message_queue = queue.Queue()
468 self.read_timeout = read_timeout
469 self.async_thread = async_thread
470 self.event_thread = None
471 self.testmode = testmode
472 self.server_address = server_address
473 self._apifiles = apifiles
476 if self.apidir is None and hasattr(self.__class__, "apidir"):
477 # Keep supporting the old style of providing apidir.
478 self.apidir = self.__class__.apidir
480 self.apifiles, self.messages, self.services = VPPApiJSONFiles.load_api(
481 apifiles, self.apidir
483 except VPPRuntimeError as e:
490 if len(self.messages) == 0 and not testmode:
491 raise VPPValueError(1, "Missing JSON message definitions")
492 if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
493 raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
495 self.transport = VppTransport(
496 self, read_timeout=read_timeout, server_address=server_address
498 # Make sure we allow VPP to clean up the message rings.
499 atexit.register(vpp_atexit, weakref.ref(self))
501 add_convenience_methods()
503 def get_function(self, name):
504 return getattr(self._api, name)
507 """Multiprocessing-safe provider of unique context IDs."""
510 self.context = mp.Value(ctypes.c_uint, 0)
511 self.lock = mp.Lock()
514 """Get a new unique (or, at least, not recently used) context."""
516 self.context.value += 1
517 return self.context.value
519 get_context = ContextId()
521 def get_type(self, name):
522 return vpp_get_type(name)
526 if not hasattr(self, "_api"):
527 raise VPPApiError("Not connected, api definitions not available")
530 def make_function(self, msg, i, multipart, do_async):
534 return self._call_vpp_async(i, msg, **kwargs)
539 return self._call_vpp(i, msg, multipart, **kwargs)
541 f.__name__ = str(msg.name)
542 f.__doc__ = ", ".join(
543 ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
549 def make_pack_function(self, msg, i, multipart):
551 return self._call_vpp_pack(i, msg, **kwargs)
556 def _register_functions(self, do_async=False):
557 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
558 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
559 self._api = VppApiDynamicMethodHolder()
560 for name, msg in self.messages.items():
561 n = name + "_" + msg.crc[2:]
562 i = self.transport.get_msg_index(n)
564 self.id_msgdef[i] = msg
565 self.id_names[i] = name
567 # Create function for client side messages.
568 if name in self.services:
569 f = self.make_function(msg, i, self.services[name], do_async)
570 f_pack = self.make_pack_function(msg, i, self.services[name])
571 setattr(self._api, name, FuncWrapper(f))
572 setattr(self._api, name + "_pack", FuncWrapper(f_pack))
574 self.logger.debug("No such message type or failed CRC checksum: %s", n)
576 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
577 pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
579 rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
581 raise VPPIOError(2, "Connect failed")
582 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
583 self._register_functions(do_async=do_async)
585 # Initialise control ping
586 crc = self.messages["control_ping"].crc
587 self.control_ping_index = self.transport.get_msg_index(
588 ("control_ping" + "_" + crc[2:])
590 self.control_ping_msgdef = self.messages["control_ping"]
591 if self.async_thread:
592 self.event_thread = threading.Thread(target=self.thread_msg_handler)
593 self.event_thread.daemon = True
594 self.event_thread.start()
596 self.event_thread = None
599 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
602 name - the name of the client.
603 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
604 do_async - if true, messages are sent without waiting for a reply
605 rx_qlen - the length of the VPP message receive queue between
608 msg_handler = self.transport.get_callback(do_async)
609 return self.connect_internal(
610 name, msg_handler, chroot_prefix, rx_qlen, do_async
613 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
614 """Attach to VPP in synchronous mode. Application must poll for events.
616 name - the name of the client.
617 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
618 rx_qlen - the length of the VPP message receive queue between
622 return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
624 def disconnect(self):
625 """Detach from VPP."""
626 rv = self.transport.disconnect()
627 if self.event_thread is not None:
628 self.message_queue.put("terminate event thread")
631 def msg_handler_sync(self, msg):
632 """Process an incoming message from VPP in sync mode.
634 The message may be a reply or it may be an async notification.
636 r = self.decode_incoming_msg(msg)
640 # If we have a context, then use the context to find any
641 # request waiting for a reply
643 if hasattr(r, "context") and r.context > 0:
647 # No context -> async notification that we feed to the callback
648 self.message_queue.put_nowait(r)
650 raise VPPIOError(2, "RPC reply message received in event handler")
652 def has_context(self, msg):
657 "header_with_context",
658 [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
661 (i, ci, context), size = header.unpack(msg, 0)
662 if self.id_names[i] == "rx_thread_exit":
666 # Decode message and returns a tuple.
668 msgobj = self.id_msgdef[i]
669 if "context" in msgobj.field_by_name and context >= 0:
673 def decode_incoming_msg(self, msg, no_type_conversion=False):
675 logger.warning("vpp_api.read failed")
678 (i, ci), size = self.header.unpack(msg, 0)
679 if self.id_names[i] == "rx_thread_exit":
683 # Decode message and returns a tuple.
685 msgobj = self.id_msgdef[i]
687 raise VPPIOError(2, "Reply message undefined")
689 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
692 def msg_handler_async(self, msg):
693 """Process a message from VPP in async mode.
695 In async mode, all messages are returned to the callback.
697 r = self.decode_incoming_msg(msg)
701 msgname = type(r).__name__
703 if self.event_callback:
704 self.event_callback(msgname, r)
706 def _control_ping(self, context):
707 """Send a ping command."""
708 self._call_vpp_async(
709 self.control_ping_index, self.control_ping_msgdef, context=context
712 def validate_args(self, msg, kwargs):
713 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
715 raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
717 def _add_stat(self, name, ms):
718 if not name in self.stats:
719 self.stats[name] = {"max": ms, "count": 1, "avg": ms}
721 if ms > self.stats[name]["max"]:
722 self.stats[name]["max"] = ms
723 self.stats[name]["count"] += 1
724 n = self.stats[name]["count"]
725 self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
728 s = "\n=== API PAPI STATISTICS ===\n"
729 s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
730 for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
731 s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
732 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
736 def get_field_options(self, msg, fld_name):
737 # when there is an option, the msgdef has 3 elements.
738 # ['u32', 'ring_size', {'default': 1024}]
739 for _def in self.messages[msg].msgdef:
740 if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
743 def _call_vpp(self, i, msgdef, service, **kwargs):
744 """Given a message, send the message and await a reply.
746 msgdef - the message packing definition
747 i - the message type index
748 multipart - True if the message returns multiple
750 context - context number - chosen at random if not
752 The remainder of the kwargs are the arguments to the API call.
754 The return value is the message or message array containing
755 the response. It will raise an IOError exception if there was
756 no response within the timeout window.
759 if "context" not in kwargs:
760 context = self.get_context()
761 kwargs["context"] = context
763 context = kwargs["context"]
764 kwargs["_vl_msg_id"] = i
766 no_type_conversion = kwargs.pop("_no_type_conversion", False)
767 timeout = kwargs.pop("_timeout", None)
770 if self.transport.socket_index:
771 kwargs["client_index"] = self.transport.socket_index
772 except AttributeError:
774 self.validate_args(msgdef, kwargs)
776 s = "Calling {}({})".format(
777 msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
781 b = msgdef.pack(kwargs)
782 self.transport.suspend()
784 self.transport.write(b)
786 msgreply = service["reply"]
787 stream = True if "stream" in service else False
789 if "stream_msg" in service:
790 # New service['reply'] = _reply and service['stream_message'] = _details
791 stream_message = service["stream_msg"]
794 # Old service['reply'] = _details
795 stream_message = msgreply
796 msgreply = "control_ping_reply"
798 # Send a ping after the request - we use its response
799 # to detect that we have seen all results.
800 self._control_ping(context)
802 # Block until we get a reply.
805 r = self.read_blocking(no_type_conversion, timeout)
807 raise VPPIOError(2, "VPP API client: read failed")
808 msgname = type(r).__name__
809 if context not in r or r.context == 0 or context != r.context:
810 # Message being queued
811 self.message_queue.put_nowait(r)
813 if msgname != msgreply and (stream and (msgname != stream_message)):
814 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
818 if msgname == msgreply:
819 if modern: # Return both reply and list
825 self.transport.resume()
827 s = "Return value: {!r}".format(r)
832 self._add_stat(msgdef.name, (te - ts) * 1000)
835 def _call_vpp_async(self, i, msg, **kwargs):
836 """Given a message, send the message and return the context.
838 msgdef - the message packing definition
839 i - the message type index
840 context - context number - chosen at random if not
842 The remainder of the kwargs are the arguments to the API call.
844 The reply message(s) will be delivered later to the registered callback.
845 The returned context will help with assigning which call
846 the reply belongs to.
848 if "context" not in kwargs:
849 context = self.get_context()
850 kwargs["context"] = context
852 context = kwargs["context"]
854 if self.transport.socket_index:
855 kwargs["client_index"] = self.transport.socket_index
856 except AttributeError:
857 kwargs["client_index"] = 0
858 kwargs["_vl_msg_id"] = i
861 self.transport.write(b)
864 def _call_vpp_pack(self, i, msg, **kwargs):
865 """Given a message, return the binary representation."""
866 kwargs["_vl_msg_id"] = i
867 kwargs["client_index"] = 0
868 kwargs["context"] = 0
869 return msg.pack(kwargs)
871 def read_blocking(self, no_type_conversion=False, timeout=None):
872 """Get next received message from transport within timeout, decoded.
874 Note that notifications have context zero
875 and are not put into receive queue (at least for socket transport),
876 use async_thread with registered callback for processing them.
878 If no message appears in the queue within timeout, return None.
880 Optionally, type conversion can be skipped,
881 as some of conversions are into less precise types.
883 When r is the return value of this, the caller can get message name as:
884 msgname = type(r).__name__
885 and context number (type long) as:
888 :param no_type_conversion: If false, type conversions are applied.
889 :type no_type_conversion: bool
890 :returns: Decoded message, or None if no message (within timeout).
891 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
892 :raises VppTransportShmemIOError if timed out.
894 msg = self.transport.read(timeout=timeout)
897 return self.decode_incoming_msg(msg, no_type_conversion)
899 def register_event_callback(self, callback):
900 """Register a callback for async messages.
902 This will be called for async notifications in sync mode,
903 and all messages in async mode. In sync mode, replies to
904 requests will not come here.
906 callback is a fn(msg_type_name, msg_type) that will be
907 called when a message comes in. While this function is
908 executing, note that (a) you are in a background thread and
909 may wish to use threading.Lock to protect your datastructures,
910 and (b) message processing from VPP will stop (so if you take
911 a long while about it you may provoke reply timeouts or cause
912 VPP to fill the RX buffer). Passing None will disable the
915 self.event_callback = callback
917 def thread_msg_handler(self):
918 """Python thread calling the user registered message handler.
920 This is to emulate the old style event callback scheme. Modern
921 clients should provide their own thread to poll the event
925 r = self.message_queue.get()
926 if r == "terminate event thread":
928 msgname = type(r).__name__
929 if self.event_callback:
930 self.event_callback(msgname, r)
932 def validate_message_table(self, namecrctable):
933 """Take a dictionary of name_crc message names
934 and returns an array of missing messages"""
937 for name_crc in namecrctable:
938 i = self.transport.get_msg_index(name_crc)
940 missing_table.append(name_crc)
943 def dump_message_table(self):
944 """Return VPPs API message table as name_crc dictionary"""
945 return self.transport.message_table
947 def dump_message_table_filtered(self, msglist):
948 """Return VPPs API message table as name_crc dictionary,
949 filtered by message name list."""
951 replies = [self.services[n]["reply"] for n in msglist]
952 message_table_filtered = {}
953 for name in msglist + replies:
954 for k, v in self.transport.message_table.items():
955 if k.startswith(name):
956 message_table_filtered[k] = v
958 return message_table_filtered
962 "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
963 "logger=%s, read_timeout=%s, "
964 "server_address='%s'>"
975 def details_iter(self, f, **kwargs):
978 kwargs["cursor"] = cursor
979 rv, details = f(**kwargs)
982 if rv.retval == 0 or rv.retval != -165: