3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
39 except ModuleNotFoundError:
42 """placeholder for VppTransport as the implementation is dependent on
43 VPPAPIClient's initialization values
48 from .vpp_transport_socket import VppTransport
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
55 "VppApiDynamicMethodHolder",
66 def metaclass(metaclass):
67 @functools.wraps(metaclass)
69 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
74 class VppEnumType(type):
75 def __getattr__(cls, name):
76 t = vpp_get_type(name)
80 @metaclass(VppEnumType)
85 @metaclass(VppEnumType)
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client when the interpreter exits.

    :param vpp_weakref: Zero-argument callable (a weak reference) that
        yields the client object, or a falsy value if it is gone.
    """
    client = vpp_weakref()
    # Nothing to do when the client has already been collected.
    if not client:
        return
    # Only tear down a transport that is actually connected.
    if not client.transport.connected:
        return
    logger.debug("Cleaning up VPP on exit")
    client.disconnect()
98 def add_convenience_methods():
99 # provide convenience methods to IP[46]Address.vapi_af
101 if 6 == self._version:
102 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103 if 4 == self._version:
104 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105 raise ValueError("Invalid _version.")
107 def _vapi_af_name(self):
108 if 6 == self._version:
110 if 4 == self._version:
112 raise ValueError("Invalid _version.")
114 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
118 class VppApiDynamicMethodHolder:
123 def __init__(self, func):
125 self.__name__ = func.__name__
126 self.__doc__ = func.__doc__
128 def __call__(self, **kwargs):
129 return self._func(**kwargs)
132 return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
135 class VPPApiError(Exception):
139 class VPPNotImplementedError(NotImplementedError):
143 class VPPIOError(IOError):
147 class VPPRuntimeError(RuntimeError):
151 class VPPValueError(ValueError):
155 class VPPApiJSONFiles:
157 def find_api_dir(cls, dirs=[]):
158 """Attempt to find the best directory in which API definition
159 files may reside. If the value VPP_API_DIR exists in the environment
160 then it is first on the search list. If we're inside a recognized
161 location in a VPP source tree (src/scripts and src/vpp-api/python)
162 then entries from there to the likely locations in build-root are
163 added. Finally the location used by system packages is added.
165 :returns: A single directory name, or None if no such directory
169 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170 # in which case, plot a course to likely places in the src tree
171 import __main__ as main
173 if os.getenv("VPP_API_DIR"):
174 dirs.append(os.getenv("VPP_API_DIR"))
176 if hasattr(main, "__file__"):
177 # get the path of the calling script
178 localdir = os.path.dirname(os.path.realpath(main.__file__))
180 # use cwd if there is no calling script
181 localdir = os.getcwd()
182 localdir_s = localdir.split(os.path.sep)
185 """Match dir against right-hand components of the script dir"""
186 d = dir.split("/") # param 'dir' assumes a / separator
188 return len(localdir_s) > length and localdir_s[-length:] == d
190 def sdir(srcdir, variant):
191 """Build a path from srcdir to the staged API files of
192 'variant' (typically '' or '_debug')"""
193 # Since 'core' and 'plugin' files are staged
194 # in separate directories, we target the parent dir.
195 return os.path.sep.join(
199 "install-vpp%s-native" % variant,
208 if dmatch("src/scripts"):
209 srcdir = os.path.sep.join(localdir_s[:-2])
210 elif dmatch("src/vpp-api/python"):
211 srcdir = os.path.sep.join(localdir_s[:-3])
213 # we're apparently running tests
214 srcdir = os.path.sep.join(localdir_s[:-1])
217 # we're in the source tree, try both the debug and release
219 dirs.append(sdir(srcdir, "_debug"))
220 dirs.append(sdir(srcdir, ""))
222 # Test for staged copies of the scripts
223 # For these, since we explicitly know if we're running a debug versus
224 # release variant, target only the relevant directory
225 if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
226 srcdir = os.path.sep.join(localdir_s[:-4])
227 dirs.append(sdir(srcdir, "_debug"))
228 if dmatch("build-root/install-vpp-native/vpp/bin"):
229 srcdir = os.path.sep.join(localdir_s[:-4])
230 dirs.append(sdir(srcdir, ""))
232 # finally, try the location system packages typically install into
233 dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
235 # check the directories for existence; first one wins
237 if os.path.isdir(dir):
243 def find_api_files(cls, api_dir=None, patterns="*"): # -> list
244 """Find API definition files from the given directory tree with the
245 given pattern. If no directory is given then find_api_dir() is used
246 to locate one. If no pattern is given then all definition files found
247 in the directory tree are used.
249 :param api_dir: A directory tree in which to locate API definition
250 files; subdirectories are descended into.
251 If this is None then find_api_dir() is called to discover it.
252 :param patterns: A list of patterns to use in each visited directory
253 when looking for files.
254 This can be a list/tuple object or a comma-separated string of
255 patterns. Each value in the list will have leading/trailing
257 The pattern specifies the first part of the filename, '.api.json'
259 The results are de-duplicated, thus overlapping patterns are fine.
260 If this is None it defaults to '*' meaning "all API files".
261 :returns: A list of file paths for the API files found.
264 api_dir = cls.find_api_dir([])
266 raise VPPApiError("api_dir cannot be located")
268 if isinstance(patterns, list) or isinstance(patterns, tuple):
269 patterns = [p.strip() + ".api.json" for p in patterns]
271 patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
274 for root, dirnames, files in os.walk(api_dir):
275 # iterate all given patterns and de-dup the result
276 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
277 for filename in files:
278 api_files.append(os.path.join(root, filename))
def process_json_file(self, apidef_file):
    """Parse an API definition from an open file-like object.

    The complete content of apidef_file is read in one go and passed
    to _process_json; that call's result is returned unchanged.

    :param apidef_file: Object exposing a read() method.
    """
    json_text = apidef_file.read()
    return self._process_json(json_text)
def process_json_str(self, json_str):
    """Parse an API definition supplied directly as a JSON string.

    :param json_str: The JSON text, forwarded as-is to _process_json.
    :returns: Whatever _process_json returns for that text.
    """
    parse = self._process_json
    return parse(json_str)
291 def _process_json(json_str): # -> Tuple[Dict, Dict]
292 api = json.loads(json_str)
297 for t in api["enums"]:
298 t[0] = "vl_api_" + t[0] + "_t"
299 types[t[0]] = {"type": "enum", "data": t}
303 for t in api["enumflags"]:
304 t[0] = "vl_api_" + t[0] + "_t"
305 types[t[0]] = {"type": "enum", "data": t}
309 for t in api["unions"]:
310 t[0] = "vl_api_" + t[0] + "_t"
311 types[t[0]] = {"type": "union", "data": t}
316 for t in api["types"]:
317 t[0] = "vl_api_" + t[0] + "_t"
318 types[t[0]] = {"type": "type", "data": t}
323 for t, v in api["aliases"].items():
324 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
329 services.update(api["services"])
336 for k, v in types.items():
338 if not vpp_get_type(k):
339 if v["type"] == "enum":
341 VPPEnumType(t[0], t[1:])
344 if not vpp_get_type(k):
345 if v["type"] == "enumflag":
347 VPPEnumFlagType(t[0], t[1:])
350 elif v["type"] == "union":
352 VPPUnionType(t[0], t[1:])
355 elif v["type"] == "type":
360 elif v["type"] == "alias":
365 if len(unresolved) == 0:
368 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
372 for m in api["messages"]:
374 messages[m[0]] = VPPMessage(m[0], m[1:])
375 except VPPNotImplementedError:
377 logger.error("Not implemented error for {}".format(m[0]))
380 return messages, services
383 def load_api(apifiles=None, apidir=None):
387 # Pick up API definitions from default directory
389 if isinstance(apidir, list):
392 apifiles += VPPApiJSONFiles.find_api_files(d)
394 apifiles = VPPApiJSONFiles.find_api_files(apidir)
395 except (RuntimeError, VPPApiError):
396 raise VPPRuntimeError
398 for file in apifiles:
399 with open(file) as apidef_file:
400 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
404 return apifiles, messages, services
410 This class provides the APIs to VPP. The APIs are loaded
411 from provided .api.json files and makes functions accordingly.
412 These functions are documented in the VPP .api files, as they
413 are dynamically created.
415 Additionally, VPP can send callback messages; this class
416 provides a means to register a callback function to receive
417 these messages in a background thread.
420 VPPApiError = VPPApiError
421 VPPRuntimeError = VPPRuntimeError
422 VPPValueError = VPPValueError
423 VPPNotImplementedError = VPPNotImplementedError
424 VPPIOError = VPPIOError
437 server_address="/run/vpp/api.sock",
439 """Create a VPP API object.
441 apifiles is a list of files containing API
442 descriptions that will be loaded - methods will be
443 dynamically created reflecting these APIs. If not
444 provided this will load the API files from VPP's
445 default install location.
447 logger, if supplied, is the logging logger object to log to.
448 loglevel, if supplied, is the log level this logger is set
449 to report at (from the loglevels in the logging module).
452 logger = logging.getLogger(
453 "{}.{}".format(__name__, self.__class__.__name__)
455 if loglevel is not None:
456 logger.setLevel(loglevel)
463 self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
466 self.event_callback = None
467 self.message_queue = queue.Queue()
468 self.read_timeout = read_timeout
469 self.async_thread = async_thread
470 self.event_thread = None
471 self.testmode = testmode
472 self.server_address = server_address
473 self._apifiles = apifiles
477 self.apifiles, self.messages, self.services = VPPApiJSONFiles.load_api(
480 except VPPRuntimeError as e:
487 if len(self.messages) == 0 and not testmode:
488 raise VPPValueError(1, "Missing JSON message definitions")
489 if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
490 raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
492 self.transport = VppTransport(
493 self, read_timeout=read_timeout, server_address=server_address
495 # Make sure we allow VPP to clean up the message rings.
496 atexit.register(vpp_atexit, weakref.ref(self))
498 add_convenience_methods()
def get_function(self, name):
    """Look up an attribute called name on the self._api holder.

    :param name: Attribute name to fetch from self._api.
    :returns: The attribute found under that name.
    """
    holder = self._api
    return getattr(holder, name)
504 """Multiprocessing-safe provider of unique context IDs."""
507 self.context = mp.Value(ctypes.c_uint, 0)
508 self.lock = mp.Lock()
511 """Get a new unique (or, at least, not recently used) context."""
513 self.context.value += 1
514 return self.context.value
516 get_context = ContextId()
def get_type(self, name):
    """Return the result of vpp_get_type() for the given type name.

    :param name: Type name handed straight to vpp_get_type.
    """
    found = vpp_get_type(name)
    return found
523 if not hasattr(self, "_api"):
524 raise VPPApiError("Not connected, api definitions not available")
527 def make_function(self, msg, i, multipart, do_async):
531 return self._call_vpp_async(i, msg, **kwargs)
536 return self._call_vpp(i, msg, multipart, **kwargs)
538 f.__name__ = str(msg.name)
539 f.__doc__ = ", ".join(
540 ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
546 def make_pack_function(self, msg, i, multipart):
548 return self._call_vpp_pack(i, msg, **kwargs)
553 def _register_functions(self, do_async=False):
554 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
555 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
556 self._api = VppApiDynamicMethodHolder()
557 for name, msg in self.messages.items():
558 n = name + "_" + msg.crc[2:]
559 i = self.transport.get_msg_index(n)
561 self.id_msgdef[i] = msg
562 self.id_names[i] = name
564 # Create function for client side messages.
565 if name in self.services:
566 f = self.make_function(msg, i, self.services[name], do_async)
567 f_pack = self.make_pack_function(msg, i, self.services[name])
568 setattr(self._api, name, FuncWrapper(f))
569 setattr(self._api, name + "_pack", FuncWrapper(f_pack))
571 self.logger.debug("No such message type or failed CRC checksum: %s", n)
573 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
574 pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
576 rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
578 raise VPPIOError(2, "Connect failed")
579 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
580 self._register_functions(do_async=do_async)
582 # Initialise control ping
583 crc = self.messages["control_ping"].crc
584 self.control_ping_index = self.transport.get_msg_index(
585 ("control_ping" + "_" + crc[2:])
587 self.control_ping_msgdef = self.messages["control_ping"]
588 if self.async_thread:
589 self.event_thread = threading.Thread(target=self.thread_msg_handler)
590 self.event_thread.daemon = True
591 self.event_thread.start()
593 self.event_thread = None
596 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
599 name - the name of the client.
600 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
601 do_async - if true, messages are sent without waiting for a reply
602 rx_qlen - the length of the VPP message receive queue between
605 msg_handler = self.transport.get_callback(do_async)
606 return self.connect_internal(
607 name, msg_handler, chroot_prefix, rx_qlen, do_async
610 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
611 """Attach to VPP in synchronous mode. Application must poll for events.
613 name - the name of the client.
614 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
615 rx_qlen - the length of the VPP message receive queue between
619 return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
621 def disconnect(self):
622 """Detach from VPP."""
623 rv = self.transport.disconnect()
624 if self.event_thread is not None:
625 self.message_queue.put("terminate event thread")
628 def msg_handler_sync(self, msg):
629 """Process an incoming message from VPP in sync mode.
631 The message may be a reply or it may be an async notification.
633 r = self.decode_incoming_msg(msg)
637 # If we have a context, then use the context to find any
638 # request waiting for a reply
640 if hasattr(r, "context") and r.context > 0:
644 # No context -> async notification that we feed to the callback
645 self.message_queue.put_nowait(r)
647 raise VPPIOError(2, "RPC reply message received in event handler")
649 def has_context(self, msg):
654 "header_with_context",
655 [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
658 (i, ci, context), size = header.unpack(msg, 0)
659 if self.id_names[i] == "rx_thread_exit":
663 # Decode message and returns a tuple.
665 msgobj = self.id_msgdef[i]
666 if "context" in msgobj.field_by_name and context >= 0:
670 def decode_incoming_msg(self, msg, no_type_conversion=False):
672 logger.warning("vpp_api.read failed")
675 (i, ci), size = self.header.unpack(msg, 0)
676 if self.id_names[i] == "rx_thread_exit":
680 # Decode message and returns a tuple.
682 msgobj = self.id_msgdef[i]
684 raise VPPIOError(2, "Reply message undefined")
686 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
689 def msg_handler_async(self, msg):
690 """Process a message from VPP in async mode.
692 In async mode, all messages are returned to the callback.
694 r = self.decode_incoming_msg(msg)
698 msgname = type(r).__name__
700 if self.event_callback:
701 self.event_callback(msgname, r)
703 def _control_ping(self, context):
704 """Send a ping command."""
705 self._call_vpp_async(
706 self.control_ping_index, self.control_ping_msgdef, context=context
709 def validate_args(self, msg, kwargs):
710 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
712 raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
714 def _add_stat(self, name, ms):
715 if not name in self.stats:
716 self.stats[name] = {"max": ms, "count": 1, "avg": ms}
718 if ms > self.stats[name]["max"]:
719 self.stats[name]["max"] = ms
720 self.stats[name]["count"] += 1
721 n = self.stats[name]["count"]
722 self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
725 s = "\n=== API PAPI STATISTICS ===\n"
726 s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
727 for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
728 s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
729 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
733 def get_field_options(self, msg, fld_name):
734 # when there is an option, the msgdef has 3 elements.
735 # ['u32', 'ring_size', {'default': 1024}]
736 for _def in self.messages[msg].msgdef:
737 if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
740 def _call_vpp(self, i, msgdef, service, **kwargs):
741 """Given a message, send the message and await a reply.
743 msgdef - the message packing definition
744 i - the message type index
745 multipart - True if the message returns multiple
747 context - context number - chosen at random if not
749 The remainder of the kwargs are the arguments to the API call.
751 The return value is the message or message array containing
752 the response. It will raise an IOError exception if there was
753 no response within the timeout window.
756 if "context" not in kwargs:
757 context = self.get_context()
758 kwargs["context"] = context
760 context = kwargs["context"]
761 kwargs["_vl_msg_id"] = i
763 no_type_conversion = kwargs.pop("_no_type_conversion", False)
764 timeout = kwargs.pop("_timeout", None)
767 if self.transport.socket_index:
768 kwargs["client_index"] = self.transport.socket_index
769 except AttributeError:
771 self.validate_args(msgdef, kwargs)
773 s = "Calling {}({})".format(
774 msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
778 b = msgdef.pack(kwargs)
779 self.transport.suspend()
781 self.transport.write(b)
783 msgreply = service["reply"]
784 stream = True if "stream" in service else False
786 if "stream_msg" in service:
787 # New service['reply'] = _reply and service['stream_msg'] = _details
788 stream_message = service["stream_msg"]
791 # Old service['reply'] = _details
792 stream_message = msgreply
793 msgreply = "control_ping_reply"
795 # Send a ping after the request - we use its response
796 # to detect that we have seen all results.
797 self._control_ping(context)
799 # Block until we get a reply.
802 r = self.read_blocking(no_type_conversion, timeout)
804 raise VPPIOError(2, "VPP API client: read failed")
805 msgname = type(r).__name__
806 if context not in r or r.context == 0 or context != r.context:
807 # Message being queued
808 self.message_queue.put_nowait(r)
810 if msgname != msgreply and (stream and (msgname != stream_message)):
811 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
815 if msgname == msgreply:
816 if modern: # Return both reply and list
822 self.transport.resume()
824 s = "Return value: {!r}".format(r)
829 self._add_stat(msgdef.name, (te - ts) * 1000)
832 def _call_vpp_async(self, i, msg, **kwargs):
833 """Given a message, send the message and return the context.
835 msgdef - the message packing definition
836 i - the message type index
837 context - context number - chosen at random if not
839 The remainder of the kwargs are the arguments to the API call.
841 The reply message(s) will be delivered later to the registered callback.
842 The returned context will help with assigning which call
843 the reply belongs to.
845 if "context" not in kwargs:
846 context = self.get_context()
847 kwargs["context"] = context
849 context = kwargs["context"]
851 if self.transport.socket_index:
852 kwargs["client_index"] = self.transport.socket_index
853 except AttributeError:
854 kwargs["client_index"] = 0
855 kwargs["_vl_msg_id"] = i
858 self.transport.write(b)
861 def _call_vpp_pack(self, i, msg, **kwargs):
862 """Given a message, return the binary representation."""
863 kwargs["_vl_msg_id"] = i
864 kwargs["client_index"] = 0
865 kwargs["context"] = 0
866 return msg.pack(kwargs)
868 def read_blocking(self, no_type_conversion=False, timeout=None):
869 """Get next received message from transport within timeout, decoded.
871 Note that notifications have context zero
872 and are not put into receive queue (at least for socket transport),
873 use async_thread with registered callback for processing them.
875 If no message appears in the queue within timeout, return None.
877 Optionally, type conversion can be skipped,
878 as some of conversions are into less precise types.
880 When r is the return value of this, the caller can get message name as:
881 msgname = type(r).__name__
882 and context number (type long) as:
885 :param no_type_conversion: If false, type conversions are applied.
886 :type no_type_conversion: bool
887 :returns: Decoded message, or None if no message (within timeout).
888 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
889 :raises VppTransportShmemIOError if timed out.
891 msg = self.transport.read(timeout=timeout)
894 return self.decode_incoming_msg(msg, no_type_conversion)
896 def register_event_callback(self, callback):
897 """Register a callback for async messages.
899 This will be called for async notifications in sync mode,
900 and all messages in async mode. In sync mode, replies to
901 requests will not come here.
903 callback is a fn(msg_type_name, msg_type) that will be
904 called when a message comes in. While this function is
905 executing, note that (a) you are in a background thread and
906 may wish to use threading.Lock to protect your datastructures,
907 and (b) message processing from VPP will stop (so if you take
908 a long while about it you may provoke reply timeouts or cause
909 VPP to fill the RX buffer). Passing None will disable the
912 self.event_callback = callback
914 def thread_msg_handler(self):
915 """Python thread calling the user registered message handler.
917 This is to emulate the old style event callback scheme. Modern
918 clients should provide their own thread to poll the event
922 r = self.message_queue.get()
923 if r == "terminate event thread":
925 msgname = type(r).__name__
926 if self.event_callback:
927 self.event_callback(msgname, r)
929 def validate_message_table(self, namecrctable):
930 """Take a dictionary of name_crc message names
931 and returns an array of missing messages"""
934 for name_crc in namecrctable:
935 i = self.transport.get_msg_index(name_crc)
937 missing_table.append(name_crc)
def dump_message_table(self):
    """Return the transport's message_table attribute as-is.

    The object is handed back directly (no copy is made here).
    """
    transport = self.transport
    return transport.message_table
944 def dump_message_table_filtered(self, msglist):
945 """Return VPPs API message table as name_crc dictionary,
946 filtered by message name list."""
948 replies = [self.services[n]["reply"] for n in msglist]
949 message_table_filtered = {}
950 for name in msglist + replies:
951 for k, v in self.transport.message_table.items():
952 if k.startswith(name):
953 message_table_filtered[k] = v
955 return message_table_filtered
959 "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
960 "logger=%s, read_timeout=%s, "
961 "server_address='%s'>"
972 def details_iter(self, f, **kwargs):
975 kwargs["cursor"] = cursor
976 rv, details = f(**kwargs)
979 if rv.retval == 0 or rv.retval != -165: