-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
+from resources.libraries.python.Constants import Constants
+
__all__ = [
u"DICT__nodes", u"Topology", u"NodeType", u"SocketType", u"NodeSubTypeTG"
]
return safe_load(work_file.read())[u"nodes"]
-
class NodeType:
"""Defines node types used in topology dictionaries."""
- # TODO: Two letter initialisms are well-known, but too short for pylint.
- # Candidates: TG -> TGN, VM -> VNF.
-
# Device Under Test (this node has VPP running on it)
DUT = u"DUT"
# Traffic Generator (this node has traffic generator on it)
does not rely on the data retrieved from nodes, this allows to call most of
the methods without having filled active topology with internal nodes data.
"""
-
- def add_node_item(self, node, value, path):
+ @staticmethod
+ def add_node_item(node, value, path):
"""Add item to topology node.
:param node: Topology node.
elif isinstance(node[path[0]], str):
node[path[0]] = dict() if node[path[0]] == u"" \
else {node[path[0]]: u""}
- self.add_node_item(node[path[0]], value, path[1:])
+ Topology.add_node_item(node[path[0]], value, path[1:])
@staticmethod
def add_new_port(node, ptype):
"""
port_types = (
u"subinterface", u"vlan_subif", u"memif", u"tap", u"vhost",
- u"loopback", u"gre_tunnel", u"vxlan_tunnel", u"eth_bond", u"eth_avf"
+ u"loopback", u"gre_tunnel", u"vxlan_tunnel", u"eth_bond",
+ u"eth_avf", u"eth_rdma", u"geneve_tunnel", u"eth_af_xdp",
+ u"gtpu_tunnel"
)
for node_data in nodes.values():
return links
@staticmethod
- def _get_interface_by_key_value(node, key, value):
+ def _get_interface_by_key_value(node, key, value, subsequent=False):
"""Return node interface key from topology file
according to key and value.
:param node: The node dictionary.
:param key: Key by which to select the interface.
:param value: Value that should be found using the key.
+ :param subsequent: Use second interface of the link. Useful for
+ back-to-back links. Default: False
:type node: dict
:type key: string
:type value: string
+ :type subsequent: bool
:returns: Interface key from topology file
:rtype: string
"""
k_val = if_val.get(key)
if k_val is not None:
if k_val == value:
- retval = if_key
- break
+ if subsequent:
+ subsequent = False
+ else:
+ retval = if_key
+ break
return retval
@staticmethod
return Topology._get_interface_by_key_value(node, u"name", iface_name)
@staticmethod
- def get_interface_by_link_name(node, link_name):
+ def get_interface_by_link_name(node, link_name, subsequent=False):
"""Return interface key of link on node.
This method returns the interface name associated with a given link
:param node: The node topology dictionary.
:param link_name: Name of the link that a interface is connected to.
+ :param subsequent: Use second interface of the link. Useful for
+ back-to-back links. Default: False
:type node: dict
:type link_name: string
+ :type subsequent: bool
:returns: Interface key of the interface connected to the given link.
:rtype: str
"""
- return Topology._get_interface_by_key_value(node, u"link", link_name)
+ return Topology._get_interface_by_key_value(
+ node, u"link", link_name, subsequent=subsequent
+ )
def get_interfaces_by_link_names(self, node, link_names):
"""Return dictionary of dictionaries {"interfaceN", interface name}.
except KeyError:
return None
+ @staticmethod
+ def get_interface_ip4_prefix_length(node, iface_key):
+ """Get IP4 address prefix length for the interface.
+
+ :param node: Node to get prefix length on.
+ :param iface_key: Interface key from topology file.
+ :type node: dict
+ :type iface_key: str
+ :returns: Prefix length from topology file or the default
+ IP4 prefix length if not found.
+ :rtype: int
+ :raises: KeyError if iface_key is not found.
+ """
+ return node[u"interfaces"][iface_key].get(u"ip4_prefix_length", \
+ Constants.DEFAULT_IP4_PREFIX_LENGTH)
+
@staticmethod
def get_adjacent_node_and_interface(nodes_info, node, iface_key):
"""Get node and interface adjacent to specified interface
return None
@staticmethod
- def _get_node_active_link_names(node, filter_list=None):
+ def _get_node_active_link_names(node, filter_list=None, topo_has_dut=True):
"""Return list of link names that are other than mgmt links.
:param node: Node topology dictionary.
:param filter_list: Link filter criteria.
+ :param topo_has_dut: Whether we require back-to-back links.
:type node: dict
:type filter_list: list of strings
+ :type topo_has_dut: bool
:returns: List of link names occupied by the node.
:rtype: None or list of string
"""
link_names.append(interface[u"link"])
if not link_names:
link_names = None
+ if not topo_has_dut:
+ new_link_names = list()
+ for link_name in link_names:
+ count = 0
+ for interface in interfaces.values():
+ link = interface.get(u"link", None)
+ if link == link_name:
+ count += 1
+ if count == 2:
+ new_link_names.append(link_name)
+ link_names = new_link_names
return link_names
def get_active_connecting_links(
:rtype: list
"""
- logger.trace(f"node1: {str(node1)}")
- logger.trace(f"node2: {str(node2)}")
- node1_links = self._get_node_active_link_names(
- node1, filter_list=filter_list_node1
- )
- node2_links = self._get_node_active_link_names(
- node2, filter_list=filter_list_node2
- )
+ if node1 != node2:
+ node1_links = self._get_node_active_link_names(
+ node1, filter_list=filter_list_node1
+ )
+ node2_links = self._get_node_active_link_names(
+ node2, filter_list=filter_list_node2
+ )
+ else:
+ # Looking for back-to-back links.
+ node1_links = self._get_node_active_link_names(
+ node1, filter_list=filter_list_node1, topo_has_dut=False
+ )
+ node2_links = node1_links
connecting_links = None
if node1_links is None:
elif node2_links is None:
logger.error(u"Unable to find active links for node2")
else:
- connecting_links = list(set(node1_links).intersection(node2_links))
+ # Not using set operations, as we need deterministic order.
+ connecting_links = [
+ link for link in node1_links if link in node2_links
+ ]
return connecting_links
:param node: Node to set numa_node on.
:param iface_key: Interface key from topology file.
+ :param numa_node_id: Num_node ID.
:type node: dict
:type iface_key: str
+ :type numa_node_id: int
:returns: Return iface_key or None if not found.
"""
try:
except KeyError:
return None
- def add_new_socket(self, node, socket_type, socket_id, socket_path):
+ @staticmethod
+ def add_new_socket(node, socket_type, socket_id, socket_path):
"""Add socket file of specific SocketType and ID to node.
:param node: Node to add socket on.
:param socket_type: Socket type.
- :param socket_id: Socket id.
+ :param socket_id: Socket id, currently equals to unique node key.
:param socket_path: Socket absolute path.
:type node: dict
:type socket_type: SocketType
:type socket_path: str
"""
path = [u"sockets", socket_type, socket_id]
- self.add_node_item(node, socket_path, path)
+ Topology.add_node_item(node, socket_path, path)
+
+ @staticmethod
+ def del_node_socket_id(node, socket_type, socket_id):
+ """Delete socket of specific SocketType and ID from node.
+
+ :param node: Node to delete socket from.
+ :param socket_type: Socket type.
+ :param socket_id: Socket id, currently equals to unique node key.
+ :type node: dict
+ :type socket_type: SocketType
+ :type socket_id: str
+ """
+ node[u"sockets"][socket_type].pop(socket_id)
@staticmethod
def get_node_sockets(node, socket_type=None):
"""
for node in nodes.values():
if u"sockets" in list(node.keys()):
+ # Containers are disconnected and destroyed already.
node.pop(u"sockets")