-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2020 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
+from resources.libraries.python.Constants import Constants
+
__all__ = [
u"DICT__nodes", u"Topology", u"NodeType", u"SocketType", u"NodeSubTypeTG"
]
return safe_load(work_file.read())[u"nodes"]
-# pylint: disable=invalid-name
-
class NodeType:
"""Defines node types used in topology dictionaries."""
# Device Under Test (this node has VPP running on it)
DUT = u"DUT"
# Traffic Generator (this node has traffic generator on it)
+ # pylint: disable=invalid-name
TG = u"TG"
# Virtual Machine (this node running on DUT node)
+ # pylint: disable=invalid-name
VM = u"VM"
does not rely on the data retrieved from nodes, this allows to call most of
the methods without having filled active topology with internal nodes data.
"""
-
- def add_node_item(self, node, value, path):
+ @staticmethod
+ def add_node_item(node, value, path):
"""Add item to topology node.
:param node: Topology node.
elif isinstance(node[path[0]], str):
node[path[0]] = dict() if node[path[0]] == u"" \
else {node[path[0]]: u""}
- self.add_node_item(node[path[0]], value, path[1:])
+ Topology.add_node_item(node[path[0]], value, path[1:])
@staticmethod
def add_new_port(node, ptype):
"""
port_types = (
u"subinterface", u"vlan_subif", u"memif", u"tap", u"vhost",
- u"loopback", u"gre_tunnel", u"vxlan_tunnel", u"eth_bond", u"eth_avf"
+ u"loopback", u"gre_tunnel", u"vxlan_tunnel", u"eth_bond",
+ u"eth_avf", u"eth_rdma"
)
for node_data in nodes.values():
except KeyError:
return None
+ @staticmethod
+ def get_interface_ip4_prefix_length(node, iface_key):
+ """Get IP4 address prefix length for the interface.
+
+ :param node: Node to get prefix length on.
+ :param iface_key: Interface key from topology file.
+ :type node: dict
+ :type iface_key: str
+ :returns: Prefix length from topology file or the default
+ IP4 prefix length if not found.
+ :rtype: int
+ :raises: KeyError if iface_key is not found.
+ """
+ return node[u"interfaces"][iface_key].get(u"ip4_prefix_length", \
+ Constants.DEFAULT_IP4_PREFIX_LENGTH)
+
@staticmethod
def get_adjacent_node_and_interface(nodes_info, node, iface_key):
"""Get node and interface adjacent to specified interface
:rtype: list
"""
- logger.trace(f"node1: {str(node1)}")
- logger.trace(f"node2: {str(node2)}")
node1_links = self._get_node_active_link_names(
node1, filter_list=filter_list_node1
)
elif node2_links is None:
logger.error(u"Unable to find active links for node2")
else:
- connecting_links = list(set(node1_links).intersection(node2_links))
+ # Not using set operations, as we need deterministic order.
+ connecting_links = [
+ link for link in node1_links if link in node2_links
+ ]
return connecting_links
:param node: Node to set numa_node on.
:param iface_key: Interface key from topology file.
+ :param numa_node_id: Num_node ID.
:type node: dict
:type iface_key: str
+ :type numa_node_id: int
:returns: Return iface_key or None if not found.
"""
try:
except KeyError:
return None
- def add_new_socket(self, node, socket_type, socket_id, socket_path):
+ @staticmethod
+ def add_new_socket(node, socket_type, socket_id, socket_path):
"""Add socket file of specific SocketType and ID to node.
:param node: Node to add socket on.
:param socket_type: Socket type.
- :param socket_id: Socket id.
+ :param socket_id: Socket id, currently equals to unique node key.
:param socket_path: Socket absolute path.
:type node: dict
:type socket_type: SocketType
:type socket_path: str
"""
path = [u"sockets", socket_type, socket_id]
- self.add_node_item(node, socket_path, path)
+ Topology.add_node_item(node, socket_path, path)
+
+ @staticmethod
+ def del_node_socket_id(node, socket_type, socket_id):
+ """Delete socket of specific SocketType and ID from node.
+
+ :param node: Node to delete socket from.
+ :param socket_type: Socket type.
+ :param socket_id: Socket id, currently equals to unique node key.
+ :type node: dict
+ :type socket_type: SocketType
+ :type socket_id: str
+ """
+ node[u"sockets"][socket_type].pop(socket_id)
@staticmethod
def get_node_sockets(node, socket_type=None):