3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
39 except ModuleNotFoundError:
42 """placeholder for VppTransport as the implementation is dependent on
43 VPPAPIClient's initialization values
48 from .vpp_transport_socket import VppTransport
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
55 "VppApiDynamicMethodHolder",
66 def metaclass(metaclass):
67 @functools.wraps(metaclass)
69 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
74 class VppEnumType(type):
75 def __getattr__(cls, name):
76 t = vpp_get_type(name)
80 @metaclass(VppEnumType)
85 @metaclass(VppEnumType)
def vpp_atexit(vpp_weakref):
    """Disconnect a VPP client that is still connected at shutdown.

    :param vpp_weakref: Callable (a weak reference) that returns the client
        object, or a falsy value when the client no longer exists.
    """
    client = vpp_weakref()
    if not client:
        # The referent is gone, so there is nothing left to tear down.
        return
    if not client.transport.connected:
        return
    logger.debug("Cleaning up VPP on exit")
    client.disconnect()
98 def add_convenience_methods():
99 # provide convenience methods to IP[46]Address.vapi_af
101 if 6 == self._version:
102 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103 if 4 == self._version:
104 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105 raise ValueError("Invalid _version.")
107 def _vapi_af_name(self):
108 if 6 == self._version:
110 if 4 == self._version:
112 raise ValueError("Invalid _version.")
114 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
118 class VppApiDynamicMethodHolder:
123 def __init__(self, func):
125 self.__name__ = func.__name__
126 self.__doc__ = func.__doc__
128 def __call__(self, **kwargs):
129 return self._func(**kwargs)
132 return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
135 class VPPApiError(Exception):
139 class VPPNotImplementedError(NotImplementedError):
143 class VPPIOError(IOError):
147 class VPPRuntimeError(RuntimeError):
151 class VPPValueError(ValueError):
155 class VPPApiJSONFiles:
157 def find_api_dir(cls, dirs):
158 """Attempt to find the best directory in which API definition
159 files may reside. If the value VPP_API_DIR exists in the environment
160 then it is first on the search list. If we're inside a recognized
161 location in a VPP source tree (src/scripts and src/vpp-api/python)
162 then entries from there to the likely locations in build-root are
163 added. Finally the location used by system packages is added.
165 :returns: A single directory name, or None if no such directory
169 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170 # in which case, plot a course to likely places in the src tree
171 import __main__ as main
173 if hasattr(main, "__file__"):
174 # get the path of the calling script
175 localdir = os.path.dirname(os.path.realpath(main.__file__))
177 # use cwd if there is no calling script
178 localdir = os.getcwd()
179 localdir_s = localdir.split(os.path.sep)
182 """Match dir against right-hand components of the script dir"""
183 d = dir.split("/") # param 'dir' assumes a / separator
185 return len(localdir_s) > length and localdir_s[-length:] == d
187 def sdir(srcdir, variant):
188 """Build a path from srcdir to the staged API files of
189 'variant' (typically '' or '_debug')"""
190 # Since 'core' and 'plugin' files are staged
191 # in separate directories, we target the parent dir.
192 return os.path.sep.join(
196 "install-vpp%s-native" % variant,
205 if dmatch("src/scripts"):
206 srcdir = os.path.sep.join(localdir_s[:-2])
207 elif dmatch("src/vpp-api/python"):
208 srcdir = os.path.sep.join(localdir_s[:-3])
210 # we're apparently running tests
211 srcdir = os.path.sep.join(localdir_s[:-1])
214 # we're in the source tree, try both the debug and release
216 dirs.append(sdir(srcdir, "_debug"))
217 dirs.append(sdir(srcdir, ""))
219 # Test for staged copies of the scripts
220 # For these, since we explicitly know if we're running a debug versus
221 # release variant, target only the relevant directory
222 if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
223 srcdir = os.path.sep.join(localdir_s[:-4])
224 dirs.append(sdir(srcdir, "_debug"))
225 if dmatch("build-root/install-vpp-native/vpp/bin"):
226 srcdir = os.path.sep.join(localdir_s[:-4])
227 dirs.append(sdir(srcdir, ""))
229 # finally, try the location system packages typically install into
230 dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
232 # check the directories for existence; first one wins
234 if os.path.isdir(dir):
240 def find_api_files(cls, api_dir=None, patterns="*"): # -> list
241 """Find API definition files from the given directory tree with the
242 given pattern. If no directory is given then find_api_dir() is used
243 to locate one. If no pattern is given then all definition files found
244 in the directory tree are used.
246 :param api_dir: A directory tree in which to locate API definition
247 files; subdirectories are descended into.
248 If this is None then find_api_dir() is called to discover it.
249 :param patterns: A list of patterns to use in each visited directory
250 when looking for files.
251 This can be a list/tuple object or a comma-separated string of
252 patterns. Each value in the list will have leading/trialing
254 The pattern specifies the first part of the filename, '.api.json'
256 The results are de-duplicated, thus overlapping patterns are fine.
257 If this is None it defaults to '*' meaning "all API files".
258 :returns: A list of file paths for the API files found.
261 api_dir = cls.find_api_dir([])
263 raise VPPApiError("api_dir cannot be located")
265 if isinstance(patterns, list) or isinstance(patterns, tuple):
266 patterns = [p.strip() + ".api.json" for p in patterns]
268 patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
271 for root, dirnames, files in os.walk(api_dir):
272 # iterate all given patterns and de-dup the result
273 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
274 for filename in files:
275 api_files.append(os.path.join(root, filename))
def process_json_file(self, apidef_file):
    """Decode an API definition from an open JSON file and process it.

    :param apidef_file: Readable file-like object containing JSON text.
    :returns: Whatever ``_process_json`` returns for the decoded document.
    """
    definition = json.load(apidef_file)
    return self._process_json(definition)
def process_json_str(self, json_str):
    """Decode an API definition from a JSON string and process it.

    :param json_str: JSON text of the API definition.
    :returns: Whatever ``_process_json`` returns for the decoded document.
    """
    definition = json.loads(json_str)
    return self._process_json(definition)
290 def _process_json(api): # -> Tuple[Dict, Dict]
295 for t in api["enums"]:
296 t[0] = "vl_api_" + t[0] + "_t"
297 types[t[0]] = {"type": "enum", "data": t}
301 for t in api["enumflags"]:
302 t[0] = "vl_api_" + t[0] + "_t"
303 types[t[0]] = {"type": "enum", "data": t}
307 for t in api["unions"]:
308 t[0] = "vl_api_" + t[0] + "_t"
309 types[t[0]] = {"type": "union", "data": t}
314 for t in api["types"]:
315 t[0] = "vl_api_" + t[0] + "_t"
316 types[t[0]] = {"type": "type", "data": t}
321 for t, v in api["aliases"].items():
322 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
327 services.update(api["services"])
334 for k, v in types.items():
336 if not vpp_get_type(k):
337 if v["type"] == "enum":
339 VPPEnumType(t[0], t[1:])
342 if not vpp_get_type(k):
343 if v["type"] == "enumflag":
345 VPPEnumFlagType(t[0], t[1:])
348 elif v["type"] == "union":
350 VPPUnionType(t[0], t[1:])
353 elif v["type"] == "type":
358 elif v["type"] == "alias":
363 if len(unresolved) == 0:
366 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
370 for m in api["messages"]:
372 messages[m[0]] = VPPMessage(m[0], m[1:])
373 except VPPNotImplementedError:
375 logger.error("Not implemented error for {}".format(m[0]))
378 return messages, services
384 This class provides the APIs to VPP. The APIs are loaded
385 from provided .api.json files and makes functions accordingly.
386 These functions are documented in the VPP .api files, as they
387 are dynamically created.
389 Additionally, VPP can send callback messages; this class
390 provides a means to register a callback function to receive
391 these messages in a background thread.
395 VPPApiError = VPPApiError
396 VPPRuntimeError = VPPRuntimeError
397 VPPValueError = VPPValueError
398 VPPNotImplementedError = VPPNotImplementedError
399 VPPIOError = VPPIOError
411 server_address="/run/vpp/api.sock",
413 """Create a VPP API object.
415 apifiles is a list of files containing API
416 descriptions that will be loaded - methods will be
417 dynamically created reflecting these APIs. If not
418 provided this will load the API files from VPP's
419 default install location.
421 logger, if supplied, is the logging logger object to log to.
422 loglevel, if supplied, is the log level this logger is set
423 to report at (from the loglevels in the logging module).
426 logger = logging.getLogger(
427 "{}.{}".format(__name__, self.__class__.__name__)
429 if loglevel is not None:
430 logger.setLevel(loglevel)
437 self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
439 self.event_callback = None
440 self.message_queue = queue.Queue()
441 self.read_timeout = read_timeout
442 self.async_thread = async_thread
443 self.event_thread = None
444 self.testmode = testmode
445 self.server_address = server_address
446 self._apifiles = apifiles
450 # Pick up API definitions from default directory
452 if isinstance(self.apidir, list):
454 for d in self.apidir:
455 apifiles += VPPApiJSONFiles.find_api_files(d)
457 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
458 except (RuntimeError, VPPApiError):
459 # In test mode we don't care that we can't find the API files
463 raise VPPRuntimeError
465 for file in apifiles:
466 with open(file) as apidef_file:
467 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
468 self.messages.update(m)
469 self.services.update(s)
471 self.apifiles = apifiles
474 if len(self.messages) == 0 and not testmode:
475 raise VPPValueError(1, "Missing JSON message definitions")
476 if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
477 raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
479 self.transport = VppTransport(
480 self, read_timeout=read_timeout, server_address=server_address
482 # Make sure we allow VPP to clean up the message rings.
483 atexit.register(vpp_atexit, weakref.ref(self))
485 add_convenience_methods()
def get_function(self, name):
    """Fetch the attribute called ``name`` from ``self._api``.

    :param name: Attribute name to look up.
    :returns: The attribute value; ``getattr`` raises AttributeError when
        no such attribute exists.
    """
    holder = self._api
    return getattr(holder, name)
491 """Multiprocessing-safe provider of unique context IDs."""
494 self.context = mp.Value(ctypes.c_uint, 0)
495 self.lock = mp.Lock()
498 """Get a new unique (or, at least, not recently used) context."""
500 self.context.value += 1
501 return self.context.value
503 get_context = ContextId()
def get_type(self, name):
    """Return the result of ``vpp_get_type`` for ``name``.

    :param name: Type name handed unchanged to ``vpp_get_type``.
    """
    vpp_type = vpp_get_type(name)
    return vpp_type
510 if not hasattr(self, "_api"):
511 raise VPPApiError("Not connected, api definitions not available")
514 def make_function(self, msg, i, multipart, do_async):
518 return self._call_vpp_async(i, msg, **kwargs)
523 return self._call_vpp(i, msg, multipart, **kwargs)
525 f.__name__ = str(msg.name)
526 f.__doc__ = ", ".join(
527 ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
533 def make_pack_function(self, msg, i, multipart):
535 return self._call_vpp_pack(i, msg, **kwargs)
540 def _register_functions(self, do_async=False):
541 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
542 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
543 self._api = VppApiDynamicMethodHolder()
544 for name, msg in self.messages.items():
545 n = name + "_" + msg.crc[2:]
546 i = self.transport.get_msg_index(n)
548 self.id_msgdef[i] = msg
549 self.id_names[i] = name
551 # Create function for client side messages.
552 if name in self.services:
553 f = self.make_function(msg, i, self.services[name], do_async)
554 f_pack = self.make_pack_function(msg, i, self.services[name])
555 setattr(self._api, name, FuncWrapper(f))
556 setattr(self._api, name + "_pack", FuncWrapper(f_pack))
558 self.logger.debug("No such message type or failed CRC checksum: %s", n)
560 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
561 pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
563 rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
565 raise VPPIOError(2, "Connect failed")
566 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
567 self._register_functions(do_async=do_async)
569 # Initialise control ping
570 crc = self.messages["control_ping"].crc
571 self.control_ping_index = self.transport.get_msg_index(
572 ("control_ping" + "_" + crc[2:])
574 self.control_ping_msgdef = self.messages["control_ping"]
575 if self.async_thread:
576 self.event_thread = threading.Thread(target=self.thread_msg_handler)
577 self.event_thread.daemon = True
578 self.event_thread.start()
580 self.event_thread = None
def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
    """Attach to VPP.

    name - the name of the client.
    chroot_prefix - if VPP is chroot'ed, the prefix of the jail
    do_async - if true, messages are sent without waiting for a reply
    rx_qlen - the length of the VPP message receive queue

    The message handler is obtained from the transport for the requested
    mode, and the result of connect_internal() is returned.
    """
    handler = self.transport.get_callback(do_async)
    result = self.connect_internal(name, handler, chroot_prefix, rx_qlen, do_async)
    return result
def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
    """Attach to VPP in synchronous mode. Application must poll for events.

    name - the name of the client.
    chroot_prefix - if VPP is chroot'ed, the prefix of the jail
    rx_qlen - the length of the VPP message receive queue

    No message handler is installed (None is passed) and do_async is
    always False; the result of connect_internal() is returned.
    """
    outcome = self.connect_internal(
        name, None, chroot_prefix, rx_qlen, do_async=False
    )
    return outcome
608 def disconnect(self):
609 """Detach from VPP."""
610 rv = self.transport.disconnect()
611 if self.event_thread is not None:
612 self.message_queue.put("terminate event thread")
615 def msg_handler_sync(self, msg):
616 """Process an incoming message from VPP in sync mode.
618 The message may be a reply or it may be an async notification.
620 r = self.decode_incoming_msg(msg)
624 # If we have a context, then use the context to find any
625 # request waiting for a reply
627 if hasattr(r, "context") and r.context > 0:
631 # No context -> async notification that we feed to the callback
632 self.message_queue.put_nowait(r)
634 raise VPPIOError(2, "RPC reply message received in event handler")
636 def has_context(self, msg):
641 "header_with_context",
642 [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
645 (i, ci, context), size = header.unpack(msg, 0)
646 if self.id_names[i] == "rx_thread_exit":
650 # Decode message and returns a tuple.
652 msgobj = self.id_msgdef[i]
653 if "context" in msgobj.field_by_name and context >= 0:
657 def decode_incoming_msg(self, msg, no_type_conversion=False):
659 logger.warning("vpp_api.read failed")
662 (i, ci), size = self.header.unpack(msg, 0)
663 if self.id_names[i] == "rx_thread_exit":
667 # Decode message and returns a tuple.
669 msgobj = self.id_msgdef[i]
671 raise VPPIOError(2, "Reply message undefined")
673 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
676 def msg_handler_async(self, msg):
677 """Process a message from VPP in async mode.
679 In async mode, all messages are returned to the callback.
681 r = self.decode_incoming_msg(msg)
685 msgname = type(r).__name__
687 if self.event_callback:
688 self.event_callback(msgname, r)
def _control_ping(self, context):
    """Send a control ping carrying the given context.

    The ping goes out through ``_call_vpp_async`` using the stored control
    ping index and message definition; nothing is returned.

    :param context: Context value placed in the ping request.
    """
    send_async = self._call_vpp_async
    send_async(self.control_ping_index, self.control_ping_msgdef, context=context)
696 def validate_args(self, msg, kwargs):
697 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
699 raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
701 def _add_stat(self, name, ms):
702 if not name in self.stats:
703 self.stats[name] = {"max": ms, "count": 1, "avg": ms}
705 if ms > self.stats[name]["max"]:
706 self.stats[name]["max"] = ms
707 self.stats[name]["count"] += 1
708 n = self.stats[name]["count"]
709 self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
712 s = "\n=== API PAPI STATISTICS ===\n"
713 s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
714 for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
715 s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
716 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
720 def get_field_options(self, msg, fld_name):
721 # when there is an option, the msgdef has 3 elements.
722 # ['u32', 'ring_size', {'default': 1024}]
723 for _def in self.messages[msg].msgdef:
724 if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
727 def _call_vpp(self, i, msgdef, service, **kwargs):
728 """Given a message, send the message and await a reply.
730 msgdef - the message packing definition
731 i - the message type index
732 multipart - True if the message returns multiple
734 context - context number - chosen at random if not
736 The remainder of the kwargs are the arguments to the API call.
738 The return value is the message or message array containing
739 the response. It will raise an IOError exception if there was
740 no response within the timeout window.
743 if "context" not in kwargs:
744 context = self.get_context()
745 kwargs["context"] = context
747 context = kwargs["context"]
748 kwargs["_vl_msg_id"] = i
750 no_type_conversion = kwargs.pop("_no_type_conversion", False)
751 timeout = kwargs.pop("_timeout", None)
754 if self.transport.socket_index:
755 kwargs["client_index"] = self.transport.socket_index
756 except AttributeError:
758 self.validate_args(msgdef, kwargs)
760 s = "Calling {}({})".format(
761 msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
765 b = msgdef.pack(kwargs)
766 self.transport.suspend()
768 self.transport.write(b)
770 msgreply = service["reply"]
771 stream = True if "stream" in service else False
773 if "stream_msg" in service:
774 # New service['reply'] = _reply and service['stream_message'] = _details
775 stream_message = service["stream_msg"]
778 # Old service['reply'] = _details
779 stream_message = msgreply
780 msgreply = "control_ping_reply"
782 # Send a ping after the request - we use its response
783 # to detect that we have seen all results.
784 self._control_ping(context)
786 # Block until we get a reply.
789 r = self.read_blocking(no_type_conversion, timeout)
791 raise VPPIOError(2, "VPP API client: read failed")
792 msgname = type(r).__name__
793 if context not in r or r.context == 0 or context != r.context:
794 # Message being queued
795 self.message_queue.put_nowait(r)
797 if msgname != msgreply and (stream and (msgname != stream_message)):
798 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
802 if msgname == msgreply:
803 if modern: # Return both reply and list
809 self.transport.resume()
811 s = "Return value: {!r}".format(r)
816 self._add_stat(msgdef.name, (te - ts) * 1000)
819 def _call_vpp_async(self, i, msg, **kwargs):
820 """Given a message, send the message and return the context.
822 msgdef - the message packing definition
823 i - the message type index
824 context - context number - chosen at random if not
826 The remainder of the kwargs are the arguments to the API call.
828 The reply message(s) will be delivered later to the registered callback.
829 The returned context will help with assigning which call
830 the reply belongs to.
832 if "context" not in kwargs:
833 context = self.get_context()
834 kwargs["context"] = context
836 context = kwargs["context"]
838 if self.transport.socket_index:
839 kwargs["client_index"] = self.transport.socket_index
840 except AttributeError:
841 kwargs["client_index"] = 0
842 kwargs["_vl_msg_id"] = i
845 self.transport.write(b)
def _call_vpp_pack(self, i, msg, **kwargs):
    """Build the binary form of a message without sending it.

    The header fields are forced, overriding any caller-supplied values:
    ``_vl_msg_id`` becomes ``i`` while ``client_index`` and ``context``
    are both set to 0.

    :param i: Message type index stored as ``_vl_msg_id``.
    :param msg: Message definition whose ``pack`` method is invoked.
    :returns: The value returned by ``msg.pack``.
    """
    kwargs.update(_vl_msg_id=i, client_index=0, context=0)
    return msg.pack(kwargs)
855 def read_blocking(self, no_type_conversion=False, timeout=None):
856 """Get next received message from transport within timeout, decoded.
858 Note that notifications have context zero
859 and are not put into receive queue (at least for socket transport),
860 use async_thread with registered callback for processing them.
862 If no message appears in the queue within timeout, return None.
864 Optionally, type conversion can be skipped,
865 as some of conversions are into less precise types.
867 When r is the return value of this, the caller can get message name as:
868 msgname = type(r).__name__
869 and context number (type long) as:
872 :param no_type_conversion: If false, type conversions are applied.
873 :type no_type_conversion: bool
874 :returns: Decoded message, or None if no message (within timeout).
875 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
876 :raises VppTransportShmemIOError if timed out.
878 msg = self.transport.read(timeout=timeout)
881 return self.decode_incoming_msg(msg, no_type_conversion)
def register_event_callback(self, callback):
    """Install the handler used for asynchronous messages.

    The handler receives async notifications in sync mode and every
    message in async mode; replies to sync-mode requests are not routed
    to it.

    callback is a fn(msg_type_name, msg_type). While it runs you are in
    a background thread (consider threading.Lock around shared data) and
    message processing from VPP is stalled, so a slow handler may provoke
    reply timeouts or let VPP fill the RX buffer. Passing None disables
    the callback.
    """
    self.event_callback = callback
901 def thread_msg_handler(self):
902 """Python thread calling the user registered message handler.
904 This is to emulate the old style event callback scheme. Modern
905 clients should provide their own thread to poll the event
909 r = self.message_queue.get()
910 if r == "terminate event thread":
912 msgname = type(r).__name__
913 if self.event_callback:
914 self.event_callback(msgname, r)
916 def validate_message_table(self, namecrctable):
917 """Take a dictionary of name_crc message names
918 and returns an array of missing messages"""
921 for name_crc in namecrctable:
922 i = self.transport.get_msg_index(name_crc)
924 missing_table.append(name_crc)
def dump_message_table(self):
    """Return VPP's API message table (name_crc dictionary).

    The table object held by the transport is returned as-is, not a copy.
    """
    transport = self.transport
    return transport.message_table
931 def dump_message_table_filtered(self, msglist):
932 """Return VPPs API message table as name_crc dictionary,
933 filtered by message name list."""
935 replies = [self.services[n]["reply"] for n in msglist]
936 message_table_filtered = {}
937 for name in msglist + replies:
938 for k, v in self.transport.message_table.items():
939 if k.startswith(name):
940 message_table_filtered[k] = v
942 return message_table_filtered
946 "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
947 "logger=%s, read_timeout=%s, "
948 "server_address='%s'>"
959 def details_iter(self, f, **kwargs):
962 kwargs["cursor"] = cursor
963 rv, details = f(**kwargs)
966 if rv.retval == 0 or rv.retval != -165: