-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2020 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
+from resources.libraries.python.Constants import Constants
+
__all__ = [
u"DICT__nodes", u"Topology", u"NodeType", u"SocketType", u"NodeSubTypeTG"
]
return safe_load(work_file.read())[u"nodes"]
-
class NodeType:
"""Defines node types used in topology dictionaries."""
- # TODO: Two letter initialisms are well-known, but too short for pylint.
- # Candidates: TG -> TGN, VM -> VNF.
-
# Device Under Test (this node has VPP running on it)
DUT = u"DUT"
# Traffic Generator (this node has traffic generator on it)
does not rely on the data retrieved from nodes, this allows to call most of
the methods without having filled active topology with internal nodes data.
"""
-
- def add_node_item(self, node, value, path):
+ @staticmethod
+ def add_node_item(node, value, path):
"""Add item to topology node.
:param node: Topology node.
elif isinstance(node[path[0]], str):
node[path[0]] = dict() if node[path[0]] == u"" \
else {node[path[0]]: u""}
- self.add_node_item(node[path[0]], value, path[1:])
+ Topology.add_node_item(node[path[0]], value, path[1:])
@staticmethod
def add_new_port(node, ptype):
"""
port_types = (
u"subinterface", u"vlan_subif", u"memif", u"tap", u"vhost",
- u"loopback", u"gre_tunnel", u"vxlan_tunnel", u"eth_bond", u"eth_avf"
+ u"loopback", u"gre_tunnel", u"vxlan_tunnel", u"eth_bond",
+ u"eth_avf", u"eth_rdma"
)
for node_data in nodes.values():
except KeyError:
return None
+ @staticmethod
+ def get_interface_ip4_prefix_length(node, iface_key):
+ """Get IP4 address prefix length for the interface.
+
+ :param node: Node to get prefix length on.
+ :param iface_key: Interface key from topology file.
+ :type node: dict
+ :type iface_key: str
+ :returns: Prefix length from topology file or the default
+ IP4 prefix length if not found.
+ :rtype: int
+ :raises: KeyError if iface_key is not found.
+ """
+ return node[u"interfaces"][iface_key].get(u"ip4_prefix_length", \
+ Constants.DEFAULT_IP4_PREFIX_LENGTH)
+
@staticmethod
def get_adjacent_node_and_interface(nodes_info, node, iface_key):
"""Get node and interface adjacent to specified interface
:rtype: list
"""
- logger.trace(f"node1: {str(node1)}")
- logger.trace(f"node2: {str(node2)}")
node1_links = self._get_node_active_link_names(
node1, filter_list=filter_list_node1
)
elif node2_links is None:
logger.error(u"Unable to find active links for node2")
else:
- connecting_links = list(set(node1_links).intersection(node2_links))
+ # Not using set operations, as we need deterministic order.
+ connecting_links = [
+ link for link in node1_links if link in node2_links
+ ]
return connecting_links
:param node: Node to set numa_node on.
:param iface_key: Interface key from topology file.
+ :param numa_node_id: Num_node ID.
:type node: dict
:type iface_key: str
+ :type numa_node_id: int
:returns: Return iface_key or None if not found.
"""
try:
except KeyError:
return None
- def add_new_socket(self, node, socket_type, socket_id, socket_path):
+ @staticmethod
+ def add_new_socket(node, socket_type, socket_id, socket_path):
"""Add socket file of specific SocketType and ID to node.
:param node: Node to add socket on.
:param socket_type: Socket type.
- :param socket_id: Socket id.
+ :param socket_id: Socket id, currently equals to unique node key.
:param socket_path: Socket absolute path.
:type node: dict
:type socket_type: SocketType
:type socket_path: str
"""
path = [u"sockets", socket_type, socket_id]
- self.add_node_item(node, socket_path, path)
+ Topology.add_node_item(node, socket_path, path)
+
+ @staticmethod
+ def del_node_socket_id(node, socket_type, socket_id):
+ """Delete socket of specific SocketType and ID from node.
+
+ :param node: Node to delete socket from.
+ :param socket_type: Socket type.
+ :param socket_id: Socket id, currently equals to unique node key.
+ :type node: dict
+ :type socket_type: SocketType
+ :type socket_id: str
+ """
+ node[u"sockets"][socket_type].pop(socket_id)
@staticmethod
def get_node_sockets(node, socket_type=None):