from robot.api import logger
-from resources.libraries.python.PapiExecutor import PapiExecutor
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
from resources.libraries.python.topology import NodeType, Topology
from resources.libraries.python.InterfaceUtil import InterfaceUtil
-class VhostUser(object):
+class VhostUser:
"""Vhost-user interfaces L1 library."""
@staticmethod
- def _sw_interface_vhost_user_dump(node):
- """Get the Vhost-user dump on the given node.
-
- :param node: Given node to get Vhost dump from.
- :type node: dict
- :returns: List of Vhost-user interfaces data extracted from Papi
- response.
- :rtype: list
- """
- cmd = "sw_interface_vhost_user_dump"
- with PapiExecutor(node) as papi_exec:
- details = papi_exec.add(cmd).get_details()
-
- for vhost in details:
- vhost["interface_name"] = vhost["interface_name"].rstrip('\x00')
- vhost["sock_filename"] = vhost["sock_filename"].rstrip('\x00')
-
- logger.debug("VhostUser details:\n{details}".format(details=details))
-
- return details
-
- @staticmethod
- def vpp_create_vhost_user_interface(node, socket):
+ def vpp_create_vhost_user_interface(
+ node, socket, is_server=False, enable_gso=False):
"""Create Vhost-user interface on VPP node.
:param node: Node to create Vhost-user interface on.
:param socket: Vhost-user interface socket path.
+ :param is_server: Server side of connection. Default: False
+ :param enable_gso: Generic segmentation offloading. Default: False
:type node: dict
:type socket: str
+ :type is_server: bool
+ :type enable_gso: bool
:returns: SW interface index.
:rtype: int
"""
- cmd = 'create_vhost_user_if'
- err_msg = 'Failed to create Vhost-user interface on host {host}'.format(
- host=node['host'])
+ cmd = u"create_vhost_user_if"
+ err_msg = f"Failed to create Vhost-user interface " \
+ f"on host {node[u'host']}"
args = dict(
- sock_filename=str(socket)
+ is_server=bool(is_server),
+ sock_filename=str(socket),
+ enable_gso=bool(enable_gso)
)
- with PapiExecutor(node) as papi_exec:
+
+ with PapiSocketExecutor(node) as papi_exec:
sw_if_index = papi_exec.add(cmd, **args).get_sw_if_index(err_msg)
# Update the Topology:
- if_key = Topology.add_new_port(node, 'vhost')
+ if_key = Topology.add_new_port(node, u"vhost")
Topology.update_interface_sw_if_index(node, if_key, sw_if_index)
ifc_name = InterfaceUtil.vpp_get_interface_name(node, sw_if_index)
:returns: Interface name or None if not found.
:rtype: str
"""
- for interface in node['interfaces'].values():
- if interface.get('socket') == socket:
- return interface.get('name')
+ for interface in node[u"interfaces"].values():
+ if interface.get(u"socket") == socket:
+ return interface.get(u"name")
return None
@staticmethod
:returns: l2_address of the given interface.
:rtype: str
"""
-
return InterfaceUtil.vpp_get_interface_mac(node, sw_if_index)
- @staticmethod
- def vpp_show_vhost(node):
- """Get Vhost-user data for the given node.
-
- :param node: VPP node to get interface data from.
- :type node: dict
- """
- VhostUser._sw_interface_vhost_user_dump(node)
-
@staticmethod
def show_vpp_vhost_on_all_duts(nodes):
"""Show Vhost-user on all DUTs.
:type nodes: dict
"""
for node in nodes.values():
- if node['type'] == NodeType.DUT:
- VhostUser.vpp_show_vhost(node)
+ if node[u"type"] == NodeType.DUT:
+ VhostUser.vhost_user_dump(node)
+
+ @staticmethod
+ def vhost_user_dump(node):
+ """Get vhost-user data for the given node.
+
+ :param node: VPP node to get interface data from.
+ :type node: dict
+ :returns: List of dictionaries with all vhost-user interfaces.
+ :rtype: list
+ """
+ cmd = u"sw_interface_vhost_user_dump"
+ err_msg = f"Failed to get vhost-user dump on host {node['host']}"
+
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd).get_details(err_msg)
+
+ logger.debug(f"Vhost-user details:\n{details}")
+ return details