- try:
- reply = papi_fn(**command["api_args"])
- except IOError as err:
- # Ocassionally an error happens, try reconnect.
- logger.warn("Reconnect after error: {err!r}".format(
- err=err))
- self.vpp_instance.disconnect()
- # Testing showes immediate reconnect fails.
- time.sleep(1)
- self.vpp_instance.connect_sync("csit_socket")
- logger.trace("Reconnected.")
- reply = papi_fn(**command["api_args"])
- except (AttributeError, IOError) as err:
- raise_from(AssertionError(err_msg), err, level="INFO")
- # *_dump commands return list of objects, convert, ordinary reply.
- if not isinstance(reply, list):
- reply = [reply]
- for item in reply:
- dict_item = dictize(item)
- if "retval" in dict_item.keys():
- # *_details messages do not contain retval.
- retval = dict_item["retval"]
- if retval != 0:
- # TODO: What exactly to log and raise here?
- err = AssertionError("Retval {rv!r}".format(rv=retval))
- # Lowering log level, some retval!=0 calls are expected.
- # TODO: Expose level argument so callers can decide?
- raise_from(AssertionError(err_msg), err, level="DEBUG")
- replies.append(dict_item)
- return replies
-
-
-class PapiExecutor(object):
+ # Send the command maybe followed by control ping.
+ main_context = papi_fn(**command["api_args"])
+ if single_reply:
+ replies.append(PapiSocketExecutor._read(vpp_instance))
+ else:
+ ping_context = control_ping_fn()
+ # Receive the replies.
+ while 1:
+ reply = PapiSocketExecutor._read(vpp_instance)
+ if reply is None:
+ raise RuntimeError(
+ f"{err_msg}\nSync PAPI timed out."
+ )
+ if reply.context == ping_context:
+ break
+ if reply.context != main_context:
+ raise RuntimeError(
+ f"{err_msg}\nUnexpected context: {reply!r}"
+ )
+ replies.append(reply)
+ except (AttributeError, IOError, struct.error) as err:
+ # TODO: Add retry if it is still needed.
+ raise AssertionError(f"{err_msg}") from err
+ finally:
+ # Discard any unprocessed replies to avoid secondary failures.
+ PapiSocketExecutor._drain(vpp_instance, err_msg)
+ # Process replies for this command.
+ for reply in replies:
+ self.crc_checker.check_api_name(reply.__class__.__name__)
+ dictized_reply = dictize_and_check_retval(reply, err_msg)
+ ret_list.append(dictized_reply)
+ return ret_list
+
+ def _execute_async(self, local_list, err_msg):
+ """Read, process and return replies.
+
+ The messages were already sent by .add() in this mode,
+ local_list is used just so we know how many replies to read.
+
+ Beware: It is not clear what to do when socket read fails
+ in the middle of async processing.
+
+ The implementation assumes each command results in exactly one reply,
+ there is no reordering in either commands nor replies,
+ and context numbers increase one by one (and are matching for replies).
+
+ To speed processing up, reply CRC values are not checked.
+
+ The current implementation does not limit the number of messages
+ in-flight, we rely on VPP PAPI background thread to move replies
+ from socket to queue fast enough.
+
+ :param local_list: The list of PAPI commands to get replies for.
+ :param err_msg: The message used if the PAPI command(s) execution fails.
+ :type local_list: list
+ :type err_msg: str
+ :returns: Papi replies parsed into a dict-like object, with fields
+ according to API (possibly including retval).
+ :rtype: List[UserDict]
+ :raises RuntimeError: If the replies are not all correct.
+ """
+ vpp_instance = self.get_connected_client()
+ ret_list = list()
+ try:
+ for index, _ in enumerate(local_list):
+ # Blocks up to timeout.
+ reply = PapiSocketExecutor._read(vpp_instance)
+ if reply is None:
+ time_msg = f"PAPI async timeout: idx {index}"
+ raise RuntimeError(f"{err_msg}\n{time_msg}")
+ ret_list.append(dictize_and_check_retval(reply, err_msg))
+ finally:
+ # Discard any unprocessed replies to avoid secondary failures.
+ PapiSocketExecutor._drain(vpp_instance, err_msg)
+ return ret_list
+
+
+class Disconnector:
+ """Class for holding a single keyword."""
+
+ @staticmethod
+ def disconnect_all_papi_connections():
+ """Disconnect all connected client instances, tear down the SSH tunnels.
+
+ Also remove the local sockets by deleting the temporary directory.
+ Put disconnected client instances to the reuse list.
+ The added attributes are not cleaned up,
+ as their values will get overwritten on next connect.
+
+ Call this method just before killing/restarting all VPP instances.
+
+ This could be a class method of PapiSocketExecutor.
+ But Robot calls methods on instances, and it would be weird
+ to give node argument for constructor in import.
+ Also, as we have a class of the same name as the module,
+ the keywords defined on module level are not accessible.
+ """
+ cls = PapiSocketExecutor
+ # Iterate over copy of entries so deletions do not mess with iterator.
+ for key in list(cls.conn_cache.keys()):
+ cls.disconnect_by_key(key)
+
+
+class PapiExecutor: