-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
from resources.libraries.python.topology import Topology
-class TapFlags(IntEnum):
- """TAP interface flags."""
- TAP_FLAG_GSO = 1
+class TapFeaturesFlags(IntEnum):
+ """TAP Features Flags."""
+ TAP_API_FLAG_GSO = 1
+ TAP_API_FLAG_CSUM_OFFLOAD = 2
+ TAP_API_FLAG_PERSIST = 4
+ TAP_API_FLAG_ATTACH = 8
+ TAP_API_FLAG_TUN = 16
+ TAP_API_FLAG_GRO_COALESCE = 32
+ TAP_API_FLAG_PACKED = 64
+ TAP_API_FLAG_IN_ORDER = 128
-class Tap(object):
+class Tap:
"""Tap utilities."""
@staticmethod
- def add_tap_interface(node, tap_name, mac=None):
+ def add_tap_interface(
+ node, tap_name, mac=None, host_namespace=None, num_rx_queues=1,
+ rxq_size=0, txq_size=0, tap_feature_mask=0):
"""Add tap interface with name and optionally with MAC.
:param node: Node to add tap on.
:param tap_name: Tap interface name for linux tap.
:param mac: Optional MAC address for VPP tap.
+ :param host_namespace: Namespace.
+ :param num_rx_queues: Number of RX queues.
+ :param rxq_size: Size of RXQ (0 = Default API; 256 = Default VPP).
+ :param txq_size: Size of TXQ (0 = Default API; 256 = Default VPP).
+ :param tap_feature_mask: Mask of tap features to be enabled.
:type node: dict
- :type tap_name: str or unicode
+ :type tap_name: str
:type mac: str
+ :type host_namespace: str
+ :type num_rx_queues: int
+ :type rxq_size: int
+ :type txq_size: int
+ :type tap_feature_mask: int
:returns: Returns a interface index.
:rtype: int
"""
- if isinstance(tap_name, unicode):
- tap_name = str(tap_name)
- cmd = 'tap_create_v2'
+ cmd = u"tap_create_v2"
args = dict(
id=Constants.BITWISE_NON_ZERO,
- use_random_mac=False if mac else True,
+ use_random_mac=bool(mac is None),
mac_address=L2Util.mac_to_bin(mac) if mac else None,
+ num_rx_queues=int(num_rx_queues),
+ tx_ring_sz=int(txq_size),
+ rx_ring_sz=int(rxq_size),
host_mtu_set=False,
host_mac_addr_set=False,
host_ip4_prefix_set=False,
host_ip6_prefix_set=False,
host_ip4_gw_set=False,
host_ip6_gw_set=False,
- host_namespace_set=False,
+ host_namespace_set=bool(host_namespace),
+ host_namespace=host_namespace,
host_if_name_set=True,
- host_bridge_set=False,
host_if_name=tap_name,
+ host_bridge_set=False,
+ tap_flags=tap_feature_mask
)
- err_msg = 'Failed to create tap interface {tap} on host {host}'.format(
- tap=tap_name, host=node['host'])
+ err_msg = f"Failed to create tap interface {tap_name} " \
+ f"on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
sw_if_index = papi_exec.add(cmd, **args).get_sw_if_index(err_msg)
- if_key = Topology.add_new_port(node, 'tap')
+ if_key = Topology.add_new_port(node, u"tap")
Topology.update_interface_sw_if_index(node, if_key, sw_if_index)
Topology.update_interface_name(node, if_key, tap_name)
if mac is None:
:returns: VPP tap interface dev_name.
:rtype: str
"""
- return Tap.tap_dump(node, host_if_name).get('dev_name')
+ return Tap.tap_dump(node, host_if_name).get(u"dev_name")
@staticmethod
def vpp_get_tap_interface_mac(node, interface_name):
:returns: Processed tap interface dump.
:rtype: dict
"""
- tap_dump['host_mac_addr'] = str(tap_dump['host_mac_addr'])
- tap_dump['host_ip4_prefix'] = str(tap_dump['host_ip4_prefix'])
- tap_dump['host_ip6_prefix'] = str(tap_dump['host_ip6_prefix'])
- tap_dump['tap_flags'] = tap_dump['tap_flags'].value \
- if hasattr(tap_dump['tap_flags'], 'value') \
- else int(tap_dump['tap_flags'])
- tap_dump['host_namespace'] = None \
- if tap_dump['host_namespace'] == '(nil)' \
- else tap_dump['host_namespace']
- tap_dump['host_bridge'] = None \
- if tap_dump['host_bridge'] == '(nil)' \
- else tap_dump['host_bridge']
+ tap_dump[u"host_mac_addr"] = str(tap_dump[u"host_mac_addr"])
+ tap_dump[u"host_ip4_prefix"] = str(tap_dump[u"host_ip4_prefix"])
+ tap_dump[u"host_ip6_prefix"] = str(tap_dump[u"host_ip6_prefix"])
+ tap_dump[u"tap_flags"] = tap_dump[u"tap_flags"].value \
+ if hasattr(tap_dump[u"tap_flags"], u"value") \
+ else int(tap_dump[u"tap_flags"])
+ tap_dump[u"host_namespace"] = None \
+ if tap_dump[u"host_namespace"] == u"(nil)" \
+ else tap_dump[u"host_namespace"]
+ tap_dump[u"host_bridge"] = None \
+ if tap_dump[u"host_bridge"] == u"(nil)" \
+ else tap_dump[u"host_bridge"]
return tap_dump
- cmd = 'sw_interface_tap_v2_dump'
- err_msg = 'Failed to get TAP dump on host {host}'.format(
- host=node['host'])
+ cmd = u"sw_interface_tap_v2_dump"
+ err_msg = f"Failed to get TAP dump on host {node[u'host']}"
+
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd).get_details(err_msg)
for dump in details:
if name is None:
data.append(process_tap_dump(dump))
- elif dump.get('host_if_name') == name:
+ elif dump.get(u"host_if_name") == name:
data = process_tap_dump(dump)
break
- logger.debug('TAP data:\n{tap_data}'.format(tap_data=data))
+ logger.debug(f"TAP data:\n{data}")
return data
+
+
+class TapFeatureMask:
+ """Tap features utilities"""
+
+ @staticmethod
+ def create_tap_feature_mask(**kwargs):
+ """Create tap feature mask with feature bits set according to kwargs.
+ :param kwargs: Key-value pairs of feature names and it's state
+ :type kwargs: dict
+ """
+ tap_feature_mask = 0
+
+ if u"all" in kwargs and kwargs[u"all"] is True:
+ for tap_feature_flag in TapFeaturesFlags:
+ tap_feature_mask |= 1 << (tap_feature_flag.value - 1)
+ else:
+ for feature_name, enabled in kwargs.items():
+ tap_feature_name = u"TAP_API_FLAG_" + feature_name.upper()
+ if tap_feature_name not in TapFeaturesFlags.__members__:
+ raise ValueError(u"Unsupported tap feature flag name")
+ if enabled:
+ tap_feature_mask |= \
+ 1 << (TapFeaturesFlags[tap_feature_name].value - 1)
+
+ return tap_feature_mask
+
+ @staticmethod
+ def is_feature_enabled(tap_feature_mask, tap_feature_flag):
+ """Checks if concrete tap feature is enabled within
+ tap_feature_mask
+ :param tap_feature_mask: Mask of enabled tap features
+ :param tap_feature_flag: Checked tap feature
+ :type tap_feature_mask: int
+ :type tap_feature_flag: TapFeaturesFlags
+ """
+ feature_flag_bit = 1 << tap_feature_flag.value
+ return (tap_feature_mask & feature_flag_bit) > 0