"""Interface util library."""
-from socket import AF_INET, AF_INET6, inet_ntop
from time import sleep
from enum import IntEnum
from resources.libraries.python.VPPUtil import VPPUtil
-class LinkBondLoadBalance(IntEnum):
- """Link bonding load balance."""
- L2 = 0
- L34 = 1
- L23 = 2
+class InterfaceStatusFlags(IntEnum):
+ """Interface status flags."""
+ IF_STATUS_API_FLAG_ADMIN_UP = 1
+ IF_STATUS_API_FLAG_LINK_UP = 2
+
+
+class MtuProto(IntEnum):
+ """MTU protocol."""
+ MTU_PROTO_API_L3 = 0
+ MTU_PROTO_API_IP4 = 1
+ MTU_PROTO_API_IP6 = 2
+ MTU_PROTO_API_MPLS = 3
+ MTU_PROTO_API_N = 4
+
+
+class LinkDuplex(IntEnum):
+ """Link duplex"""
+ LINK_DUPLEX_API_UNKNOWN = 0
+ LINK_DUPLEX_API_HALF = 1
+ LINK_DUPLEX_API_FULL = 2
+
+
+class SubInterfaceFlags(IntEnum):
+ """Sub-interface flags."""
+ SUB_IF_API_FLAG_NO_TAGS = 1
+ SUB_IF_API_FLAG_ONE_TAG = 2
+ SUB_IF_API_FLAG_TWO_TAGS = 4
+ SUB_IF_API_FLAG_DOT1AD = 8
+ SUB_IF_API_FLAG_EXACT_MATCH = 16
+ SUB_IF_API_FLAG_DEFAULT = 32
+ SUB_IF_API_FLAG_OUTER_VLAN_ID_ANY = 64
+ SUB_IF_API_FLAG_INNER_VLAN_ID_ANY = 128
+ SUB_IF_API_FLAG_DOT1AH = 256
+
+
+class RxMode(IntEnum):
+ """RX mode"""
+ RX_MODE_API_UNKNOWN = 0
+ RX_MODE_API_POLLING = 1
+ RX_MODE_API_INTERRUPT = 2
+ RX_MODE_API_ADAPTIVE = 3
+ RX_MODE_API_DEFAULT = 4
+
+
+class IfType(IntEnum):
+ """Interface type"""
+ # A hw interface
+ IF_API_TYPE_HARDWARE = 0
+ # A sub-interface
+ IF_API_TYPE_SUB = 1
+ IF_API_TYPE_P2P = 2
+ IF_API_TYPE_PIPE = 3
+
+
+class LinkBondLoadBalanceAlgo(IntEnum):
+ """Link bonding load balance algorithm."""
+ BOND_API_LB_ALGO_L2 = 0
+ BOND_API_LB_ALGO_L34 = 1
+ BOND_API_LB_ALGO_L23 = 2
+ BOND_API_LB_ALGO_RR = 3
+ BOND_API_LB_ALGO_BC = 4
+ BOND_API_LB_ALGO_AB = 5
class LinkBondMode(IntEnum):
- """Link bonding load balance."""
- ROUND_ROBIN = 1
- ACTIVE_BACKUP = 2
- XOR = 3
- BROADCAST = 4
- LACP = 5
+ """Link bonding mode."""
+ BOND_API_MODE_ROUND_ROBIN = 1
+ BOND_API_MODE_ACTIVE_BACKUP = 2
+ BOND_API_MODE_XOR = 3
+ BOND_API_MODE_BROADCAST = 4
+ BOND_API_MODE_LACP = 5
class InterfaceUtil(object):
if node['type'] == NodeType.DUT:
if state == 'up':
- admin_up_down = 1
+ flags = InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value
elif state == 'down':
- admin_up_down = 0
+ flags = 0
else:
raise ValueError('Unexpected interface state: {state}'.format(
state=state))
cmd = 'sw_interface_set_flags'
err_msg = 'Failed to set interface state on host {host}'.format(
host=node['host'])
- args = dict(sw_if_index=sw_if_index,
- admin_up_down=admin_up_down)
+ args = dict(
+ sw_if_index=int(sw_if_index),
+ flags=flags)
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
elif node['type'] == NodeType.TG or node['type'] == NodeType.VM:
not_ready = list()
out = InterfaceUtil.vpp_get_interface_data(node)
for interface in out:
- if interface.get('admin_up_down') == 1:
- if interface.get('link_up_down') != 1:
- not_ready.append(interface.get('interface_name'))
+ if interface.get('flags') == 1:
+ not_ready.append(interface.get('interface_name'))
if not not_ready:
break
else:
- logger.debug('Interfaces still in link-down state:\n{ifs} '
+ logger.debug('Interfaces still not in link-up state:\n{ifs} '
'\nWaiting...'.format(ifs=not_ready))
sleep(1)
else:
:raises TypeError: if the data type of interface is neither basestring
nor int.
"""
+ def process_if_dump(if_dump):
+ """Process interface dump.
+
+ :param if_dump: Interface dump.
+ :type if_dump: dict
+ :returns: Processed interface dump.
+ :rtype: dict
+ """
+ if_dump['l2_address'] = str(if_dump['l2_address'])
+ if_dump['b_dmac'] = str(if_dump['b_dmac'])
+ if_dump['b_smac'] = str(if_dump['b_smac'])
+ if_dump['flags'] = if_dump['flags'].value
+ if_dump['type'] = if_dump['type'].value
+ if_dump['link_duplex'] = if_dump['link_duplex'].value
+ if_dump['sub_if_flags'] = if_dump['sub_if_flags'].value \
+ if hasattr(if_dump['sub_if_flags'], 'value') \
+ else int(if_dump['sub_if_flags'])
+
+ return if_dump
+
if interface is not None:
if isinstance(interface, basestring):
param = 'interface_name'
param = ''
cmd = 'sw_interface_dump'
- args = dict(name_filter_valid=0,
- name_filter='')
+ args = dict(
+ name_filter_valid=False,
+ name_filter=''
+ )
err_msg = 'Failed to get interface dump on host {host}'.format(
host=node['host'])
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd, **args).get_details(err_msg)
-
- def process_if_dump(if_dump):
- """Process interface dump.
-
- :param if_dump: Interface dump.
- :type if_dump: dict
- :returns: Processed interface dump.
- :rtype: dict
- """
- if_dump['interface_name'] = if_dump['interface_name'].rstrip('\x00')
- if_dump['tag'] = if_dump['tag'].rstrip('\x00')
- if_dump['l2_address'] = L2Util.bin_to_mac(if_dump['l2_address'])
- if_dump['b_dmac'] = L2Util.bin_to_mac(if_dump['b_dmac'])
- if_dump['b_smac'] = L2Util.bin_to_mac(if_dump['b_smac'])
- return if_dump
+ logger.debug('Received data:\n{d!r}'.format(d=details))
data = list() if interface is None else dict()
- for if_dump in details:
+ for dump in details:
if interface is None:
- data.append(process_if_dump(if_dump))
- elif str(if_dump.get(param)).rstrip('\x00') == str(interface):
- data = process_if_dump(if_dump)
+ data.append(process_if_dump(dump))
+ elif str(dump.get(param)).rstrip('\x00') == str(interface):
+ data = process_if_dump(dump)
break
logger.debug('Interface data:\n{if_data}'.format(if_data=data))
"""Create VLAN sub-interface on node.
:param node: Node to add VLAN subinterface on.
- :param interface: Interface name on which create VLAN subinterface.
+ :param interface: Interface name or index on which create VLAN
+ subinterface.
:param vlan: VLAN ID of the subinterface to be created.
:type node: dict
- :type interface: str
+ :type interface: str on int
:type vlan: int
:returns: Name and index of created subinterface.
:rtype: tuple
:raises RuntimeError: if it is unable to create VLAN subinterface on the
- node.
+ node or interface cannot be converted.
"""
- iface_key = Topology.get_interface_by_name(node, interface)
- sw_if_index = Topology.get_interface_sw_index(node, iface_key)
+ sw_if_index = InterfaceUtil.get_interface_index(node, interface)
cmd = 'create_vlan_subif'
- args = dict(sw_if_index=sw_if_index,
- vlan_id=int(vlan))
+ args = dict(
+ sw_if_index=sw_if_index,
+ vlan_id=int(vlan)
+ )
err_msg = 'Failed to create VLAN sub-interface on host {host}'.format(
host=node['host'])
with PapiSocketExecutor(node) as papi_exec:
return sw_if_index
+ @staticmethod
+ def set_vxlan_bypass(node, interface=None):
+ """Add the 'ip4-vxlan-bypass' graph node for a given interface.
+
+ By adding the IPv4 vxlan-bypass graph node to an interface, the node
+ checks for and validate input vxlan packet and bypass ip4-lookup,
+ ip4-local, ip4-udp-lookup nodes to speedup vxlan packet forwarding.
+ This node will cause extra overhead to for non-vxlan packets which is
+ kept at a minimum.
+
+ :param node: Node where to set VXLAN bypass.
+ :param interface: Numeric index or name string of a specific interface.
+ :type node: dict
+ :type interface: int or str
+ :raises RuntimeError: if it failed to set VXLAN bypass on interface.
+ """
+ sw_if_index = InterfaceUtil.get_interface_index(node, interface)
+
+ cmd = 'sw_interface_set_vxlan_bypass'
+ args = dict(is_ipv6=0,
+ sw_if_index=sw_if_index,
+ enable=1)
+ err_msg = 'Failed to set VXLAN bypass on interface on host {host}'.\
+ format(host=node['host'])
+ with PapiSocketExecutor(node) as papi_exec:
+ papi_exec.add(cmd, **args).get_replies(err_msg)
+
@staticmethod
def vxlan_dump(node, interface=None):
"""Get VxLAN data for the given interface.
:raises TypeError: if the data type of interface is neither basestring
nor int.
"""
- if interface is not None:
- sw_if_index = InterfaceUtil.get_interface_index(node, interface)
- else:
- sw_if_index = int(Constants.BITWISE_NON_ZERO)
-
- cmd = 'vxlan_tunnel_dump'
- args = dict(sw_if_index=sw_if_index)
- err_msg = 'Failed to get VXLAN dump on host {host}'.format(
- host=node['host'])
- with PapiSocketExecutor(node) as papi_exec:
- details = papi_exec.add(cmd, **args).get_details(err_msg)
-
def process_vxlan_dump(vxlan_dump):
"""Process vxlan dump.
"""
if vxlan_dump['is_ipv6']:
vxlan_dump['src_address'] = \
- inet_ntop(AF_INET6, vxlan_dump['src_address'])
+ ip_address(unicode(vxlan_dump['src_address']))
vxlan_dump['dst_address'] = \
- inet_ntop(AF_INET6, vxlan_dump['dst_address'])
+ ip_address(unicode(vxlan_dump['dst_address']))
else:
vxlan_dump['src_address'] = \
- inet_ntop(AF_INET, vxlan_dump['src_address'][0:4])
+ ip_address(unicode(vxlan_dump['src_address'][0:4]))
vxlan_dump['dst_address'] = \
- inet_ntop(AF_INET, vxlan_dump['dst_address'][0:4])
+ ip_address(unicode(vxlan_dump['dst_address'][0:4]))
return vxlan_dump
+ if interface is not None:
+ sw_if_index = InterfaceUtil.get_interface_index(node, interface)
+ else:
+ sw_if_index = int(Constants.BITWISE_NON_ZERO)
+
+ cmd = 'vxlan_tunnel_dump'
+ args = dict(sw_if_index=sw_if_index)
+ err_msg = 'Failed to get VXLAN dump on host {host}'.format(
+ host=node['host'])
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd, **args).get_details(err_msg)
+
data = list() if interface is None else dict()
- for vxlan_dump in details:
+ for dump in details:
if interface is None:
- data.append(process_vxlan_dump(vxlan_dump))
- elif vxlan_dump['sw_if_index'] == sw_if_index:
- data = process_vxlan_dump(vxlan_dump)
+ data.append(process_vxlan_dump(dump))
+ elif dump['sw_if_index'] == sw_if_index:
+ data = process_vxlan_dump(dump)
break
logger.debug('VXLAN data:\n{vxlan_data}'.format(vxlan_data=data))
:returns: List of dictionaries with all vhost-user interfaces.
:rtype: list
"""
- cmd = 'sw_interface_vhost_user_dump'
- err_msg = 'Failed to get vhost-user dump on host {host}'.format(
- host=node['host'])
- with PapiSocketExecutor(node) as papi_exec:
- details = papi_exec.add(cmd).get_details(err_msg)
-
def process_vhost_dump(vhost_dump):
"""Process vhost dump.
vhost_dump['sock_filename'].rstrip('\x00')
return vhost_dump
- for vhost_dump in details:
- # In-place edits.
- process_vhost_dump(vhost_dump)
-
- logger.debug('Vhost-user details:\n{vhost_details}'.format(
- vhost_details=details))
- return details
-
- @staticmethod
- def tap_dump(node, name=None):
- """Get all TAP interface data from the given node, or data about
- a specific TAP interface.
-
- TODO: Move to Tap.py
-
- :param node: VPP node to get data from.
- :param name: Optional name of a specific TAP interface.
- :type node: dict
- :type name: str
- :returns: Dictionary of information about a specific TAP interface, or
- a List of dictionaries containing all TAP data for the given node.
- :rtype: dict or list
- """
- cmd = 'sw_interface_tap_v2_dump'
- err_msg = 'Failed to get TAP dump on host {host}'.format(
+ cmd = 'sw_interface_vhost_user_dump'
+ err_msg = 'Failed to get vhost-user dump on host {host}'.format(
host=node['host'])
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd).get_details(err_msg)
- def process_tap_dump(tap_dump):
- """Process tap dump.
-
- :param tap_dump: Tap interface dump.
- :type tap_dump: dict
- :returns: Processed tap interface dump.
- :rtype: dict
- """
- tap_dump['dev_name'] = tap_dump['dev_name'].rstrip('\x00')
- tap_dump['host_if_name'] = tap_dump['host_if_name'].rstrip('\x00')
- tap_dump['host_namespace'] = \
- tap_dump['host_namespace'].rstrip('\x00')
- tap_dump['host_mac_addr'] = \
- L2Util.bin_to_mac(tap_dump['host_mac_addr'])
- tap_dump['host_ip4_addr'] = \
- inet_ntop(AF_INET, tap_dump['host_ip4_addr'])
- tap_dump['host_ip6_addr'] = \
- inet_ntop(AF_INET6, tap_dump['host_ip6_addr'])
- return tap_dump
-
- data = list() if name is None else dict()
- for tap_dump in details:
- if name is None:
- data.append(process_tap_dump(tap_dump))
- elif tap_dump.get('dev_name').rstrip('\x00') == name:
- data = process_tap_dump(tap_dump)
- break
+ for dump in details:
+ # In-place edits.
+ process_vhost_dump(dump)
- logger.debug('TAP data:\n{tap_data}'.format(tap_data=data))
- return data
+ logger.debug('Vhost-user details:\n{vhost_details}'.format(
+ vhost_details=details))
+ return details
@staticmethod
def create_subinterface(node, interface, sub_id, outer_vlan_id=None,
"""
subif_types = type_subif.split()
+ flags = 0
+ if 'no_tags' in subif_types:
+ flags = flags | SubInterfaceFlags.SUB_IF_API_FLAG_NO_TAGS
+ if 'one_tag' in subif_types:
+ flags = flags | SubInterfaceFlags.SUB_IF_API_FLAG_ONE_TAG
+ if 'two_tags' in subif_types:
+ flags = flags | SubInterfaceFlags.SUB_IF_API_FLAG_TWO_TAGS
+ if 'dot1ad' in subif_types:
+ flags = flags | SubInterfaceFlags.SUB_IF_API_FLAG_DOT1AD
+ if 'exact_match' in subif_types:
+ flags = flags | SubInterfaceFlags.SUB_IF_API_FLAG_EXACT_MATCH
+ if 'default_sub' in subif_types:
+ flags = flags | SubInterfaceFlags.SUB_IF_API_FLAG_DEFAULT
+ if type_subif == 'default_sub':
+ flags = flags | SubInterfaceFlags.SUB_IF_API_FLAG_INNER_VLAN_ID_ANY\
+ | SubInterfaceFlags.SUB_IF_API_FLAG_OUTER_VLAN_ID_ANY
+
cmd = 'create_subif'
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
sub_id=int(sub_id),
- no_tags=1 if 'no_tags' in subif_types else 0,
- one_tag=1 if 'one_tag' in subif_types else 0,
- two_tags=1 if 'two_tags' in subif_types else 0,
- dot1ad=1 if 'dot1ad' in subif_types else 0,
- exact_match=1 if 'exact_match' in subif_types else 0,
- default_sub=1 if 'default_sub' in subif_types else 0,
- outer_vlan_id_any=1 if type_subif == 'default_sub' else 0,
- inner_vlan_id_any=1 if type_subif == 'default_sub' else 0,
+ sub_if_flags=flags.value if hasattr(flags, 'value') else int(flags),
outer_vlan_id=int(outer_vlan_id) if outer_vlan_id else 0,
- inner_vlan_id=int(inner_vlan_id) if inner_vlan_id else 0)
+ inner_vlan_id=int(inner_vlan_id) if inner_vlan_id else 0
+ )
err_msg = 'Failed to create sub-interface on host {host}'.format(
host=node['host'])
with PapiSocketExecutor(node) as papi_exec:
return ifc_name, sw_if_index
@staticmethod
- def vpp_create_loopback(node):
+ def vpp_create_loopback(node, mac=None):
"""Create loopback interface on VPP node.
:param node: Node to create loopback interface on.
+ :param mac: Optional MAC address for loopback interface.
:type node: dict
+ :type mac: str
:returns: SW interface index.
:rtype: int
:raises RuntimeError: If it is not possible to create loopback on the
node.
"""
cmd = 'create_loopback'
- args = dict(mac_address=0)
+ args = dict(mac_address=L2Util.mac_to_bin(mac) if mac else 0)
err_msg = 'Failed to create loopback interface on host {host}'.format(
host=node['host'])
with PapiSocketExecutor(node) as papi_exec:
Topology.update_interface_sw_if_index(node, if_key, sw_if_index)
ifc_name = InterfaceUtil.vpp_get_interface_name(node, sw_if_index)
Topology.update_interface_name(node, if_key, ifc_name)
+ if mac:
+ mac = InterfaceUtil.vpp_get_interface_mac(node, ifc_name)
+ Topology.update_interface_mac_address(node, if_key, mac)
return sw_if_index
the node.
"""
cmd = 'bond_create'
- args = dict(id=int(Constants.BITWISE_NON_ZERO),
- use_custom_mac=0 if mac is None else 1,
- mac_address=0 if mac is None else L2Util.mac_to_bin(mac),
- mode=getattr(LinkBondMode, '{md}'.format(
- md=mode.replace('-', '_').upper())).value,
- lb=0 if load_balance is None else getattr(
- LinkBondLoadBalance, '{lb}'.format(
- lb=load_balance.upper())).value)
+ args = dict(
+ id=int(Constants.BITWISE_NON_ZERO),
+ use_custom_mac=False if mac is None else True,
+ mac_address=L2Util.mac_to_bin(mac) if mac else None,
+ mode=getattr(LinkBondMode, 'BOND_API_MODE_{md}'.format(
+ md=mode.replace('-', '_').upper())).value,
+ lb=0 if load_balance is None else getattr(
+ LinkBondLoadBalanceAlgo, 'BOND_API_LB_ALGO_{lb}'.format(
+ lb=load_balance.upper())).value,
+ numa_only=False
+ )
err_msg = 'Failed to create bond interface on host {host}'.format(
host=node['host'])
with PapiSocketExecutor(node) as papi_exec:
sw_if_index = papi_exec.add(cmd, **args).get_sw_if_index(err_msg)
- InterfaceUtil.add_eth_interface(node, sw_if_index=sw_if_index,
- ifc_pfx='eth_bond')
+ InterfaceUtil.add_eth_interface(
+ node, sw_if_index=sw_if_index, ifc_pfx='eth_bond')
if_key = Topology.get_interface_by_sw_index(node, sw_if_index)
return if_key
args = dict(
sw_if_index=Topology.get_interface_sw_index(node, interface),
bond_sw_if_index=Topology.get_interface_sw_index(node, bond_if),
- is_passive=0,
- is_long_timeout=0)
+ is_passive=False,
+ is_long_timeout=False
+ )
err_msg = 'Failed to enslave physical interface {ifc} to bond ' \
'interface {bond} on host {host}'.format(ifc=interface,
bond=bond_if,
details = papi_exec.add(cmd).get_details(err_msg)
for bond in details:
- data += ('{b}\n'.format(b=bond['interface_name'].rstrip('\x00')))
- data += (' mode: {m}\n'.format(m=LinkBondMode(
- bond['mode']).name.lower()))
- data += (' load balance: {lb}\n'.format(lb=LinkBondLoadBalance(
- bond['lb']).name.lower()))
+ data += ('{b}\n'.format(b=bond['interface_name']))
+ data += (' mode: {m}\n'.format(
+ m=bond['mode'].name.replace('BOND_API_MODE_', '').lower()))
+ data += (' load balance: {lb}\n'.format(
+ lb=bond['lb'].name.replace('BOND_API_LB_ALGO_', '').lower()))
data += (' number of active slaves: {n}\n'.format(
n=bond['active_slaves']))
if verbose:
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd, **args).get_details(err_msg)
- def process_slave_dump(slave_dump):
- """Process slave dump.
-
- :param slave_dump: Slave interface dump.
- :type slave_dump: dict
- :returns: Processed slave interface dump.
- :rtype: dict
- """
- slave_dump['interface_name'] = slave_dump['interface_name'].\
- rstrip('\x00')
- return slave_dump
-
- for slave_dump in details:
- # In-place edits.
- process_slave_dump(slave_dump)
-
logger.debug('Slave data:\n{slave_data}'.format(slave_data=details))
return details
interfaces.
:rtype: dict or list
"""
- if interface_name is not None:
- sw_if_index = InterfaceUtil.get_interface_index(
- node, interface_name)
- else:
- sw_if_index = int(Constants.BITWISE_NON_ZERO)
-
- cmd = 'vxlan_gpe_tunnel_dump'
- args = dict(sw_if_index=sw_if_index)
- err_msg = 'Failed to get VXLAN-GPE dump on host {host}'.format(
- host=node['host'])
- with PapiSocketExecutor(node) as papi_exec:
- details = papi_exec.add(cmd, **args).get_details(err_msg)
-
def process_vxlan_gpe_dump(vxlan_dump):
"""Process vxlan_gpe dump.
"""
if vxlan_dump['is_ipv6']:
vxlan_dump['local'] = \
- inet_ntop(AF_INET6, vxlan_dump['local'])
+ ip_address(unicode(vxlan_dump['local']))
vxlan_dump['remote'] = \
- inet_ntop(AF_INET6, vxlan_dump['remote'])
+ ip_address(unicode(vxlan_dump['remote']))
else:
vxlan_dump['local'] = \
- inet_ntop(AF_INET, vxlan_dump['local'][0:4])
+ ip_address(unicode(vxlan_dump['local'][0:4]))
vxlan_dump['remote'] = \
- inet_ntop(AF_INET, vxlan_dump['remote'][0:4])
+ ip_address(unicode(vxlan_dump['remote'][0:4]))
return vxlan_dump
+ if interface_name is not None:
+ sw_if_index = InterfaceUtil.get_interface_index(
+ node, interface_name)
+ else:
+ sw_if_index = int(Constants.BITWISE_NON_ZERO)
+
+ cmd = 'vxlan_gpe_tunnel_dump'
+ args = dict(sw_if_index=sw_if_index)
+ err_msg = 'Failed to get VXLAN-GPE dump on host {host}'.format(
+ host=node['host'])
+ with PapiSocketExecutor(node) as papi_exec:
+ details = papi_exec.add(cmd, **args).get_details(err_msg)
+
data = list() if interface_name is None else dict()
- for vxlan_dump in details:
+ for dump in details:
if interface_name is None:
- data.append(process_vxlan_gpe_dump(vxlan_dump))
- elif vxlan_dump['sw_if_index'] == sw_if_index:
- data = process_vxlan_gpe_dump(vxlan_dump)
+ data.append(process_vxlan_gpe_dump(dump))
+ elif dump['sw_if_index'] == sw_if_index:
+ data = process_vxlan_gpe_dump(dump)
break
logger.debug('VXLAN-GPE data:\n{vxlan_gpe_data}'.format(
cmd = 'sw_interface_set_table'
args = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
- is_ipv6=1 if ipv6 else 0,
+ is_ipv6=ipv6,
vrf_id=int(table_id))
err_msg = 'Failed to assign interface {ifc} to FIB table'.format(
ifc=interface)
@staticmethod
def init_avf_interface(node, ifc_key, numvfs=1, osi_layer='L2'):
- """Init PCI device by creating VFs and bind them to vfio-pci for AVF
+ """Init PCI device by creating VIFs and bind them to vfio-pci for AVF
driver testing on DUT.
:param node: DUT node.
:param ifc_key: Interface key from topology file.
- :param numvfs: Number of VFs to initialize, 0 - disable the VFs.
+ :param numvfs: Number of VIFs to initialize, 0 - disable the VIFs.
:param osi_layer: OSI Layer type to initialize TG with.
Default value "L2" sets linux interface spoof off.
:type node: dict
pf_mac_addr = Topology.get_interface_mac(node, ifc_key).split(":")
uio_driver = Topology.get_uio_driver(node)
kernel_driver = Topology.get_interface_driver(node, ifc_key)
- if kernel_driver != "i40e":
+ if kernel_driver not in ("i40e", "i40evf"):
raise RuntimeError(
- "AVF needs i40e driver, not {driver} at node {host} ifc {ifc}"\
- .format(driver=kernel_driver, host=node["host"], ifc=ifc_key))
+ "AVF needs i40e-compatible driver, not {driver} at node {host}"
+ " ifc {ifc}".format(
+ driver=kernel_driver, host=node["host"], ifc=ifc_key))
current_driver = DUTSetup.get_pci_dev_driver(
node, pf_pci_addr.replace(':', r'\:'))
# Bind to kernel driver.
DUTSetup.pci_driver_bind(node, pf_pci_addr, kernel_driver)
- # Initialize PCI VFs
+ # Initialize PCI VFs.
DUTSetup.set_sriov_numvfs(node, pf_pci_addr, numvfs)
vf_ifc_keys = []
DUTSetup.pci_vf_driver_bind(node, pf_pci_addr, vf_id, uio_driver)
# Add newly created ports into topology file
- vf_ifc_name = '{pf_if_key}_vf'.format(pf_if_key=ifc_key)
+ vf_ifc_name = '{pf_if_key}_vif'.format(pf_if_key=ifc_key)
vf_pci_addr = DUTSetup.get_virtfn_pci_addr(node, pf_pci_addr, vf_id)
vf_ifc_key = Topology.add_new_port(node, vf_ifc_name)
Topology.update_interface_name(node, vf_ifc_key,
cmd = 'sw_interface_set_rx_placement'
err_msg = "Failed to set interface RX placement to worker on host " \
"{host}!".format(host=node['host'])
- args = dict(sw_if_index=sw_if_index, queue_id=queue_id,
- worker_id=worker_id)
+ args = dict(
+ sw_if_index=sw_if_index,
+ queue_id=queue_id,
+ worker_id=worker_id,
+ is_main=False
+ )
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)