"""Interface util library."""
-from socket import AF_INET, AF_INET6, inet_ntop, inet_pton
+from socket import AF_INET, AF_INET6, inet_ntop
from time import sleep
from enum import IntEnum
-from ipaddress import IPv4Address, IPv6Address
-from ipaddress import AddressValueError, NetmaskValueError
+from ipaddress import ip_address
from robot.api import logger
from resources.libraries.python.Constants import Constants
from resources.libraries.python.CpuUtils import CpuUtils
from resources.libraries.python.DUTSetup import DUTSetup
-from resources.libraries.python.IPUtil import convert_ipv4_netmask_prefix
from resources.libraries.python.L2Util import L2Util
from resources.libraries.python.PapiExecutor import PapiExecutor
from resources.libraries.python.parsers.JsonParser import JsonParser
:returns: Integer representation of PCI address.
:rtype: int
"""
-
pci = list(pci_str.split(':')[0:2])
pci.extend(pci_str.split(':')[2].split('.'))
:returns: SW interface index.
:rtype: int
"""
-
try:
sw_if_index = int(interface)
except ValueError:
:raises ValueError: If the state of interface is unexpected.
:raises ValueError: If the node has an unknown node type.
"""
-
if if_type == 'key':
if isinstance(interface, basestring):
sw_if_index = Topology.get_interface_sw_index(node, interface)
:raises ValueError: If the node type is "DUT".
:raises ValueError: If the node has an unknown node type.
"""
-
if node['type'] == NodeType.DUT:
raise ValueError('Node {}: Setting Ethernet MTU for interface '
'on DUT nodes not supported', node['host'])
:type node: dict
:returns: Nothing.
"""
-
for ifc in node['interfaces']:
InterfaceUtil.set_interface_ethernet_mtu(node, ifc, 1500)
:type interface: str or int
:type mtu: int
"""
-
if isinstance(interface, basestring):
sw_if_index = Topology.get_interface_sw_index(node, interface)
else:
host=node['host'])
args = dict(sw_if_index=sw_if_index,
mtu=int(mtu))
- with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg).\
- verify_reply(err_msg=err_msg)
+ try:
+ with PapiExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_replies(err_msg).\
+ verify_reply(err_msg=err_msg)
+ except AssertionError as err:
+ # TODO: Make failure tolerance optional.
+ logger.debug("Setting MTU failed. Expected?\n{err}".format(
+ err=err))
@staticmethod
def vpp_set_interfaces_mtu_on_node(node, mtu=9200):
:type node: dict
:type mtu: int
"""
-
for interface in node['interfaces']:
InterfaceUtil.vpp_set_interface_mtu(node, interface, mtu)
:type nodes: dict
:type mtu: int
"""
-
for node in nodes.values():
if node['type'] == NodeType.DUT:
InterfaceUtil.vpp_set_interfaces_mtu_on_node(node, mtu)
:raises RuntimeError: If any interface is not in link-up state after
defined number of retries.
"""
-
for _ in xrange(0, retries):
not_ready = list()
out = InterfaceUtil.vpp_get_interface_data(node)
:type retries: int
:returns: Nothing.
"""
-
for node in nodes.values():
if node['type'] == NodeType.DUT:
InterfaceUtil.vpp_node_interfaces_ready_wait(node, retries)
:raises TypeError: if the data type of interface is neither basestring
nor int.
"""
-
if interface is not None:
if isinstance(interface, basestring):
param = 'interface_name'
:returns: Name of the given interface.
:rtype: str
"""
-
if_data = InterfaceUtil.vpp_get_interface_data(node, sw_if_index)
if if_data['sup_sw_if_index'] != if_data['sw_if_index']:
if_data = InterfaceUtil.vpp_get_interface_data(
:returns: Name of the given interface.
:rtype: str
"""
-
if_data = InterfaceUtil.vpp_get_interface_data(node, interface_name)
return if_data.get('sw_if_index')
:returns: MAC address.
:rtype: str
"""
-
if_data = InterfaceUtil.vpp_get_interface_data(node, interface)
if if_data['sup_sw_if_index'] != if_data['sw_if_index']:
if_data = InterfaceUtil.vpp_get_interface_data(
return if_data.get('l2_address')
- @staticmethod
- def vpp_get_interface_ip_addresses(node, interface, ip_version):
- """Get list of IP addresses from an interface on a VPP node.
-
- TODO: Move to IPUtils
-
- :param node: VPP node to get data from.
- :param interface: Name of an interface on the VPP node.
- :param ip_version: IP protocol version (ipv4 or ipv6).
- :type node: dict
- :type interface: str
- :type ip_version: str
- :returns: List of dictionaries, each containing IP address, subnet
- prefix length and also the subnet mask for ipv4 addresses.
- Note: A single interface may have multiple IP addresses assigned.
- :rtype: list
- """
-
- try:
- sw_if_index = Topology.convert_interface_reference(
- node, interface, 'sw_if_index')
- except RuntimeError:
- if isinstance(interface, basestring):
- sw_if_index = InterfaceUtil.get_sw_if_index(node, interface)
- else:
- raise
-
- is_ipv6 = 1 if ip_version == 'ipv6' else 0
-
- cmd = 'ip_address_dump'
- cmd_reply = 'ip_address_details'
- args = dict(sw_if_index=sw_if_index,
- is_ipv6=is_ipv6)
- err_msg = 'Failed to get L2FIB dump on host {host}'.format(
- host=node['host'])
- with PapiExecutor(node) as papi_exec:
- papi_resp = papi_exec.add(cmd, **args).get_dump(err_msg)
-
- data = list()
- for item in papi_resp.reply[0]['api_reply']:
- item[cmd_reply]['ip'] = inet_ntop(AF_INET6, item[cmd_reply]['ip']) \
- if is_ipv6 else inet_ntop(AF_INET, item[cmd_reply]['ip'][0:4])
- data.append(item[cmd_reply])
-
- if ip_version == 'ipv4':
- for item in data:
- item['netmask'] = convert_ipv4_netmask_prefix(
- item['prefix_length'])
- return data
-
@staticmethod
def tg_set_interface_driver(node, pci_addr, driver):
"""Set interface driver on the TG node.
:param node: Node selected from DICT__nodes.
:type node: dict
"""
-
interface_list = InterfaceUtil.vpp_get_interface_data(node)
interface_dict = dict()
for ifc in interface_list:
:raises RuntimeError: if it is unable to create VLAN subinterface on the
node.
"""
-
iface_key = Topology.get_interface_by_name(node, interface)
sw_if_index = Topology.get_interface_sw_index(node, iface_key)
:raises RuntimeError: if it is unable to create VxLAN interface on the
node.
"""
-
- try:
- src_address = IPv6Address(unicode(source_ip))
- dst_address = IPv6Address(unicode(destination_ip))
- af_inet = AF_INET6
- is_ipv6 = 1
- except (AddressValueError, NetmaskValueError):
- src_address = IPv4Address(unicode(source_ip))
- dst_address = IPv4Address(unicode(destination_ip))
- af_inet = AF_INET
- is_ipv6 = 0
+ src_address = ip_address(unicode(source_ip))
+ dst_address = ip_address(unicode(destination_ip))
cmd = 'vxlan_add_del_tunnel'
args = dict(is_add=1,
- is_ipv6=is_ipv6,
+ is_ipv6=1 if src_address.version == 6 else 0,
instance=Constants.BITWISE_NON_ZERO,
- src_address=inet_pton(af_inet, str(src_address)),
- dst_address=inet_pton(af_inet, str(dst_address)),
+ src_address=src_address.packed,
+ dst_address=dst_address.packed,
mcast_sw_if_index=Constants.BITWISE_NON_ZERO,
encap_vrf_id=0,
decap_next_index=Constants.BITWISE_NON_ZERO,
:raises TypeError: if the data type of interface is neither basestring
nor int.
"""
-
if interface is not None:
sw_if_index = InterfaceUtil.get_interface_index(node, interface)
else:
:returns: List of dictionaries with all vhost-user interfaces.
:rtype: list
"""
-
cmd = 'sw_interface_vhost_user_dump'
cmd_reply = 'sw_interface_vhost_user_details'
err_msg = 'Failed to get vhost-user dump on host {host}'.format(
a List of dictionaries containing all TAP data for the given node.
:rtype: dict or list
"""
-
cmd = 'sw_interface_tap_v2_dump'
cmd_reply = 'sw_interface_tap_v2_details'
err_msg = 'Failed to get TAP dump on host {host}'.format(
:rtype: tuple
:raises RuntimeError: If it is not possible to create sub-interface.
"""
-
subif_types = type_subif.split()
cmd = 'create_subif'
:rtype: tuple
:raises RuntimeError: If unable to create GRE tunnel interface.
"""
-
cmd = 'gre_tunnel_add_del'
tunnel = dict(type=0,
instance=Constants.BITWISE_NON_ZERO,
:raises RuntimeError: If it is not possible to create loopback on the
node.
"""
-
cmd = 'create_loopback'
args = dict(mac_address=0)
err_msg = 'Failed to create loopback interface on host {host}'.format(
:raises RuntimeError: If it is not possible to create bond interface on
the node.
"""
-
cmd = 'bond_create'
args = dict(id=int(Constants.BITWISE_NON_ZERO),
use_custom_mac=0 if mac is None else 1,
:type sw_if_idx: int
:type ifc_pfx: str
"""
-
if_key = Topology.add_new_port(node, ifc_pfx)
if ifc_name and sw_if_idx is None:
:raises RuntimeError: If it is not possible to create AVF interface on
the node.
"""
-
cmd = 'avf_create'
args = dict(pci_addr=InterfaceUtil.pci_to_int(vf_pci_addr),
enable_elog=0,
:raises RuntimeError: If it is not possible to enslave physical
interface to bond interface on the node.
"""
-
cmd = 'bond_enslave'
args = dict(
sw_if_index=Topology.get_interface_sw_index(node, interface),
:type node: dict
:type details: bool
"""
-
cmd = 'sw_interface_bond_dump'
cmd_reply = 'sw_interface_bond_details'
err_msg = 'Failed to get bond interface dump on host {host}'.format(
:type ip_version: str
:type table_index: int
"""
-
cmd = 'input_acl_set_interface'
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
return papi_resp
- @staticmethod
- def get_interface_vrf_table(node, interface, ip_version='ipv4'):
- """Get vrf ID for the given interface.
-
- TODO: Move to proper IP library when implementing CSIT-1459.
-
- :param node: VPP node.
- :param interface: Name or sw_if_index of a specific interface.
- :type node: dict
- :param ip_version: IP protocol version (ipv4 or ipv6).
- :type interface: str or int
- :type ip_version: str
- :returns: vrf ID of the specified interface.
- :rtype: int
- """
- if isinstance(interface, basestring):
- sw_if_index = InterfaceUtil.get_sw_if_index(node, interface)
- else:
- sw_if_index = interface
-
- is_ipv6 = 1 if ip_version == 'ipv6' else 0
-
- cmd = 'sw_interface_get_table'
- args = dict(sw_if_index=sw_if_index,
- is_ipv6=is_ipv6)
- err_msg = 'Failed to get VRF id assigned to interface {ifc}'.format(
- ifc=interface)
- with PapiExecutor(node) as papi_exec:
- papi_resp = papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
-
- return papi_resp['vrf_id']
-
@staticmethod
def get_sw_if_index(node, interface_name):
"""Get sw_if_index for the given interface from actual interface dump.
:returns: sw_if_index of the given interface.
:rtype: str
"""
-
interface_data = InterfaceUtil.vpp_get_interface_data(
node, interface=interface_name)
return interface_data.get('sw_if_index')
interfaces.
:rtype: dict or list
"""
-
if interface_name is not None:
sw_if_index = InterfaceUtil.get_interface_index(
node, interface_name)
vxlan_gpe_data=data))
return data
- @staticmethod
- def vpp_ip_source_check_setup(node, if_name):
- """Setup Reverse Path Forwarding source check on interface.
-
- TODO: Move to proper IP library when implementing CSIT-1459.
-
- :param node: Node to setup RPF source check.
- :param if_name: Interface name to setup RPF source check.
- :type node: dict
- :type if_name: str
- """
-
- cmd = 'ip_source_check_interface_add_del'
- args = dict(
- sw_if_index=InterfaceUtil.get_interface_index(node, if_name),
- is_add=1,
- loose=0)
- err_msg = 'Failed to enable source check on interface {ifc}'.format(
- ifc=if_name)
- with PapiExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_replies(err_msg). \
- verify_reply(err_msg=err_msg)
-
@staticmethod
def assign_interface_to_fib_table(node, interface, table_id, ipv6=False):
"""Assign VPP interface to specific VRF/FIB table.
:type table_id: int
:type ipv6: bool
"""
-
cmd = 'sw_interface_set_table'
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
:rtype: list
:raises RuntimeError: If a reason preventing initialization is found.
"""
- ssh = SSH()
- ssh.connect(node)
-
# Read PCI address and driver.
pf_pci_addr = Topology.get_interface_pci_addr(node, ifc_key)
pf_mac_addr = Topology.get_interface_mac(node, ifc_key).split(":")
return vf_ifc_keys
- @staticmethod
- def vpp_create_multiple_vxlan_ipv4_tunnels(
- node, node_vxlan_if, node_vlan_if, op_node, op_node_if,
- n_tunnels, vni_start, src_ip_start, dst_ip_start, ip_step, ip_limit,
- bd_id_start):
- """Create multiple VXLAN tunnel interfaces and VLAN sub-interfaces on
- VPP node.
-
- Put each pair of VXLAN tunnel interface and VLAN sub-interface to
- separate bridge-domain.
-
- :param node: VPP node to create VXLAN tunnel interfaces.
- :param node_vxlan_if: VPP node interface key to create VXLAN tunnel
- interfaces.
- :param node_vlan_if: VPP node interface key to create VLAN
- sub-interface.
- :param op_node: Opposite VPP node for VXLAN tunnel interfaces.
- :param op_node_if: Opposite VPP node interface key for VXLAN tunnel
- interfaces.
- :param n_tunnels: Number of tunnel interfaces to create.
- :param vni_start: VNI start ID.
- :param src_ip_start: VXLAN tunnel source IP address start.
- :param dst_ip_start: VXLAN tunnel destination IP address start.
- :param ip_step: IP address incremental step.
- :param ip_limit: IP address limit.
- :param bd_id_start: Bridge-domain ID start.
- :type node: dict
- :type node_vxlan_if: str
- :type node_vlan_if: str
- :type op_node: dict
- :type op_node_if: str
- :type n_tunnels: int
- :type vni_start: int
- :type src_ip_start: str
- :type dst_ip_start: str
- :type ip_step: int
- :type ip_limit: str
- :type bd_id_start: int
- """
- # configure IPs, create VXLAN interfaces and VLAN sub-interfaces
- vxlan_count = InterfaceUtil.vpp_create_vxlan_and_vlan_interfaces(
- node, node_vxlan_if, node_vlan_if, n_tunnels, vni_start,
- src_ip_start, dst_ip_start, ip_step, ip_limit)
-
- # update topology with VXLAN interfaces and VLAN sub-interfaces data
- # and put interfaces up
- InterfaceUtil.vpp_put_vxlan_and_vlan_interfaces_up(
- node, vxlan_count, node_vlan_if)
-
- # configure bridge domains, ARPs and routes
- InterfaceUtil.vpp_put_vxlan_and_vlan_interfaces_to_bridge_domain(
- node, node_vxlan_if, vxlan_count, op_node, op_node_if, dst_ip_start,
- ip_step, bd_id_start)
-
- @staticmethod
- def vpp_create_vxlan_and_vlan_interfaces(
- node, node_vxlan_if, node_vlan_if, vxlan_count, vni_start,
- src_ip_start, dst_ip_start, ip_step, ip_limit):
- """
- Configure IPs, create VXLAN interfaces and VLAN sub-interfaces on VPP
- node.
-
- :param node: VPP node.
- :param node_vxlan_if: VPP node interface key to create VXLAN tunnel
- interfaces.
- :param node_vlan_if: VPP node interface key to create VLAN
- sub-interface.
- :param vxlan_count: Number of tunnel interfaces to create.
- :param vni_start: VNI start ID.
- :param src_ip_start: VXLAN tunnel source IP address start.
- :param dst_ip_start: VXLAN tunnel destination IP address start.
- :param ip_step: IP address incremental step.
- :param ip_limit: IP address limit.
- :type node: dict
- :type node_vxlan_if: str
- :type node_vlan_if: str
- :type vxlan_count: int
- :type vni_start: int
- :type src_ip_start: str
- :type dst_ip_start: str
- :type ip_step: int
- :type ip_limit: str
- :returns: Number of created VXLAN interfaces.
- :rtype: int
- """
-
- try:
- src_address_start = IPv6Address(unicode(src_ip_start))
- dst_address_start = IPv6Address(unicode(dst_ip_start))
- ip_address_limit = IPv6Address(unicode(ip_limit))
- af_inet = AF_INET6
- is_ipv6 = 1
- except (AddressValueError, NetmaskValueError):
- src_address_start = IPv4Address(unicode(src_ip_start))
- dst_address_start = IPv4Address(unicode(dst_ip_start))
- ip_address_limit = IPv4Address(unicode(ip_limit))
- af_inet = AF_INET
- is_ipv6 = 0
-
- with PapiExecutor(node) as papi_exec:
- for i in xrange(0, vxlan_count):
- src_ip = src_address_start + i * ip_step
- dst_ip = dst_address_start + i * ip_step
- if src_ip > ip_address_limit or dst_ip > ip_address_limit:
- logger.warn("Can't do more iterations - IP address limit "
- "has been reached.")
- vxlan_count = i
- break
- cmd = 'sw_interface_add_del_address'
- args = dict(
- sw_if_index=InterfaceUtil.get_interface_index(
- node, node_vxlan_if),
- is_add=1,
- is_ipv6=0,
- del_all=0,
- address_length=128 if is_ipv6 else 32,
- address=inet_pton(af_inet, str(src_ip)))
- papi_exec.add(cmd, **args)
- cmd = 'vxlan_add_del_tunnel'
- args = dict(
- is_add=1,
- is_ipv6=0,
- instance=Constants.BITWISE_NON_ZERO,
- src_address=inet_pton(af_inet, str(src_ip)),
- dst_address=inet_pton(af_inet, str(dst_ip)),
- mcast_sw_if_index=Constants.BITWISE_NON_ZERO,
- encap_vrf_id=0,
- decap_next_index=Constants.BITWISE_NON_ZERO,
- vni=int(vni_start)+i)
- papi_exec.add(cmd, **args)
- cmd = 'create_vlan_subif'
- args = dict(
- sw_if_index=InterfaceUtil.get_interface_index(
- node, node_vlan_if),
- vlan_id=i+1)
- papi_exec.add(cmd, **args)
- papi_exec.get_replies().verify_replies()
-
- return vxlan_count
-
- @staticmethod
- def vpp_put_vxlan_and_vlan_interfaces_up(node, vxlan_count, node_vlan_if):
- """
- Update topology with VXLAN interfaces and VLAN sub-interfaces data
- and put interfaces up.
-
- :param node: VPP node.
- :param vxlan_count: Number of tunnel interfaces.
- :param node_vlan_if: VPP node interface key where VLAN sub-interfaces
- have been created.
- :type node: dict
- :type vxlan_count: int
- :type node_vlan_if: str
- """
-
- if_data = InterfaceUtil.vpp_get_interface_data(node)
-
- with PapiExecutor(node) as papi_exec:
- for i in xrange(0, vxlan_count):
- vxlan_subif_key = Topology.add_new_port(node, 'vxlan_tunnel')
- vxlan_subif_name = 'vxlan_tunnel{nr}'.format(nr=i)
- vxlan_found = False
- vxlan_subif_idx = None
- vlan_subif_key = Topology.add_new_port(node, 'vlan_subif')
- vlan_subif_name = '{if_name}.{vlan}'.format(
- if_name=Topology.get_interface_name(
- node, node_vlan_if), vlan=i+1)
- vlan_found = False
- vlan_idx = None
- for data in if_data:
- if not vxlan_found \
- and data['interface_name'] == vxlan_subif_name:
- vxlan_subif_idx = data['sw_if_index']
- vxlan_found = True
- elif not vlan_found \
- and data['interface_name'] == vlan_subif_name:
- vlan_idx = data['sw_if_index']
- vlan_found = True
- if vxlan_found and vlan_found:
- break
- Topology.update_interface_sw_if_index(
- node, vxlan_subif_key, vxlan_subif_idx)
- Topology.update_interface_name(
- node, vxlan_subif_key, vxlan_subif_name)
- cmd = 'sw_interface_set_flags'
- args1 = dict(sw_if_index=vxlan_subif_idx,
- admin_up_down=1)
- Topology.update_interface_sw_if_index(
- node, vlan_subif_key, vlan_idx)
- Topology.update_interface_name(
- node, vlan_subif_key, vlan_subif_name)
- args2 = dict(sw_if_index=vlan_idx,
- admin_up_down=1)
- papi_exec.add(cmd, **args1).add(cmd, **args2)
- papi_exec.get_replies().verify_replies()
-
- @staticmethod
- def vpp_put_vxlan_and_vlan_interfaces_to_bridge_domain(
- node, node_vxlan_if, vxlan_count, op_node, op_node_if, dst_ip_start,
- ip_step, bd_id_start):
- """
- Configure ARPs and routes for VXLAN interfaces and put each pair of
- VXLAN tunnel interface and VLAN sub-interface to separate bridge-domain.
-
- :param node: VPP node.
- :param node_vxlan_if: VPP node interface key where VXLAN tunnel
- interfaces have been created.
- :param vxlan_count: Number of tunnel interfaces.
- :param op_node: Opposite VPP node for VXLAN tunnel interfaces.
- :param op_node_if: Opposite VPP node interface key for VXLAN tunnel
- interfaces.
- :param dst_ip_start: VXLAN tunnel destination IP address start.
- :param ip_step: IP address incremental step.
- :param bd_id_start: Bridge-domain ID start.
- :type node: dict
- :type node_vxlan_if: str
- :type vxlan_count: int
- :type op_node: dict
- :type op_node_if:
- :type dst_ip_start: str
- :type ip_step: int
- :type bd_id_start: int
- """
-
- try:
- dst_address_start = IPv6Address(unicode(dst_ip_start))
- af_inet = AF_INET6
- is_ipv6 = 1
- except (AddressValueError, NetmaskValueError):
- dst_address_start = IPv4Address(unicode(dst_ip_start))
- af_inet = AF_INET
- is_ipv6 = 0
-
- with PapiExecutor(node) as papi_exec:
- for i in xrange(0, vxlan_count):
- dst_ip = dst_address_start + i * ip_step
- neighbor = dict(
- sw_if_index=Topology.get_interface_sw_index(
- node, node_vxlan_if),
- flags=0,
- mac_address=str(
- Topology.get_interface_mac(op_node, op_node_if)),
- ip_address=str(dst_ip))
- cmd = 'ip_neighbor_add_del'
- args = dict(
- is_add=1,
- neighbor=neighbor)
- papi_exec.add(cmd, **args)
- cmd = 'ip_add_del_route'
- args = dict(
- next_hop_sw_if_index=Topology.get_interface_sw_index(
- node, node_vxlan_if),
- table_id=0,
- is_add=1,
- is_ipv6=is_ipv6,
- next_hop_weight=1,
- next_hop_proto=1 if is_ipv6 else 0,
- dst_address_length=128 if is_ipv6 else 32,
- dst_address=inet_pton(af_inet, str(dst_ip)),
- next_hop_address=inet_pton(af_inet, str(dst_ip)))
- papi_exec.add(cmd, **args)
- cmd = 'sw_interface_set_l2_bridge'
- args = dict(
- rx_sw_if_index=Topology.get_interface_sw_index(
- node, 'vxlan_tunnel{nr}'.format(nr=i+1)),
- bd_id=int(bd_id_start+i),
- shg=0,
- port_type=0,
- enable=1)
- papi_exec.add(cmd, **args)
- args = dict(
- rx_sw_if_index=Topology.get_interface_sw_index(
- node, 'vlan_subif{nr}'.format(nr=i+1)),
- bd_id=int(bd_id_start+i),
- shg=0,
- port_type=0,
- enable=1)
- papi_exec.add(cmd, **args)
- papi_exec.get_replies().verify_replies()
-
@staticmethod
def vpp_sw_interface_rx_placement_dump(node):
"""Dump VPP interface RX placement on node.
:returns: Thread mapping information as a list of dictionaries.
:rtype: list
"""
-
cmd = 'sw_interface_rx_placement_dump'
cmd_reply = 'sw_interface_rx_placement_details'
err_msg = "Failed to run '{cmd}' PAPI command on host {host}!".format(
:raises RuntimeError: If failed to run command on host or if no API
reply received.
"""
-
cmd = 'sw_interface_set_rx_placement'
err_msg = "Failed to set interface RX placement to worker on host " \
"{host}!".format(host=node['host'])
"""
worker_id = 0
worker_cnt = len(VPPUtil.vpp_show_threads(node)) - 1
+ if not worker_cnt:
+ return
for placement in InterfaceUtil.vpp_sw_interface_rx_placement_dump(node):
for interface in node['interfaces'].values():
if placement['sw_if_index'] == interface['vpp_sw_index'] \