3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from __future__ import print_function
18 from __future__ import absolute_import
22 import multiprocessing as mp
33 from .vpp_format import verify_enum_hint
34 from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType
35 from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
39 except ModuleNotFoundError:
42 """placeholder for VppTransport as the implementation is dependent on
43 VPPAPIClient's initialization values
48 from .vpp_transport_socket import VppTransport
50 logger = logging.getLogger("vpp_papi")
51 logger.addHandler(logging.NullHandler())
55 "VppApiDynamicMethodHolder",
66 def metaclass(metaclass):
67 @functools.wraps(metaclass)
69 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
74 class VppEnumType(type):
75 def __getattr__(cls, name):
76 t = vpp_get_type(name)
80 @metaclass(VppEnumType)
85 @metaclass(VppEnumType)
def vpp_atexit(vpp_weakref):
    """Disconnect a still-connected VPP client when the interpreter exits.

    :param vpp_weakref: Zero-argument callable (a weak reference) that
        yields the client instance, or a falsy value once it is gone.
    """
    client = vpp_weakref()
    if not client:
        # The referent no longer exists; there is nothing to clean up.
        return
    if not client.transport.connected:
        return
    logger.debug("Cleaning up VPP on exit")
    client.disconnect()
98 def add_convenience_methods():
99 # provide convenience methods to IP[46]Address.vapi_af
101 if 6 == self._version:
102 return VppEnum.vl_api_address_family_t.ADDRESS_IP6.value
103 if 4 == self._version:
104 return VppEnum.vl_api_address_family_t.ADDRESS_IP4.value
105 raise ValueError("Invalid _version.")
107 def _vapi_af_name(self):
108 if 6 == self._version:
110 if 4 == self._version:
112 raise ValueError("Invalid _version.")
114 ipaddress._IPAddressBase.vapi_af = property(_vapi_af)
115 ipaddress._IPAddressBase.vapi_af_name = property(_vapi_af_name)
118 class VppApiDynamicMethodHolder:
123 def __init__(self, func):
125 self.__name__ = func.__name__
126 self.__doc__ = func.__doc__
128 def __call__(self, **kwargs):
129 return self._func(**kwargs)
132 return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__)
135 class VPPApiError(Exception):
139 class VPPNotImplementedError(NotImplementedError):
143 class VPPIOError(IOError):
147 class VPPRuntimeError(RuntimeError):
151 class VPPValueError(ValueError):
155 class VPPApiJSONFiles:
157 def find_api_dir(cls, dirs):
158 """Attempt to find the best directory in which API definition
159 files may reside. If the value VPP_API_DIR exists in the environment
160 then it is first on the search list. If we're inside a recognized
161 location in a VPP source tree (src/scripts and src/vpp-api/python)
162 then entries from there to the likely locations in build-root are
163 added. Finally the location used by system packages is added.
165 :returns: A single directory name, or None if no such directory
169 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
170 # in which case, plot a course to likely places in the src tree
171 import __main__ as main
173 if hasattr(main, "__file__"):
174 # get the path of the calling script
175 localdir = os.path.dirname(os.path.realpath(main.__file__))
177 # use cwd if there is no calling script
178 localdir = os.getcwd()
179 localdir_s = localdir.split(os.path.sep)
182 """Match dir against right-hand components of the script dir"""
183 d = dir.split("/") # param 'dir' assumes a / separator
185 return len(localdir_s) > length and localdir_s[-length:] == d
187 def sdir(srcdir, variant):
188 """Build a path from srcdir to the staged API files of
189 'variant' (typically '' or '_debug')"""
190 # Since 'core' and 'plugin' files are staged
191 # in separate directories, we target the parent dir.
192 return os.path.sep.join(
196 "install-vpp%s-native" % variant,
205 if dmatch("src/scripts"):
206 srcdir = os.path.sep.join(localdir_s[:-2])
207 elif dmatch("src/vpp-api/python"):
208 srcdir = os.path.sep.join(localdir_s[:-3])
210 # we're apparently running tests
211 srcdir = os.path.sep.join(localdir_s[:-1])
214 # we're in the source tree, try both the debug and release
216 dirs.append(sdir(srcdir, "_debug"))
217 dirs.append(sdir(srcdir, ""))
219 # Test for staged copies of the scripts
220 # For these, since we explicitly know if we're running a debug versus
221 # release variant, target only the relevant directory
222 if dmatch("build-root/install-vpp_debug-native/vpp/bin"):
223 srcdir = os.path.sep.join(localdir_s[:-4])
224 dirs.append(sdir(srcdir, "_debug"))
225 if dmatch("build-root/install-vpp-native/vpp/bin"):
226 srcdir = os.path.sep.join(localdir_s[:-4])
227 dirs.append(sdir(srcdir, ""))
229 # finally, try the location system packages typically install into
230 dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api")))
232 # check the directories for existence; first one wins
234 if os.path.isdir(dir):
240 def find_api_files(cls, api_dir=None, patterns="*"): # -> list
241 """Find API definition files from the given directory tree with the
242 given pattern. If no directory is given then find_api_dir() is used
243 to locate one. If no pattern is given then all definition files found
244 in the directory tree are used.
246 :param api_dir: A directory tree in which to locate API definition
247 files; subdirectories are descended into.
248 If this is None then find_api_dir() is called to discover it.
249 :param patterns: A list of patterns to use in each visited directory
250 when looking for files.
251 This can be a list/tuple object or a comma-separated string of
252 patterns. Each value in the list will have leading/trailing
254 The pattern specifies the first part of the filename, '.api.json'
256 The results are de-duplicated, thus overlapping patterns are fine.
257 If this is None it defaults to '*' meaning "all API files".
258 :returns: A list of file paths for the API files found.
261 api_dir = cls.find_api_dir([])
263 raise VPPApiError("api_dir cannot be located")
265 if isinstance(patterns, list) or isinstance(patterns, tuple):
266 patterns = [p.strip() + ".api.json" for p in patterns]
268 patterns = [p.strip() + ".api.json" for p in patterns.split(",")]
271 for root, dirnames, files in os.walk(api_dir):
272 # iterate all given patterns and de-dup the result
273 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
274 for filename in files:
275 api_files.append(os.path.join(root, filename))
def process_json_file(self, apidef_file):
    """Parse an open JSON API definition file and process its contents.

    :param apidef_file: Readable file object containing JSON text.
    :returns: Whatever ``self._process_json`` returns for the parsed data.
    """
    return self._process_json(json.load(apidef_file))
def process_json_str(self, json_str):
    """Parse a JSON API definition given as a string and process it.

    :param json_str: JSON text of an API definition.
    :returns: Whatever ``self._process_json`` returns for the parsed data.
    """
    return self._process_json(json.loads(json_str))
290 def _process_json(api): # -> Tuple[Dict, Dict]
295 for t in api["enums"]:
296 t[0] = "vl_api_" + t[0] + "_t"
297 types[t[0]] = {"type": "enum", "data": t}
301 for t in api["enumflags"]:
302 t[0] = "vl_api_" + t[0] + "_t"
303 types[t[0]] = {"type": "enum", "data": t}
307 for t in api["unions"]:
308 t[0] = "vl_api_" + t[0] + "_t"
309 types[t[0]] = {"type": "union", "data": t}
314 for t in api["types"]:
315 t[0] = "vl_api_" + t[0] + "_t"
316 types[t[0]] = {"type": "type", "data": t}
321 for t, v in api["aliases"].items():
322 types["vl_api_" + t + "_t"] = {"type": "alias", "data": v}
327 services.update(api["services"])
334 for k, v in types.items():
336 if not vpp_get_type(k):
337 if v["type"] == "enum":
339 VPPEnumType(t[0], t[1:])
342 if not vpp_get_type(k):
343 if v["type"] == "enumflag":
345 VPPEnumFlagType(t[0], t[1:])
348 elif v["type"] == "union":
350 VPPUnionType(t[0], t[1:])
353 elif v["type"] == "type":
358 elif v["type"] == "alias":
363 if len(unresolved) == 0:
366 raise VPPValueError("Unresolved type definitions {}".format(unresolved))
370 for m in api["messages"]:
372 messages[m[0]] = VPPMessage(m[0], m[1:])
373 except VPPNotImplementedError:
375 logger.error("Not implemented error for {}".format(m[0]))
378 return messages, services
384 This class provides the APIs to VPP. The APIs are loaded
385 from provided .api.json files and makes functions accordingly.
386 These functions are documented in the VPP .api files, as they
387 are dynamically created.
389 Additionally, VPP can send callback messages; this class
390 provides a means to register a callback function to receive
391 these messages in a background thread.
395 VPPApiError = VPPApiError
396 VPPRuntimeError = VPPRuntimeError
397 VPPValueError = VPPValueError
398 VPPNotImplementedError = VPPNotImplementedError
399 VPPIOError = VPPIOError
411 server_address="/run/vpp/api.sock",
413 """Create a VPP API object.
415 apifiles is a list of files containing API
416 descriptions that will be loaded - methods will be
417 dynamically created reflecting these APIs. If not
418 provided this will load the API files from VPP's
419 default install location.
421 logger, if supplied, is the logging logger object to log to.
422 loglevel, if supplied, is the log level this logger is set
423 to report at (from the loglevels in the logging module).
426 logger = logging.getLogger(
427 "{}.{}".format(__name__, self.__class__.__name__)
429 if loglevel is not None:
430 logger.setLevel(loglevel)
437 self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]])
439 self.event_callback = None
440 self.message_queue = queue.Queue()
441 self.read_timeout = read_timeout
442 self.async_thread = async_thread
443 self.event_thread = None
444 self.testmode = testmode
445 self.server_address = server_address
446 self._apifiles = apifiles
450 # Pick up API definitions from default directory
452 if isinstance(self.apidir, list):
454 for d in self.apidir:
455 apifiles += VPPApiJSONFiles.find_api_files(d)
457 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
458 except (RuntimeError, VPPApiError):
459 # In test mode we don't care that we can't find the API files
463 raise VPPRuntimeError
465 for file in apifiles:
466 with open(file) as apidef_file:
467 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
468 self.messages.update(m)
469 self.services.update(s)
471 self.apifiles = apifiles
474 if len(self.messages) == 0 and not testmode:
475 raise VPPValueError(1, "Missing JSON message definitions")
476 if not (verify_enum_hint(VppEnum.vl_api_address_family_t)):
477 raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.")
479 self.transport = VppTransport(
480 self, read_timeout=read_timeout, server_address=server_address
482 # Make sure we allow VPP to clean up the message rings.
483 atexit.register(vpp_atexit, weakref.ref(self))
485 add_convenience_methods()
def get_function(self, name):
    """Fetch the attribute called ``name`` from the API method holder.

    :param name: Name of the attribute to look up on ``self._api``.
    :returns: The object stored on ``self._api`` under that name.
    :raises AttributeError: If ``self._api`` does not exist yet or has no
        such attribute.
    """
    holder = self._api
    return getattr(holder, name)
491 """Multiprocessing-safe provider of unique context IDs."""
494 self.context = mp.Value(ctypes.c_uint, 0)
495 self.lock = mp.Lock()
498 """Get a new unique (or, at least, not recently used) context."""
500 self.context.value += 1
501 return self.context.value
503 get_context = ContextId()
def get_type(self, name):
    """Look up a type by name through ``vpp_get_type``.

    :param name: Type name to look up.
    :returns: The result of ``vpp_get_type(name)``, unchanged.
    """
    found = vpp_get_type(name)
    return found
510 if not hasattr(self, "_api"):
511 raise VPPApiError("Not connected, api definitions not available")
514 def make_function(self, msg, i, multipart, do_async):
518 return self._call_vpp_async(i, msg, **kwargs)
523 return self._call_vpp(i, msg, multipart, **kwargs)
525 f.__name__ = str(msg.name)
526 f.__doc__ = ", ".join(
527 ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)]
533 def _register_functions(self, do_async=False):
534 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
535 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
536 self._api = VppApiDynamicMethodHolder()
537 for name, msg in self.messages.items():
538 n = name + "_" + msg.crc[2:]
539 i = self.transport.get_msg_index(n)
541 self.id_msgdef[i] = msg
542 self.id_names[i] = name
544 # Create function for client side messages.
545 if name in self.services:
546 f = self.make_function(msg, i, self.services[name], do_async)
547 setattr(self._api, name, FuncWrapper(f))
549 self.logger.debug("No such message type or failed CRC checksum: %s", n)
551 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async):
552 pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None
554 rv = self.transport.connect(name, pfx, msg_handler, rx_qlen, do_async)
556 raise VPPIOError(2, "Connect failed")
557 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
558 self._register_functions(do_async=do_async)
560 # Initialise control ping
561 crc = self.messages["control_ping"].crc
562 self.control_ping_index = self.transport.get_msg_index(
563 ("control_ping" + "_" + crc[2:])
565 self.control_ping_msgdef = self.messages["control_ping"]
566 if self.async_thread:
567 self.event_thread = threading.Thread(target=self.thread_msg_handler)
568 self.event_thread.daemon = True
569 self.event_thread.start()
571 self.event_thread = None
574 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
577 name - the name of the client.
578 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
579 do_async - if true, messages are sent without waiting for a reply
580 rx_qlen - the length of the VPP message receive queue between
583 msg_handler = self.transport.get_callback(do_async)
584 return self.connect_internal(
585 name, msg_handler, chroot_prefix, rx_qlen, do_async
588 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
589 """Attach to VPP in synchronous mode. Application must poll for events.
591 name - the name of the client.
592 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
593 rx_qlen - the length of the VPP message receive queue between
597 return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False)
599 def disconnect(self):
600 """Detach from VPP."""
601 rv = self.transport.disconnect()
602 if self.event_thread is not None:
603 self.message_queue.put("terminate event thread")
606 def msg_handler_sync(self, msg):
607 """Process an incoming message from VPP in sync mode.
609 The message may be a reply or it may be an async notification.
611 r = self.decode_incoming_msg(msg)
615 # If we have a context, then use the context to find any
616 # request waiting for a reply
618 if hasattr(r, "context") and r.context > 0:
622 # No context -> async notification that we feed to the callback
623 self.message_queue.put_nowait(r)
625 raise VPPIOError(2, "RPC reply message received in event handler")
627 def has_context(self, msg):
632 "header_with_context",
633 [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]],
636 (i, ci, context), size = header.unpack(msg, 0)
637 if self.id_names[i] == "rx_thread_exit":
641 # Decode message and returns a tuple.
643 msgobj = self.id_msgdef[i]
644 if "context" in msgobj.field_by_name and context >= 0:
648 def decode_incoming_msg(self, msg, no_type_conversion=False):
650 logger.warning("vpp_api.read failed")
653 (i, ci), size = self.header.unpack(msg, 0)
654 if self.id_names[i] == "rx_thread_exit":
658 # Decode message and returns a tuple.
660 msgobj = self.id_msgdef[i]
662 raise VPPIOError(2, "Reply message undefined")
664 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
667 def msg_handler_async(self, msg):
668 """Process a message from VPP in async mode.
670 In async mode, all messages are returned to the callback.
672 r = self.decode_incoming_msg(msg)
676 msgname = type(r).__name__
678 if self.event_callback:
679 self.event_callback(msgname, r)
681 def _control_ping(self, context):
682 """Send a ping command."""
683 self._call_vpp_async(
684 self.control_ping_index, self.control_ping_msgdef, context=context
687 def validate_args(self, msg, kwargs):
688 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
690 raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name))
692 def _add_stat(self, name, ms):
693 if not name in self.stats:
694 self.stats[name] = {"max": ms, "count": 1, "avg": ms}
696 if ms > self.stats[name]["max"]:
697 self.stats[name]["max"] = ms
698 self.stats[name]["count"] += 1
699 n = self.stats[name]["count"]
700 self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n
703 s = "\n=== API PAPI STATISTICS ===\n"
704 s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max")
705 for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True):
706 s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format(
707 n[0], n[1]["count"], n[1]["avg"], n[1]["max"]
711 def get_field_options(self, msg, fld_name):
712 # when there is an option, the msgdef has 3 elements.
713 # ['u32', 'ring_size', {'default': 1024}]
714 for _def in self.messages[msg].msgdef:
715 if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name:
718 def _call_vpp(self, i, msgdef, service, **kwargs):
719 """Given a message, send the message and await a reply.
721 msgdef - the message packing definition
722 i - the message type index
723 multipart - True if the message returns multiple
725 context - context number - chosen at random if not
727 The remainder of the kwargs are the arguments to the API call.
729 The return value is the message or message array containing
730 the response. It will raise an IOError exception if there was
731 no response within the timeout window.
734 if "context" not in kwargs:
735 context = self.get_context()
736 kwargs["context"] = context
738 context = kwargs["context"]
739 kwargs["_vl_msg_id"] = i
741 no_type_conversion = kwargs.pop("_no_type_conversion", False)
742 timeout = kwargs.pop("_timeout", None)
745 if self.transport.socket_index:
746 kwargs["client_index"] = self.transport.socket_index
747 except AttributeError:
749 self.validate_args(msgdef, kwargs)
751 s = "Calling {}({})".format(
752 msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()])
756 b = msgdef.pack(kwargs)
757 self.transport.suspend()
759 self.transport.write(b)
761 msgreply = service["reply"]
762 stream = True if "stream" in service else False
764 if "stream_msg" in service:
765 # New service['reply'] = _reply and service['stream_msg'] = _details
766 stream_message = service["stream_msg"]
769 # Old service['reply'] = _details
770 stream_message = msgreply
771 msgreply = "control_ping_reply"
773 # Send a ping after the request - we use its response
774 # to detect that we have seen all results.
775 self._control_ping(context)
777 # Block until we get a reply.
780 r = self.read_blocking(no_type_conversion, timeout)
782 raise VPPIOError(2, "VPP API client: read failed")
783 msgname = type(r).__name__
784 if context not in r or r.context == 0 or context != r.context:
785 # Message being queued
786 self.message_queue.put_nowait(r)
788 if msgname != msgreply and (stream and (msgname != stream_message)):
789 print("REPLY MISMATCH", msgreply, msgname, stream_message, stream)
793 if msgname == msgreply:
794 if modern: # Return both reply and list
800 self.transport.resume()
802 s = "Return value: {!r}".format(r)
807 self._add_stat(msgdef.name, (te - ts) * 1000)
810 def _call_vpp_async(self, i, msg, **kwargs):
811 """Given a message, send the message and return the context.
813 msgdef - the message packing definition
814 i - the message type index
815 context - context number - chosen at random if not
817 The remainder of the kwargs are the arguments to the API call.
819 The reply message(s) will be delivered later to the registered callback.
820 The returned context will help with assigning which call
821 the reply belongs to.
823 if "context" not in kwargs:
824 context = self.get_context()
825 kwargs["context"] = context
827 context = kwargs["context"]
829 if self.transport.socket_index:
830 kwargs["client_index"] = self.transport.socket_index
831 except AttributeError:
832 kwargs["client_index"] = 0
833 kwargs["_vl_msg_id"] = i
836 self.transport.write(b)
839 def read_blocking(self, no_type_conversion=False, timeout=None):
840 """Get next received message from transport within timeout, decoded.
842 Note that notifications have context zero
843 and are not put into receive queue (at least for socket transport),
844 use async_thread with registered callback for processing them.
846 If no message appears in the queue within timeout, return None.
848 Optionally, type conversion can be skipped,
849 as some of conversions are into less precise types.
851 When r is the return value of this, the caller can get message name as:
852 msgname = type(r).__name__
853 and context number (type long) as:
856 :param no_type_conversion: If false, type conversions are applied.
857 :type no_type_conversion: bool
858 :returns: Decoded message, or None if no message (within timeout).
859 :rtype: Whatever VPPType.unpack returns, depends on no_type_conversion.
860 :raises VppTransportShmemIOError if timed out.
862 msg = self.transport.read(timeout=timeout)
865 return self.decode_incoming_msg(msg, no_type_conversion)
867 def register_event_callback(self, callback):
868 """Register a callback for async messages.
870 This will be called for async notifications in sync mode,
871 and all messages in async mode. In sync mode, replies to
872 requests will not come here.
874 callback is a fn(msg_type_name, msg_type) that will be
875 called when a message comes in. While this function is
876 executing, note that (a) you are in a background thread and
877 may wish to use threading.Lock to protect your datastructures,
878 and (b) message processing from VPP will stop (so if you take
879 a long while about it you may provoke reply timeouts or cause
880 VPP to fill the RX buffer). Passing None will disable the
883 self.event_callback = callback
885 def thread_msg_handler(self):
886 """Python thread calling the user registered message handler.
888 This is to emulate the old style event callback scheme. Modern
889 clients should provide their own thread to poll the event
893 r = self.message_queue.get()
894 if r == "terminate event thread":
896 msgname = type(r).__name__
897 if self.event_callback:
898 self.event_callback(msgname, r)
900 def validate_message_table(self, namecrctable):
901 """Take a dictionary of name_crc message names
902 and returns an array of missing messages"""
905 for name_crc in namecrctable:
906 i = self.transport.get_msg_index(name_crc)
908 missing_table.append(name_crc)
def dump_message_table(self):
    """Return VPP's API message table as a name_crc dictionary.

    The table object held by the transport is returned as-is, not copied.
    """
    transport = self.transport
    return transport.message_table
def dump_message_table_filtered(self, msglist):
    """Return VPP's API message table as a name_crc dictionary,
    filtered by message name list.

    :param msglist: Iterable of message names. Each name must be a key of
        ``self.services`` whose entry provides a "reply" name.
    :returns: A new dict holding the entries of the transport's message
        table whose key starts with one of the given names or with the
        reply name of one of them.
    :raises KeyError: If a name is missing from ``self.services`` or its
        service entry has no "reply" key.
    """
    # Materialise once so tuples and one-shot iterables are accepted too;
    # plain list concatenation only works when msglist is a real list.
    names = list(msglist)
    replies = [self.services[n]["reply"] for n in names]

    # Loop-invariant lookup, hoisted out of the nested loops.
    table = self.transport.message_table
    filtered = {}
    for name in names + replies:
        for key, value in table.items():
            # Prefix match against the table's name_crc keys.
            if key.startswith(name):
                filtered[key] = value

    return filtered
930 "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, "
931 "logger=%s, read_timeout=%s, "
932 "server_address='%s'>"
943 def details_iter(self, f, **kwargs):
946 kwargs["cursor"] = cursor
947 rv, details = f(**kwargs)
950 if rv.retval == 0 or rv.retval != -165: