3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
39 except ModuleNotFoundError:
42 """placeholder for VppTransport as the implementation is dependent on
43 VPPAPIClient's initialization values
48 from .vpp_transport_socket import VppTransport
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
55 "VppApiDynamicMethodHolder",
66 def metaclass(metaclass):
67 @functools.wraps(metaclass)
69 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
74 class VppEnumType(type):
75 def __getattr__(cls, name):
76 t = vpp_get_type(name)
80 @metaclass(VppEnumType)
85 @metaclass(VppEnumType)
def vpp_atexit(vpp_weakref):
    """Disconnect a VPP client that is still connected at shutdown.

    :param vpp_weakref: Zero-argument callable (a weak reference) that
        returns the client object, or a falsy value if it no longer exists.
    """
    client = vpp_weakref()
    # Nothing to clean up when the referent is gone or already disconnected.
    if not client or not client.transport.connected:
        return
    logger.debug("Cleaning up VPP on exit")
    client.disconnect()
98 def add_convenience_methods():
99 # provide convenience methods to IP[46]Address.vapi_af
101 if 6 == self._version:
102 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103 if 4 == self._version:
104 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105 raise ValueError("Invalid _version.")
107 def _vapi_af_name(self):
108 if 6 == self._version:
110 if 4 == self._version:
112 raise ValueError("Invalid _version.")
114 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
118 class VppApiDynamicMethodHolder:
123 def __init__(self, func):
125 self.__name__ = func.__name__
126 self.__doc__ = func.__doc__
128 def __call__(self, **kwargs):
129 return self._func(**kwargs)
132 return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
135 class VPPApiError(Exception):
139 class VPPNotImplementedError(NotImplementedError):
143 class VPPIOError(IOError):
147 class VPPRuntimeError(RuntimeError):
151 class VPPValueError(ValueError):
155 class VPPApiJSONFiles:
157 def find_api_dir(cls, dirs):
158 """Attempt to find the best directory in which API definition
159 files may reside. If the value VPP_API_DIR exists in the environment
160 then it is first on the search list. If we're inside a recognized
161 location in a VPP source tree (src/scripts and src/vpp-api/python)
162 then entries from there to the likely locations in build-root are
163 added. Finally the location used by system packages is added.
165 :returns: A single directory name, or None if no such directory
169 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170 # in which case, plot a course to likely places in the src tree
171 import __main__ as main
173 if hasattr(main, "__file__"):
174 # get the path of the calling script
175 localdir = os.path.dirname(os.path.realpath(main.__file__))
177 # use cwd if there is no calling script
178 localdir = os.getcwd()
179 localdir_s = localdir.split(os.path.sep)
182 """Match dir against right-hand components of the script dir"""
183 d = dir.split("/") # param 'dir' assumes a / separator
185 return len(localdir_s) > length and localdir_s[-length:] == d
187 def sdir(srcdir, variant):
188 """Build a path from srcdir to the staged API files of
189 'variant' (typically '' or '_debug')"""
190 # Since 'core' and 'plugin' files are staged
191 # in separate directories, we target the parent dir.
192 return os.path.sep.join(
196 "install-vpp%s-native" % variant,
205 if dmatch("src/scripts"):
206 srcdir = os.path.sep.join(localdir_s[:-2])
207 elif dmatch("src/vpp-api/python"):
208 srcdir = os.path.sep.join(localdir_s[:-3])
210 # we're apparently running tests
211 srcdir = os.path.sep.join(localdir_s[:-1])
214 # we're in the source tree, try both the debug and release
216 dirs.append(sdir(srcdir, "_debug"))
217 dirs.append(sdir(srcdir, ""))
219 # Test for staged copies of the scripts
220 # For these, since we explicitly know if we're running a debug versus
221 # release variant, target only the relevant directory
222 if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
223 srcdir = os.path.sep.join(localdir_s[:-4])
224 dirs.append(sdir(srcdir, "_debug"))
225 if dmatch("build-root/install-vpp-native/vpp/bin"):
226 srcdir = os.path.sep.join(localdir_s[:-4])
227 dirs.append(sdir(srcdir, ""))
229 # finally, try the location system packages typically install into
230 dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
232 # check the directories for existence; first one wins
234 if os.path.isdir(dir):
240 def find_api_files(cls, api_dir=None, patterns="*"): # -> list
241 """Find API definition files from the given directory tree with the
242 given pattern. If no directory is given then find_api_dir() is used
243 to locate one. If no pattern is given then all definition files found
244 in the directory tree are used.
246 :param api_dir: A directory tree in which to locate API definition
247 files; subdirectories are descended into.
248 If this is None then find_api_dir() is called to discover it.
249 :param patterns: A list of patterns to use in each visited directory
250 when looking for files.
251 This can be a list/tuple object or a comma-separated string of
252 patterns. Each value in the list will have leading/trailing
254 The pattern specifies the first part of the filename, '.api.json'
256 The results are de-duplicated, thus overlapping patterns are fine.
257 If this is None it defaults to '*' meaning "all API files".
258 :returns: A list of file paths for the API files found.
261 api_dir = cls.find_api_dir([])
263 raise VPPApiError("api_dir cannot be located")
265 if isinstance(patterns, list) or isinstance(patterns, tuple):
266 patterns = [p.strip() + ".api.json" for p in patterns]
268 patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
271 for root, dirnames, files in os.walk(api_dir):
272 # iterate all given patterns and de-dup the result
273 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
274 for filename in files:
275 api_files.append(os.path.join(root, filename))
def process_json_file(self, apidef_file):
    """Decode an open API definition file and process the result.

    :param apidef_file: Readable file object containing a JSON document.
    :returns: Whatever _process_json() returns for the decoded document.
    """
    return self._process_json(json.load(apidef_file))
def process_json_str(self, json_str):
    """Decode a JSON API definition held in a string and process the result.

    :param json_str: JSON document as accepted by json.loads().
    :returns: Whatever _process_json() returns for the decoded document.
    """
    return self._process_json(json.loads(json_str))
290 def _process_json(api): # -> Tuple[Dict, Dict]
295 for t in api["enums"]:
296 t[0] = "vl_api_" + t[0] + "_t"
297 types[t[0]] = {"type": "enum", "data": t}
301 for t in api["enumflags"]:
302 t[0] = "vl_api_" + t[0] + "_t"
303 types[t[0]] = {"type": "enum", "data": t}
307 for t in api["unions"]:
308 t[0] = "vl_api_" + t[0] + "_t"
309 types[t[0]] = {"type": "union", "data": t}
314 for t in api["types"]:
315 t[0] = "vl_api_" + t[0] + "_t"
316 types[t[0]] = {"type": "type", "data": t}
321 for t, v in api["aliases"].items():
322 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
327 services.update(api["services"])
334 for k, v in types.items():
336 if not vpp_get_type(k):
337 if v["type"] == "enum":
339 VPPEnumType(t[0], t[1:])
342 if not vpp_get_type(k):
343 if v["type"] == "enumflag":
345 VPPEnumFlagType(t[0], t[1:])
348 elif v["type"] == "union":
350 VPPUnionType(t[0], t[1:])
353 elif v["type"] == "type":
358 elif v["type"] == "alias":
363 if len(unresolved) == 0:
366 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
370 for m in api["messages"]:
372 messages[m[0]] = VPPMessage(m[0], m[1:])
373 except VPPNotImplementedError:
375 logger.error("Not implemented error for {}".format(m[0]))
378 return messages, services
384 This class provides the APIs to VPP. The APIs are loaded
385 from the provided .api.json files, and functions are created accordingly.
386 These functions are documented in the VPP .api files, as they
387 are dynamically created.
389 Additionally, VPP can send callback messages; this class
390 provides a means to register a callback function to receive
391 these messages in a background thread.
395 VPPApiError = VPPApiError
396 VPPRuntimeError = VPPRuntimeError
397 VPPValueError = VPPValueError
398 VPPNotImplementedError = VPPNotImplementedError
399 VPPIOError = VPPIOError
411 server_address="/run/vpp/api.sock",
413 """Create a VPP API object.
415 apifiles is a list of files containing API
416 descriptions that will be loaded - methods will be
417 dynamically created reflecting these APIs. If not
418 provided this will load the API files from VPP's
419 default install location.
421 logger, if supplied, is the logging logger object to log to.
422 loglevel, if supplied, is the log level this logger is set
423 to report at (from the loglevels in the logging module).
426 logger = logging.getLogger(
427 "{}.{}".format(__name__, self.__class__.__name__)
429 if loglevel is not None:
430 logger.setLevel(loglevel)
437 self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
439 self.event_callback = None
440 self.message_queue = queue.Queue()
441 self.read_timeout = read_timeout
442 self.async_thread = async_thread
443 self.event_thread = None
444 self.testmode = testmode
445 self.server_address = server_address
446 self._apifiles = apifiles
450 # Pick up API definitions from default directory
452 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
453 except (RuntimeError, VPPApiError):
454 # In test mode we don't care that we can't find the API files
458 raise VPPRuntimeError
460 for file in apifiles:
461 with open(file) as apidef_file:
462 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
463 self.messages.update(m)
464 self.services.update(s)
466 self.apifiles = apifiles
469 if len(self.messages) == 0 and not testmode:
470 raise VPPValueError(1, "Missing JSON message definitions")
471 if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
472 raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
474 self.transport = VppTransport(
475 self, read_timeout=read_timeout, server_address=server_address
477 # Make sure we allow VPP to clean up the message rings.
478 atexit.register(vpp_atexit, weakref.ref(self))
480 add_convenience_methods()
def get_function(self, name):
    """Return the attribute called ``name`` from the object in ``self._api``.

    :param name: Attribute name to look up.
    :raises AttributeError: If ``self._api`` is not set or has no such
        attribute.
    """
    holder = self._api
    return getattr(holder, name)
486 """Multiprocessing-safe provider of unique context IDs."""
489 self.context = mp.Value(ctypes.c_uint, 0)
490 self.lock = mp.Lock()
493 """Get a new unique (or, at least, not recently used) context."""
495 self.context.value += 1
496 return self.context.value
498 get_context = ContextId()
def get_type(self, name):
    """Look up ``name`` through vpp_get_type() and return its result."""
    found = vpp_get_type(name)
    return found
505 if not hasattr(self, "_api"):
506 raise VPPApiError("Not connected, api definitions not available")
509 def make_function(self, msg, i, multipart, do_async):
513 return self._call_vpp_async(i, msg, **kwargs)
518 return self._call_vpp(i, msg, multipart, **kwargs)
520 f.__name__ = str(msg.name)
521 f.__doc__ = ", ".join(
522 ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
528 def _register_functions(self, do_async=False):
529 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
530 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
531 self._api = VppApiDynamicMethodHolder()
532 for name, msg in self.messages.items():
533 n = name + "_" + msg.crc[2:]
534 i = self.transport.get_msg_index(n)
536 self.id_msgdef[i] = msg
537 self.id_names[i] = name
539 # Create function for client side messages.
540 if name in self.services:
541 f = self.make_function(msg, i, self.services[name], do_async)
542 setattr(self._api, name, FuncWrapper(f))
544 self.logger.debug("No such message type or failed CRC checksum: %s", n)
546 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
547 pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
549 rv = self.transport.connect(name, pfx, msg_handler, rx_qlen)
551 raise VPPIOError(2, "Connect failed")
552 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
553 self._register_functions(do_async=do_async)
555 # Initialise control ping
556 crc = self.messages["control_ping"].crc
557 self.control_ping_index = self.transport.get_msg_index(
558 ("control_ping" + "_" + crc[2:])
560 self.control_ping_msgdef = self.messages["control_ping"]
561 if self.async_thread:
562 self.event_thread = threading.Thread(target=self.thread_msg_handler)
563 self.event_thread.daemon = True
564 self.event_thread.start()
566 self.event_thread = None
569 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
572 name - the name of the client.
573 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
574 do_async - if true, messages are sent without waiting for a reply
575 rx_qlen - the length of the VPP message receive queue between
578 msg_handler = self.transport.get_callback(do_async)
579 return self.connect_internal(
580 name, msg_handler, chroot_prefix, rx_qlen, do_async
583 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
584 """Attach to VPP in synchronous mode. Application must poll for events.
586 name - the name of the client.
587 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
588 rx_qlen - the length of the VPP message receive queue between
592 return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
594 def disconnect(self):
595 """Detach from VPP."""
596 rv = self.transport.disconnect()
597 if self.event_thread is not None:
598 self.message_queue.put("terminate event thread")
601 def msg_handler_sync(self, msg):
602 """Process an incoming message from VPP in sync mode.
604 The message may be a reply or it may be an async notification.
606 r = self.decode_incoming_msg(msg)
610 # If we have a context, then use the context to find any
611 # request waiting for a reply
613 if hasattr(r, "context") and r.context > 0:
617 # No context -> async notification that we feed to the callback
618 self.message_queue.put_nowait(r)
620 raise VPPIOError(2, "RPC reply message received in event handler")
622 def has_context(self, msg):
627 "header_with_context",
628 [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
631 (i, ci, context), size = header.unpack(msg, 0)
632 if self.id_names[i] == "rx_thread_exit":
636 # Decode message and returns a tuple.
638 msgobj = self.id_msgdef[i]
639 if "context" in msgobj.field_by_name and context >= 0:
643 def decode_incoming_msg(self, msg, no_type_conversion=False):
645 logger.warning("vpp_api.read failed")
648 (i, ci), size = self.header.unpack(msg, 0)
649 if self.id_names[i] == "rx_thread_exit":
653 # Decode message and returns a tuple.
655 msgobj = self.id_msgdef[i]
657 raise VPPIOError(2, "Reply message undefined")
659 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
662 def msg_handler_async(self, msg):
663 """Process a message from VPP in async mode.
665 In async mode, all messages are returned to the callback.
667 r = self.decode_incoming_msg(msg)
671 msgname = type(r).__name__
673 if self.event_callback:
674 self.event_callback(msgname, r)
676 def _control_ping(self, context):
677 """Send a ping command."""
678 self._call_vpp_async(
679 self.control_ping_index, self.control_ping_msgdef, context=context
682 def validate_args(self, msg, kwargs):
683 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
685 raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
687 def _add_stat(self, name, ms):
688 if not name in self.stats:
689 self.stats[name] = {"max": ms, "count": 1, "avg": ms}
691 if ms > self.stats[name]["max"]:
692 self.stats[name]["max"] = ms
693 self.stats[name]["count"] += 1
694 n = self.stats[name]["count"]
695 self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
698 s = "\n=== API PAPI STATISTICS ===\n"
699 s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
700 for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
701 s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
702 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
706 def get_field_options(self, msg, fld_name):
707 # when there is an option, the msgdef has 3 elements.
708 # ['u32', 'ring_size', {'default': 1024}]
709 for _def in self.messages[msg].msgdef:
710 if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
713 def _call_vpp(self, i, msgdef, service, **kwargs):
714 """Given a message, send the message and await a reply.
716 msgdef - the message packing definition
717 i - the message type index
718 multipart - True if the message returns multiple
720 context - context number - chosen at random if not
722 The remainder of the kwargs are the arguments to the API call.
724 The return value is the message or message array containing
725 the response. It will raise an IOError exception if there was
726 no response within the timeout window.
729 if "context" not in kwargs:
730 context = self.get_context()
731 kwargs["context"] = context
733 context = kwargs["context"]
734 kwargs["_vl_msg_id"] = i
736 no_type_conversion = kwargs.pop("_no_type_conversion", False)
737 timeout = kwargs.pop("_timeout", None)
740 if self.transport.socket_index:
741 kwargs["client_index"] = self.transport.socket_index
742 except AttributeError:
744 self.validate_args(msgdef, kwargs)
746 s = "Calling {}({})".format(
747 msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
751 b = msgdef.pack(kwargs)
752 self.transport.suspend()
754 self.transport.write(b)
756 msgreply = service["reply"]
757 stream = True if "stream" in service else False
759 if "stream_msg" in service:
760 # New service['reply'] = _reply and service['stream_msg'] = _details
761 stream_message = service["stream_msg"]
764 # Old service['reply'] = _details
765 stream_message = msgreply
766 msgreply = "control_ping_reply"
768 # Send a ping after the request - we use its response
769 # to detect that we have seen all results.
770 self._control_ping(context)
772 # Block until we get a reply.
775 r = self.read_blocking(no_type_conversion, timeout)
777 raise VPPIOError(2, "VPP API client: read failed")
778 msgname = type(r).__name__
779 if context not in r or r.context == 0 or context != r.context:
780 # Message being queued
781 self.message_queue.put_nowait(r)
783 if msgname != msgreply and (stream and (msgname != stream_message)):
784 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
788 if msgname == msgreply:
789 if modern: # Return both reply and list
795 self.transport.resume()
797 s = "Return value: {!r}".format(r)
802 self._add_stat(msgdef.name, (te - ts) * 1000)
805 def _call_vpp_async(self, i, msg, **kwargs):
806 """Given a message, send the message and return the context.
808 msgdef - the message packing definition
809 i - the message type index
810 context - context number - chosen at random if not
812 The remainder of the kwargs are the arguments to the API call.
814 The reply message(s) will be delivered later to the registered callback.
815 The returned context will help with assigning which call
816 the reply belongs to.
818 if "context" not in kwargs:
819 context = self.get_context()
820 kwargs["context"] = context
822 context = kwargs["context"]
824 if self.transport.socket_index:
825 kwargs["client_index"] = self.transport.socket_index
826 except AttributeError:
827 kwargs["client_index"] = 0
828 kwargs["_vl_msg_id"] = i
831 self.transport.write(b)
834 def read_blocking(self, no_type_conversion=False, timeout=None):
835 """Get next received message from transport within timeout, decoded.
837 Note that notifications have context zero
838 and are not put into receive queue (at least for socket transport),
839 use async_thread with registered callback for processing them.
841 If no message appears in the queue within timeout, return None.
843 Optionally, type conversion can be skipped,
844 as some of conversions are into less precise types.
846 When r is the return value of this, the caller can get message name as:
847 msgname = type(r).__name__
848 and context number (type long) as:
851 :param no_type_conversion: If false, type conversions are applied.
852 :type no_type_conversion: bool
853 :returns: Decoded message, or None if no message (within timeout).
854 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
855 :raises VppTransportShmemIOError if timed out.
857 msg = self.transport.read(timeout=timeout)
860 return self.decode_incoming_msg(msg, no_type_conversion)
862 def register_event_callback(self, callback):
863 """Register a callback for async messages.
865 This will be called for async notifications in sync mode,
866 and all messages in async mode. In sync mode, replies to
867 requests will not come here.
869 callback is a fn(msg_type_name, msg_type) that will be
870 called when a message comes in. While this function is
871 executing, note that (a) you are in a background thread and
872 may wish to use threading.Lock to protect your datastructures,
873 and (b) message processing from VPP will stop (so if you take
874 a long while about it you may provoke reply timeouts or cause
875 VPP to fill the RX buffer). Passing None will disable the
878 self.event_callback = callback
880 def thread_msg_handler(self):
881 """Python thread calling the user registered message handler.
883 This is to emulate the old style event callback scheme. Modern
884 clients should provide their own thread to poll the event
888 r = self.message_queue.get()
889 if r == "terminate event thread":
891 msgname = type(r).__name__
892 if self.event_callback:
893 self.event_callback(msgname, r)
895 def validate_message_table(self, namecrctable):
896 """Take a dictionary of name_crc message names
897 and return an array of missing messages"""
900 for name_crc in namecrctable:
901 i = self.transport.get_msg_index(name_crc)
903 missing_table.append(name_crc)
def dump_message_table(self):
    """Return VPP's API message table as a name_crc dictionary.

    The table object held by the transport is handed back as-is; no copy
    is made.
    """
    transport = self.transport
    return transport.message_table
910 def dump_message_table_filtered(self, msglist):
911 """Return VPPs API message table as name_crc dictionary,
912 filtered by message name list."""
914 replies = [self.services[n]["reply"] for n in msglist]
915 message_table_filtered = {}
916 for name in msglist + replies:
917 for k, v in self.transport.message_table.items():
918 if k.startswith(name):
919 message_table_filtered[k] = v
921 return message_table_filtered
925 "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
926 "logger=%s, read_timeout=%s, "
927 "server_address='%s'>"
938 def details_iter(self, f, **kwargs):
941 kwargs["cursor"] = cursor
942 rv, details = f(**kwargs)
945 if rv.retval == 0 or rv.retval != -165: