def load_topo_from_yaml():
- """Loads topology from file defined in "${TOPOLOGY_PATH}" variable
+ """Load topology from file defined in "${TOPOLOGY_PATH}" variable
:return: nodes from loaded topology
"""
# Traffic Generator (this node has traffic generator on it)
TG = 'TG'
+
class NodeSubTypeTG(object):
#T-Rex traffic generator
TREX = 'TREX'
# Moongen
MOONGEN = 'MOONGEN'
- #IxNetwork
+ # IxNetwork
IXNET = 'IXNET'
DICT__nodes = load_topo_from_yaml()
the used topology.
"""
- def __init__(self):
- pass
-
@staticmethod
def get_node_by_hostname(nodes, hostname):
"""Get node from nodes of the topology by hostname.
@staticmethod
def _get_interface_by_key_value(node, key, value):
- """ Return node interface name according to key and value
+ """Return node interface name according to key and value
:param node: :param node: the node dictionary
:param key: key by which to select the interface.
return list_mac
def _extract_vpp_interface_by_mac(self, interfaces_list, mac_address):
- """Returns interface dictionary from interface_list by mac address.
+ """Return interface dictionary from interface_list by mac address.
Extracts interface dictionary from all of the interfaces in interfaces
list parsed from json according to mac_address of the interface
return interface_name
def _update_node_interface_data_from_json(self, node, interface_dump_json):
- """ Update node vpp data in node__DICT from json interface dump.
+ """Update node vpp data in node__DICT from json interface dump.
This method updates vpp interface names and sw indexexs according to
interface mac addresses found in interface_dump_json
format(ifc, if_mac))
ifc['name'] = interface_dict["interface_name"]
ifc['vpp_sw_index'] = interface_dict["sw_if_index"]
+ ifc['mtu'] = interface_dict["mtu"]
def update_vpp_interface_data_on_node(self, node):
"""Update vpp generated interface data for a given node in DICT__nodes
InterfaceSetup.tg_set_interfaces_udev_rules(node)
def update_all_interface_data_on_all_nodes(self, nodes):
- """ Update interface names on all nodes in DICT__nodes
+ """Update interface names on all nodes in DICT__nodes
:param nodes: Nodes in the topology.
:type nodes: dict
"""
for port in node['interfaces'].values():
port_name = port.get('name')
- if port_name is None:
- continue
if port_name == interface:
return port.get('vpp_sw_index')
return None
+ @staticmethod
+ def get_interface_mtu(node, interface):
+ """Get interface MTU.
+
+ Returns physical layer MTU (max. size of Ethernet frame).
+ :param node: Node to get interface MTU on.
+ :param interface: Interface name.
+ :type node: dict
+ :type interface: str
+ :return: MTU or None if not found.
+ :rtype: int
+ """
+ for port in node['interfaces'].values():
+ port_name = port.get('name')
+ if port_name == interface:
+ return port.get('mtu')
+
+ return None
+
@staticmethod
def get_interface_mac_by_port_key(node, port_key):
"""Get MAC address for the interface based on port key.
"""
for port in node['interfaces'].values():
port_name = port.get('name')
- if port_name is None:
- continue
if port_name == interface:
return port.get('mac_address')
:rtype: (dict, dict)
"""
link_name = None
- # get link name where the interface belongs to
+ # get link name where the interface belongs to
for port_name, port_data in node['interfaces'].iteritems():
if port_name == 'mgmt':
continue
@staticmethod
def _get_node_active_link_names(node):
- """Returns list of link names that are other than mgmt links
+ """Return list of link names that are other than mgmt links
:param node: node topology dictionary
:return: list of strings that represent link names occupied by the node
@keyword('Get active links connecting "${node1}" and "${node2}"')
def get_active_connecting_links(self, node1, node2):
- """Returns list of link names that connect together node1 and node2
+ """Return list of link names that connect together node1 and node2
:param node1: node topology dictionary
:param node2: node topology dictionary
:param node2: Second node.
:type node1: dict
:type node2: dict
- :return: Engress interfaces.
+ :return: Egress interfaces.
:rtype: list
"""
interfaces = []
:param node2: Second node.
:type node1: dict
:type node2: dict
- :return: Engress interface.
+ :return: Egress interface.
:rtype: str
"""
interfaces = self.get_egress_interfaces_for_nodes(node1, node2)
if not interfaces:
- raise RuntimeError('No engress interface for nodes')
+ raise RuntimeError('No egress interface for nodes')
return interfaces[0]
@keyword('Get link data useful in circular topology test from tg "${tgen}"'
' dut1 "${dut1}" dut2 "${dut2}"')
def get_links_dict_from_nodes(self, tgen, dut1, dut2):
- """Returns link combinations used in tests in circular topology.
+ """Return link combinations used in tests in circular topology.
For the time being it returns links from the Node path:
TG->DUT1->DUT2->TG
- :param tg: traffic generator node data
+ :param tgen: traffic generator node data
:param dut1: DUT1 node data
:param dut2: DUT2 node data
- :type tg: dict
+ :type tgen: dict
:type dut1: dict
:type dut2: dict
:return: dictionary of possible link combinations
@staticmethod
def get_node_hostname(node):
+ """Return host (hostname/ip address) of the node.
+
+ :param node: Node created from topology.
+ :type node: dict
+ :return: host as 'str' type
"""
- :param node: node dictionary
- :return: host name as 'str' type
- """
- return node['host']
\ No newline at end of file
+ return node['host']