from resources.libraries.python.Constants import Constants
from resources.libraries.python.CpuUtils import CpuUtils
from resources.libraries.python.DUTSetup import DUTSetup
-from resources.libraries.python.IPUtil import convert_ipv4_netmask_prefix
from resources.libraries.python.L2Util import L2Util
from resources.libraries.python.PapiExecutor import PapiExecutor
from resources.libraries.python.parsers.JsonParser import JsonParser
:returns: Integer representation of PCI address.
:rtype: int
"""
-
pci = list(pci_str.split(':')[0:2])
pci.extend(pci_str.split(':')[2].split('.'))
:returns: SW interface index.
:rtype: int
"""
-
try:
sw_if_index = int(interface)
except ValueError:
:raises ValueError: If the state of interface is unexpected.
:raises ValueError: If the node has an unknown node type.
"""
-
if if_type == 'key':
if isinstance(interface, basestring):
sw_if_index = Topology.get_interface_sw_index(node, interface)
:raises ValueError: If the node type is "DUT".
:raises ValueError: If the node has an unknown node type.
"""
-
if node['type'] == NodeType.DUT:
raise ValueError('Node {}: Setting Ethernet MTU for interface '
'on DUT nodes not supported', node['host'])
:type node: dict
:returns: Nothing.
"""
-
for ifc in node['interfaces']:
InterfaceUtil.set_interface_ethernet_mtu(node, ifc, 1500)
:type interface: str or int
:type mtu: int
"""
-
if isinstance(interface, basestring):
sw_if_index = Topology.get_interface_sw_index(node, interface)
else:
:type node: dict
:type mtu: int
"""
-
for interface in node['interfaces']:
InterfaceUtil.vpp_set_interface_mtu(node, interface, mtu)
:type nodes: dict
:type mtu: int
"""
-
for node in nodes.values():
if node['type'] == NodeType.DUT:
InterfaceUtil.vpp_set_interfaces_mtu_on_node(node, mtu)
:raises RuntimeError: If any interface is not in link-up state after
defined number of retries.
"""
-
for _ in xrange(0, retries):
not_ready = list()
out = InterfaceUtil.vpp_get_interface_data(node)
:type retries: int
:returns: Nothing.
"""
-
for node in nodes.values():
if node['type'] == NodeType.DUT:
InterfaceUtil.vpp_node_interfaces_ready_wait(node, retries)
:raises TypeError: if the data type of interface is neither basestring
nor int.
"""
-
if interface is not None:
if isinstance(interface, basestring):
param = 'interface_name'
:returns: Name of the given interface.
:rtype: str
"""
-
if_data = InterfaceUtil.vpp_get_interface_data(node, sw_if_index)
if if_data['sup_sw_if_index'] != if_data['sw_if_index']:
if_data = InterfaceUtil.vpp_get_interface_data(
:returns: Name of the given interface.
:rtype: str
"""
-
if_data = InterfaceUtil.vpp_get_interface_data(node, interface_name)
return if_data.get('sw_if_index')
:returns: MAC address.
:rtype: str
"""
-
if_data = InterfaceUtil.vpp_get_interface_data(node, interface)
if if_data['sup_sw_if_index'] != if_data['sw_if_index']:
if_data = InterfaceUtil.vpp_get_interface_data(
return if_data.get('l2_address')
- @staticmethod
- def vpp_get_interface_ip_addresses(node, interface, ip_version):
- """Get list of IP addresses from an interface on a VPP node.
-
- TODO: Move to IPUtils
-
- :param node: VPP node to get data from.
- :param interface: Name of an interface on the VPP node.
- :param ip_version: IP protocol version (ipv4 or ipv6).
- :type node: dict
- :type interface: str
- :type ip_version: str
- :returns: List of dictionaries, each containing IP address, subnet
- prefix length and also the subnet mask for ipv4 addresses.
- Note: A single interface may have multiple IP addresses assigned.
- :rtype: list
- """
-
- try:
- sw_if_index = Topology.convert_interface_reference(
- node, interface, 'sw_if_index')
- except RuntimeError:
- if isinstance(interface, basestring):
- sw_if_index = InterfaceUtil.get_sw_if_index(node, interface)
- else:
- raise
-
- is_ipv6 = 1 if ip_version == 'ipv6' else 0
-
- cmd = 'ip_address_dump'
- cmd_reply = 'ip_address_details'
- args = dict(sw_if_index=sw_if_index,
- is_ipv6=is_ipv6)
- err_msg = 'Failed to get L2FIB dump on host {host}'.format(
- host=node['host'])
- with PapiExecutor(node) as papi_exec:
- papi_resp = papi_exec.add(cmd, **args).get_dump(err_msg)
-
- data = list()
- for item in papi_resp.reply[0]['api_reply']:
- item[cmd_reply]['ip'] = inet_ntop(AF_INET6, item[cmd_reply]['ip']) \
- if is_ipv6 else inet_ntop(AF_INET, item[cmd_reply]['ip'][0:4])
- data.append(item[cmd_reply])
-
- if ip_version == 'ipv4':
- for item in data:
- item['netmask'] = convert_ipv4_netmask_prefix(
- item['prefix_length'])
- return data
-
@staticmethod
def tg_set_interface_driver(node, pci_addr, driver):
"""Set interface driver on the TG node.
:param node: Node selected from DICT__nodes.
:type node: dict
"""
-
interface_list = InterfaceUtil.vpp_get_interface_data(node)
interface_dict = dict()
for ifc in interface_list:
:raises RuntimeError: if it is unable to create VLAN subinterface on the
node.
"""
-
iface_key = Topology.get_interface_by_name(node, interface)
sw_if_index = Topology.get_interface_sw_index(node, iface_key)
:raises RuntimeError: if it is unable to create VxLAN interface on the
node.
"""
-
try:
src_address = IPv6Address(unicode(source_ip))
dst_address = IPv6Address(unicode(destination_ip))
:raises TypeError: if the data type of interface is neither basestring
nor int.
"""
-
if interface is not None:
sw_if_index = InterfaceUtil.get_interface_index(node, interface)
else:
:returns: List of dictionaries with all vhost-user interfaces.
:rtype: list
"""
-
cmd = 'sw_interface_vhost_user_dump'
cmd_reply = 'sw_interface_vhost_user_details'
err_msg = 'Failed to get vhost-user dump on host {host}'.format(
a List of dictionaries containing all TAP data for the given node.
:rtype: dict or list
"""
-
cmd = 'sw_interface_tap_v2_dump'
cmd_reply = 'sw_interface_tap_v2_details'
err_msg = 'Failed to get TAP dump on host {host}'.format(
:rtype: tuple
:raises RuntimeError: If it is not possible to create sub-interface.
"""
-
subif_types = type_subif.split()
cmd = 'create_subif'
:rtype: tuple
:raises RuntimeError: If unable to create GRE tunnel interface.
"""
-
cmd = 'gre_tunnel_add_del'
tunnel = dict(type=0,
instance=Constants.BITWISE_NON_ZERO,
:raises RuntimeError: If it is not possible to create loopback on the
node.
"""
-
cmd = 'create_loopback'
args = dict(mac_address=0)
err_msg = 'Failed to create loopback interface on host {host}'.format(
:raises RuntimeError: If it is not possible to create bond interface on
the node.
"""
-
cmd = 'bond_create'
args = dict(id=int(Constants.BITWISE_NON_ZERO),
use_custom_mac=0 if mac is None else 1,
:type sw_if_idx: int
:type ifc_pfx: str
"""
-
if_key = Topology.add_new_port(node, ifc_pfx)
if ifc_name and sw_if_idx is None:
:raises RuntimeError: If it is not possible to create AVF interface on
the node.
"""
-
cmd = 'avf_create'
args = dict(pci_addr=InterfaceUtil.pci_to_int(vf_pci_addr),
enable_elog=0,
:raises RuntimeError: If it is not possible to enslave physical
interface to bond interface on the node.
"""
-
cmd = 'bond_enslave'
args = dict(
sw_if_index=Topology.get_interface_sw_index(node, interface),
:type node: dict
:type details: bool
"""
-
cmd = 'sw_interface_bond_dump'
cmd_reply = 'sw_interface_bond_details'
err_msg = 'Failed to get bond interface dump on host {host}'.format(
:type ip_version: str
:type table_index: int
"""
-
cmd = 'input_acl_set_interface'
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
return papi_resp
- @staticmethod
- def get_interface_vrf_table(node, interface, ip_version='ipv4'):
- """Get vrf ID for the given interface.
-
- TODO: Move to proper IP library when implementing CSIT-1459.
-
- :param node: VPP node.
- :param interface: Name or sw_if_index of a specific interface.
- :type node: dict
- :param ip_version: IP protocol version (ipv4 or ipv6).
- :type interface: str or int
- :type ip_version: str
- :returns: vrf ID of the specified interface.
- :rtype: int
- """
- if isinstance(interface, basestring):
- sw_if_index = InterfaceUtil.get_sw_if_index(node, interface)
- else:
- sw_if_index = interface
-
- is_ipv6 = 1 if ip_version == 'ipv6' else 0
-
- cmd = 'sw_interface_get_table'
- args = dict(sw_if_index=sw_if_index,
- is_ipv6=is_ipv6)
- err_msg = 'Failed to get VRF id assigned to interface {ifc}'.format(
- ifc=interface)
- with PapiExecutor(node) as papi_exec:
- papi_resp = papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
-
- return papi_resp['vrf_id']
-
@staticmethod
def get_sw_if_index(node, interface_name):
"""Get sw_if_index for the given interface from actual interface dump.
:returns: sw_if_index of the given interface.
:rtype: str
"""
-
interface_data = InterfaceUtil.vpp_get_interface_data(
node, interface=interface_name)
return interface_data.get('sw_if_index')
interfaces.
:rtype: dict or list
"""
-
if interface_name is not None:
sw_if_index = InterfaceUtil.get_interface_index(
node, interface_name)
vxlan_gpe_data=data))
return data
- @staticmethod
- def vpp_ip_source_check_setup(node, if_name):
- """Setup Reverse Path Forwarding source check on interface.
-
- TODO: Move to proper IP library when implementing CSIT-1459.
-
- :param node: Node to setup RPF source check.
- :param if_name: Interface name to setup RPF source check.
- :type node: dict
- :type if_name: str
- """
-
- cmd = 'ip_source_check_interface_add_del'
- args = dict(
- sw_if_index=InterfaceUtil.get_interface_index(node, if_name),
- is_add=1,
- loose=0)
- err_msg = 'Failed to enable source check on interface {ifc}'.format(
- ifc=if_name)
- with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
-
@staticmethod
def assign_interface_to_fib_table(node, interface, table_id, ipv6=False):
"""Assign VPP interface to specific VRF/FIB table.
:type table_id: int
:type ipv6: bool
"""
-
cmd = 'sw_interface_set_table'
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
:type osi_layer: str
:returns: Virtual Function topology interface keys.
:rtype: list
+ :raises RuntimeError: If a reason preventing initialization is found.
"""
- ssh = SSH()
- ssh.connect(node)
-
# Read PCI address and driver.
pf_pci_addr = Topology.get_interface_pci_addr(node, ifc_key)
pf_mac_addr = Topology.get_interface_mac(node, ifc_key).split(":")
uio_driver = Topology.get_uio_driver(node)
kernel_driver = Topology.get_interface_driver(node, ifc_key)
+ if kernel_driver != "i40e":
+ raise RuntimeError(
+ "AVF needs i40e driver, not {driver} at node {host} ifc {ifc}"\
+ .format(driver=kernel_driver, host=node["host"], ifc=ifc_key))
current_driver = DUTSetup.get_pci_dev_driver(
node, pf_pci_addr.replace(':', r'\:'))
:returns: Number of created VXLAN interfaces.
:rtype: int
"""
-
try:
src_address_start = IPv6Address(unicode(src_ip_start))
dst_address_start = IPv6Address(unicode(dst_ip_start))
:type vxlan_count: int
:type node_vlan_if: str
"""
-
if_data = InterfaceUtil.vpp_get_interface_data(node)
with PapiExecutor(node) as papi_exec:
:type ip_step: int
:type bd_id_start: int
"""
-
try:
dst_address_start = IPv6Address(unicode(dst_ip_start))
af_inet = AF_INET6
:returns: Thread mapping information as a list of dictionaries.
:rtype: list
"""
-
cmd = 'sw_interface_rx_placement_dump'
cmd_reply = 'sw_interface_rx_placement_details'
err_msg = "Failed to run '{cmd}' PAPI command on host {host}!".format(
:raises RuntimeError: If failed to run command on host or if no API
reply received.
"""
-
cmd = 'sw_interface_set_rx_placement'
err_msg = "Failed to set interface RX placement to worker on host " \
"{host}!".format(host=node['host'])