-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""Defines nodes and topology structure."""
+import re
+
from collections import Counter
-from yaml import load
+from yaml import safe_load
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
-from robot.api.deco import keyword
-__all__ = ["DICT__nodes", 'Topology', 'NodeType']
+from resources.libraries.python.Constants import Constants
+
+__all__ = [
+ u"DICT__nodes", u"Topology", u"NodeType", u"SocketType", u"NodeSubTypeTG"
+]
def load_topo_from_yaml():
:returns: Nodes from loaded topology.
"""
try:
- topo_path = BuiltIn().get_variable_value("${TOPOLOGY_PATH}")
+ topo_path = BuiltIn().get_variable_value(u"${TOPOLOGY_PATH}")
except RobotNotRunningError:
return ''
with open(topo_path) as work_file:
- return load(work_file.read())['nodes']
+ return safe_load(work_file.read())[u"nodes"]
-# pylint: disable=invalid-name
-
-class NodeType(object):
+class NodeType:
"""Defines node types used in topology dictionaries."""
# Device Under Test (this node has VPP running on it)
- DUT = 'DUT'
+ DUT = u"DUT"
# Traffic Generator (this node has traffic generator on it)
- TG = 'TG'
+ # pylint: disable=invalid-name
+ TG = u"TG"
# Virtual Machine (this node running on DUT node)
- VM = 'VM'
+ # pylint: disable=invalid-name
+ VM = u"VM"
-class NodeSubTypeTG(object):
+class NodeSubTypeTG:
"""Defines node sub-type TG - traffic generator."""
# T-Rex traffic generator
- TREX = 'TREX'
+ TREX = u"TREX"
# Moongen
- MOONGEN = 'MOONGEN'
+ MOONGEN = u"MOONGEN"
# IxNetwork
- IXNET = 'IXNET'
+ IXNET = u"IXNET"
+
+
+class SocketType:
+ """Defines socket types used in topology dictionaries."""
+ # VPP Socket PAPI
+ PAPI = u"PAPI"
+ # VPP PAPI Stats (legacy option until stats are migrated to Socket PAPI)
+ STATS = u"STATS"
+
DICT__nodes = load_topo_from_yaml()
-class Topology(object):
+class Topology:
"""Topology data manipulation and extraction methods.
Defines methods used for manipulation and extraction of data from
does not rely on the data retrieved from nodes, this allows to call most of
the methods without having filled active topology with internal nodes data.
"""
+ @staticmethod
+ def add_node_item(node, value, path):
+ """Add item to topology node.
+
+ :param node: Topology node.
+ :param value: Value to insert.
+ :param path: Path where to insert item.
+ :type node: dict
+ :type value: str
+ :type path: list
+ """
+ if len(path) == 1:
+ node[path[0]] = value
+ return
+ if path[0] not in node:
+ node[path[0]] = dict()
+ elif isinstance(node[path[0]], str):
+ node[path[0]] = dict() if node[path[0]] == u"" \
+ else {node[path[0]]: u""}
+ Topology.add_node_item(node[path[0]], value, path[1:])
@staticmethod
def add_new_port(node, ptype):
max_ports = 1000000
iface = None
for i in range(1, max_ports):
- if node['interfaces'].get(str(ptype) + str(i)) is None:
+ if node[u"interfaces"].get(str(ptype) + str(i)) is None:
iface = str(ptype) + str(i)
- node['interfaces'][iface] = dict()
+ node[u"interfaces"][iface] = dict()
break
return iface
:returns: Nothing
"""
try:
- node['interfaces'].pop(iface_key)
+ node[u"interfaces"].pop(iface_key)
except KeyError:
pass
:type ptype: str
:returns: Nothing
"""
- for if_key in list(node['interfaces']):
+ for if_key in list(node[u"interfaces"]):
if if_key.startswith(str(ptype)):
- node['interfaces'].pop(if_key)
+ node[u"interfaces"].pop(if_key)
@staticmethod
def remove_all_added_ports_on_all_duts_from_topology(nodes):
:type nodes: dict
:returns: Nothing
"""
- port_types = ('subinterface', 'vlan_subif', 'memif', 'tap', 'vhost',
- 'loopback', 'gre_tunnel', 'vxlan_tunnel', 'eth_bond',
- 'avf')
+ port_types = (
+ u"subinterface", u"vlan_subif", u"memif", u"tap", u"vhost",
+ u"loopback", u"gre_tunnel", u"vxlan_tunnel", u"eth_bond",
+ u"eth_avf", u"eth_rdma", u"geneve_tunnel", u"eth_af_xdp",
+ u"gtpu_tunnel"
+ )
for node_data in nodes.values():
- if node_data['type'] == NodeType.DUT:
+ if node_data[u"type"] == NodeType.DUT:
for ptype in port_types:
Topology.remove_all_ports(node_data, ptype)
+ @staticmethod
+ def remove_all_vif_ports(node):
+ """Remove all Virtual Interfaces on DUT node.
+
+ :param node: Node to remove VIF ports on.
+ :type node: dict
+ :returns: Nothing
+ """
+ reg_ex = re.compile(r"port\d+_vif\d+")
+ for if_key in list(node[u"interfaces"]):
+ if re.match(reg_ex, if_key):
+ node[u"interfaces"].pop(if_key)
+
+ @staticmethod
+ def remove_all_added_vif_ports_on_all_duts_from_topology(nodes):
+ """Remove all added Virtual Interfaces on all DUT nodes in
+ the topology.
+
+ :param nodes: Nodes in the topology.
+ :type nodes: dict
+ :returns: Nothing
+ """
+ for node_data in nodes.values():
+ if node_data[u"type"] == NodeType.DUT:
+ Topology.remove_all_vif_ports(node_data)
+
@staticmethod
def update_interface_sw_if_index(node, iface_key, sw_if_index):
"""Update sw_if_index on the interface from the node.
:type iface_key: str
:type sw_if_index: int
"""
- node['interfaces'][iface_key]['vpp_sw_index'] = int(sw_if_index)
+ node[u"interfaces"][iface_key][u"vpp_sw_index"] = int(sw_if_index)
@staticmethod
def update_interface_name(node, iface_key, name):
:type iface_key: str
:type name: str
"""
- node['interfaces'][iface_key]['name'] = str(name)
+ node[u"interfaces"][iface_key][u"name"] = str(name)
@staticmethod
def update_interface_mac_address(node, iface_key, mac_address):
:type iface_key: str
:type mac_address: str
"""
- node['interfaces'][iface_key]['mac_address'] = str(mac_address)
+ node[u"interfaces"][iface_key][u"mac_address"] = str(mac_address)
@staticmethod
def update_interface_pci_address(node, iface_key, pci_address):
:type iface_key: str
:type pci_address: str
"""
- node['interfaces'][iface_key]['pci_address'] = str(pci_address)
+ node[u"interfaces"][iface_key][u"pci_address"] = str(pci_address)
@staticmethod
def update_interface_vlan(node, iface_key, vlan):
:type iface_key: str
:type vlan: str
"""
- node['interfaces'][iface_key]['vlan'] = int(vlan)
+ node[u"interfaces"][iface_key][u"vlan"] = int(vlan)
@staticmethod
def update_interface_vhost_socket(node, iface_key, vhost_socket):
:type iface_key: str
:type vhost_socket: str
"""
- node['interfaces'][iface_key]['vhost_socket'] = str(vhost_socket)
+ node[u"interfaces"][iface_key][u"vhost_socket"] = str(vhost_socket)
@staticmethod
def update_interface_memif_socket(node, iface_key, memif_socket):
:type iface_key: str
:type memif_socket: str
"""
- node['interfaces'][iface_key]['memif_socket'] = str(memif_socket)
+ node[u"interfaces"][iface_key][u"memif_socket"] = str(memif_socket)
@staticmethod
def update_interface_memif_id(node, iface_key, memif_id):
:type iface_key: str
:type memif_id: str
"""
- node['interfaces'][iface_key]['memif_id'] = str(memif_id)
+ node[u"interfaces"][iface_key][u"memif_id"] = str(memif_id)
@staticmethod
def update_interface_memif_role(node, iface_key, memif_role):
:type iface_key: str
:type memif_role: str
"""
- node['interfaces'][iface_key]['memif_role'] = str(memif_role)
+ node[u"interfaces"][iface_key][u"memif_role"] = str(memif_role)
@staticmethod
def update_interface_tap_dev_name(node, iface_key, dev_name):
:type dev_name: str
:returns: Nothing
"""
- node['interfaces'][iface_key]['dev_name'] = str(dev_name)
+ node[u"interfaces"][iface_key][u"dev_name"] = str(dev_name)
@staticmethod
def get_node_by_hostname(nodes, hostname):
:returns: Node dictionary or None if not found.
"""
for node in nodes.values():
- if node['host'] == hostname:
+ if node[u"host"] == hostname:
return node
return None
:returns: Links in the topology.
:rtype: list
"""
- links = []
+ links = list()
for node in nodes.values():
- for interface in node['interfaces'].values():
- link = interface.get('link')
+ for interface in node[u"interfaces"].values():
+ link = interface.get(u"link")
if link is not None:
if link not in links:
links.append(link)
:returns: Interface key from topology file
:rtype: string
"""
- interfaces = node['interfaces']
+ interfaces = node[u"interfaces"]
retval = None
- for if_key, if_val in interfaces.iteritems():
+ for if_key, if_val in interfaces.items():
k_val = if_val.get(key)
if k_val is not None:
if k_val == value:
:returns: Interface key.
:rtype: str
"""
- return Topology._get_interface_by_key_value(node, "name", iface_name)
+ return Topology._get_interface_by_key_value(node, u"name", iface_name)
@staticmethod
def get_interface_by_link_name(node, link_name):
:returns: Interface key of the interface connected to the given link.
:rtype: str
"""
- return Topology._get_interface_by_key_value(node, "link", link_name)
+ return Topology._get_interface_by_key_value(node, u"link", link_name)
def get_interfaces_by_link_names(self, node, link_names):
"""Return dictionary of dictionaries {"interfaceN", interface name}.
links.
:rtype: dict
"""
- retval = {}
- interface_key_tpl = "interface{}"
+ retval = dict()
interface_number = 1
for link_name in link_names:
interface = self.get_interface_by_link_name(node, link_name)
- interface_name = self.get_interface_name(node, interface)
- interface_key = interface_key_tpl.format(str(interface_number))
- retval[interface_key] = interface_name
+ retval[f"interface{str(interface_number)}"] = \
+ self.get_interface_name(node, interface)
interface_number += 1
return retval
interface index assigned to the interface by vpp for a given node.
:param node: The node topology dictionary.
- :param sw_if_index: sw_if_index of the link that a interface is connected to.
+ :param sw_if_index: sw_if_index of the link that a interface is
+ connected to.
:type node: dict
:type sw_if_index: int
:returns: Interface name of the interface connected to the given link.
:rtype: str
"""
- return Topology._get_interface_by_key_value(node, "vpp_sw_index",
- sw_if_index)
+ return Topology._get_interface_by_key_value(
+ node, u"vpp_sw_index", sw_if_index
+ )
@staticmethod
def get_interface_sw_index(node, iface_key):
:rtype: int or None
"""
try:
- if isinstance(iface_key, basestring):
- return node['interfaces'][iface_key].get('vpp_sw_index')
+ if isinstance(iface_key, str):
+ return node[u"interfaces"][iface_key].get(u"vpp_sw_index")
# TODO: use only iface_key, do not use integer
return int(iface_key)
except (KeyError, ValueError):
:raises TypeError: If provided interface name is not a string.
"""
try:
- if not isinstance(iface_name, basestring):
- raise TypeError("Interface name must be a string.")
+ if not isinstance(iface_name, str):
+ raise TypeError(u"Interface name must be a string.")
iface_key = Topology.get_interface_by_name(node, iface_name)
- return node['interfaces'][iface_key].get('vpp_sw_index')
+ return node[u"interfaces"][iface_key].get(u"vpp_sw_index")
except (KeyError, ValueError):
return None
:rtype: int
"""
try:
- return node['interfaces'][iface_key].get('mtu')
+ return node[u"interfaces"][iface_key].get(u"mtu")
except KeyError:
return None
:rtype: str
"""
try:
- return node['interfaces'][iface_key].get('name')
+ return node[u"interfaces"][iface_key].get(u"name")
except KeyError:
return None
if isinstance(interface, int):
key = Topology.get_interface_by_sw_index(node, interface)
if key is None:
- raise RuntimeError("Interface with sw_if_index={0} does not "
- "exist in topology.".format(interface))
+ raise RuntimeError(
+ f"Interface with sw_if_index={interface} does not exist "
+ f"in topology."
+ )
elif interface in Topology.get_node_interfaces(node):
key = interface
- elif interface in Topology.get_links({"dut": node}):
+ elif interface in Topology.get_links({u"dut": node}):
key = Topology.get_interface_by_link_name(node, interface)
- elif isinstance(interface, basestring):
+ elif isinstance(interface, str):
key = Topology.get_interface_by_name(node, interface)
if key is None:
- raise RuntimeError("Interface with key, name or link name "
- "\"{0}\" does not exist in topology."
- .format(interface))
+ raise RuntimeError(
+ f"Interface with key, name or link name \"{interface}\" "
+ f"does not exist in topology."
+ )
else:
- raise TypeError("Type of interface argument must be integer"
- " or string.")
+ raise TypeError(
+ u"Type of interface argument must be integer or string."
+ )
return key
@staticmethod
key = Topology.convert_interface_reference_to_key(node, interface)
conversions = {
- "key": lambda x, y: y,
- "name": Topology.get_interface_name,
- "sw_if_index": Topology.get_interface_sw_index
+ u"key": lambda x, y: y,
+ u"name": Topology.get_interface_name,
+ u"sw_if_index": Topology.get_interface_sw_index
}
try:
return conversions[wanted_format](node, key)
except KeyError:
- raise ValueError("Unrecognized return value wanted: {0}."
- "Valid options are key, name, sw_if_index"
- .format(wanted_format))
+ raise ValueError(
+ f"Unrecognized return value wanted: {wanted_format}."
+ f"Valid options are key, name, sw_if_index"
+ )
@staticmethod
def get_interface_numa_node(node, iface_key):
:rtype: int
"""
try:
- return node['interfaces'][iface_key].get('numa_node')
+ return node[u"interfaces"][iface_key].get(u"numa_node")
except KeyError:
return None
numa_list = []
for if_key in iface_keys:
try:
- numa_list.append(node['interfaces'][if_key].get('numa_node'))
+ numa_list.append(node[u"interfaces"][if_key].get(u"numa_node"))
except KeyError:
pass
:returns: Return MAC or None if not found.
"""
try:
- return node['interfaces'][iface_key].get('mac_address')
+ return node[u"interfaces"][iface_key].get(u"mac_address")
except KeyError:
return None
:returns: Return IP4 or None if not found.
"""
try:
- return node['interfaces'][iface_key].get('ip4_address', None)
+ return node[u"interfaces"][iface_key].get(u"ip4_address")
except KeyError:
return None
+ @staticmethod
+ def get_interface_ip4_prefix_length(node, iface_key):
+ """Get IP4 address prefix length for the interface.
+
+ :param node: Node to get prefix length on.
+ :param iface_key: Interface key from topology file.
+ :type node: dict
+ :type iface_key: str
+ :returns: Prefix length from topology file or the default
+ IP4 prefix length if not found.
+ :rtype: int
+ :raises: KeyError if iface_key is not found.
+ """
+ return node[u"interfaces"][iface_key].get(u"ip4_prefix_length", \
+ Constants.DEFAULT_IP4_PREFIX_LENGTH)
+
@staticmethod
def get_adjacent_node_and_interface(nodes_info, node, iface_key):
"""Get node and interface adjacent to specified interface
"""
link_name = None
# get link name where the interface belongs to
- for if_key, if_val in node['interfaces'].iteritems():
- if if_key == 'mgmt':
+ for if_key, if_val in node[u"interfaces"].items():
+ if if_key == u"mgmt":
continue
if if_key == iface_key:
- link_name = if_val['link']
+ link_name = if_val[u"link"]
break
if link_name is None:
# find link
for node_data in nodes_info.values():
# skip self
- if node_data['host'] == node['host']:
+ if node_data[u"host"] == node[u"host"]:
continue
for if_key, if_val \
- in node_data['interfaces'].iteritems():
- if 'link' not in if_val:
+ in node_data[u"interfaces"].items():
+ if u"link" not in if_val:
continue
- if if_val['link'] == link_name:
+ if if_val[u"link"] == link_name:
return node_data, if_key
return None
:returns: Return PCI address or None if not found.
"""
try:
- return node['interfaces'][iface_key].get('pci_address')
+ return node[u"interfaces"][iface_key].get(u"pci_address")
except KeyError:
return None
:returns: Return interface driver or None if not found.
"""
try:
- return node['interfaces'][iface_key].get('driver')
+ return node[u"interfaces"][iface_key].get(u"driver")
except KeyError:
return None
:returns: Return interface vlan or None if not found.
"""
try:
- return node['interfaces'][iface_key].get('vlan')
+ return node[u"interfaces"][iface_key].get(u"vlan")
except KeyError:
return None
:returns: Return list of keys of all interfaces.
:rtype: list
"""
- return node['interfaces'].keys()
+ return node[u"interfaces"].keys()
@staticmethod
def get_node_link_mac(node, link_name):
:returns: MAC address string.
:rtype: str
"""
- for port in node['interfaces'].values():
- if port.get('link') == link_name:
- return port.get('mac_address')
+ for port in node[u"interfaces"].values():
+ if port.get(u"link") == link_name:
+ return port.get(u"mac_address")
return None
@staticmethod
:returns: List of link names occupied by the node.
:rtype: None or list of string
"""
- interfaces = node['interfaces']
+ interfaces = node[u"interfaces"]
link_names = []
for interface in interfaces.values():
- if 'link' in interface:
- if (filter_list is not None) and ('model' in interface):
+ if u"link" in interface:
+ if (filter_list is not None) and (u"model" in interface):
for filt in filter_list:
- if filt == interface['model']:
- link_names.append(interface['link'])
- elif (filter_list is not None) and ('model' not in interface):
- logger.trace('Cannot apply filter on interface: {}'
- .format(str(interface)))
+ if filt == interface[u"model"]:
+ link_names.append(interface[u"link"])
+ elif (filter_list is not None) and (u"model" not in interface):
+ logger.trace(
+ f"Cannot apply filter on interface: {str(interface)}"
+ )
else:
- link_names.append(interface['link'])
+ link_names.append(interface[u"link"])
if not link_names:
link_names = None
return link_names
- @keyword('Get active links connecting "${node1}" and "${node2}"')
- def get_active_connecting_links(self, node1, node2,
- filter_list_node1=None,
- filter_list_node2=None):
+ def get_active_connecting_links(
+ self, node1, node2, filter_list_node1=None, filter_list_node2=None):
"""Return list of link names that connect together node1 and node2.
:param node1: Node topology dictionary.
:rtype: list
"""
- logger.trace("node1: {}".format(str(node1)))
- logger.trace("node2: {}".format(str(node2)))
node1_links = self._get_node_active_link_names(
- node1,
- filter_list=filter_list_node1)
+ node1, filter_list=filter_list_node1
+ )
node2_links = self._get_node_active_link_names(
- node2,
- filter_list=filter_list_node2)
+ node2, filter_list=filter_list_node2
+ )
connecting_links = None
if node1_links is None:
- logger.error("Unable to find active links for node1")
+ logger.error(u"Unable to find active links for node1")
elif node2_links is None:
- logger.error("Unable to find active links for node2")
+ logger.error(u"Unable to find active links for node2")
else:
- connecting_links = list(set(node1_links).intersection(node2_links))
+ # Not using set operations, as we need deterministic order.
+ connecting_links = [
+ link for link in node1_links if link in node2_links
+ ]
return connecting_links
- @keyword('Get first active connecting link between node "${node1}" and '
- '"${node2}"')
def get_first_active_connecting_link(self, node1, node2):
- """
+ """Get first link connecting the two nodes together.
:param node1: Connected node.
:param node2: Connected node.
"""
connecting_links = self.get_active_connecting_links(node1, node2)
if not connecting_links:
- raise RuntimeError("No links connecting the nodes were found")
+ raise RuntimeError(u"No links connecting the nodes were found")
return connecting_links[0]
- @keyword('Get egress interfaces name on "${node1}" for link with '
- '"${node2}"')
def get_egress_interfaces_name_for_nodes(self, node1, node2):
"""Get egress interfaces on node1 for link with node2.
:returns: Egress interfaces.
:rtype: list
"""
- interfaces = []
+ interfaces = list()
links = self.get_active_connecting_links(node1, node2)
if not links:
- raise RuntimeError('No link between nodes')
- for interface in node1['interfaces'].values():
- link = interface.get('link')
+ raise RuntimeError(u"No link between nodes")
+ for interface in node1[u"interfaces"].values():
+ link = interface.get(u"link")
if link is None:
continue
if link in links:
continue
- name = interface.get('name')
+ name = interface.get(u"name")
if name is None:
continue
interfaces.append(name)
return interfaces
- @keyword('Get first egress interface name on "${node1}" for link with '
- '"${node2}"')
def get_first_egress_interface_for_nodes(self, node1, node2):
"""Get first egress interface on node1 for link with node2.
"""
interfaces = self.get_egress_interfaces_name_for_nodes(node1, node2)
if not interfaces:
- raise RuntimeError('No egress interface for nodes')
+ raise RuntimeError(u"No egress interface for nodes")
return interfaces[0]
- @keyword('Get link data useful in circular topology test from tg "${tgen}"'
- ' dut1 "${dut1}" dut2 "${dut2}"')
def get_links_dict_from_nodes(self, tgen, dut1, dut2):
"""Return link combinations used in tests in circular topology.
tg_traffic_links = [dut1_tg_link, dut2_tg_link]
dut1_bd_links = [dut1_dut2_link, dut1_tg_link]
dut2_bd_links = [dut1_dut2_link, dut2_tg_link]
- topology_links = {'DUT1_DUT2_LINK': dut1_dut2_link,
- 'DUT1_TG_LINK': dut1_tg_link,
- 'DUT2_TG_LINK': dut2_tg_link,
- 'TG_TRAFFIC_LINKS': tg_traffic_links,
- 'DUT1_BD_LINKS': dut1_bd_links,
- 'DUT2_BD_LINKS': dut2_bd_links}
+ topology_links = {
+ u"DUT1_DUT2_LINK": dut1_dut2_link,
+ u"DUT1_TG_LINK": dut1_tg_link,
+ u"DUT2_TG_LINK": dut2_tg_link,
+ u"TG_TRAFFIC_LINKS": tg_traffic_links,
+ u"DUT1_BD_LINKS": dut1_bd_links,
+ u"DUT2_BD_LINKS": dut2_bd_links
+ }
return topology_links
@staticmethod
:returns: True if node is type of TG, otherwise False.
:rtype: bool
"""
- return node['type'] == NodeType.TG
+ return node[u"type"] == NodeType.TG
@staticmethod
def get_node_hostname(node):
:returns: Hostname or IP address.
:rtype: str
"""
- return node['host']
+ return node[u"host"]
@staticmethod
def get_node_arch(node):
:rtype: str
"""
try:
- return node['arch']
+ return node[u"arch"]
except KeyError:
- node['arch'] = 'x86_64'
- return 'x86_64'
+ node[u"arch"] = u"x86_64"
+ return u"x86_64"
@staticmethod
def get_cryptodev(node):
:rtype: str
"""
try:
- return node['cryptodev']
+ return node[u"cryptodev"]
except KeyError:
return None
:rtype: str
"""
try:
- return node['uio_driver']
+ return node[u"uio_driver"]
except KeyError:
return None
:param node: Node to set numa_node on.
:param iface_key: Interface key from topology file.
+ :param numa_node_id: Num_node ID.
:type node: dict
:type iface_key: str
+ :type numa_node_id: int
:returns: Return iface_key or None if not found.
"""
try:
- node['interfaces'][iface_key]['numa_node'] = numa_node_id
+ node[u"interfaces"][iface_key][u"numa_node"] = numa_node_id
return iface_key
except KeyError:
return None
+
+ @staticmethod
+ def add_new_socket(node, socket_type, socket_id, socket_path):
+ """Add socket file of specific SocketType and ID to node.
+
+ :param node: Node to add socket on.
+ :param socket_type: Socket type.
+ :param socket_id: Socket id, currently equals to unique node key.
+ :param socket_path: Socket absolute path.
+ :type node: dict
+ :type socket_type: SocketType
+ :type socket_id: str
+ :type socket_path: str
+ """
+ path = [u"sockets", socket_type, socket_id]
+ Topology.add_node_item(node, socket_path, path)
+
+ @staticmethod
+ def del_node_socket_id(node, socket_type, socket_id):
+ """Delete socket of specific SocketType and ID from node.
+
+ :param node: Node to delete socket from.
+ :param socket_type: Socket type.
+ :param socket_id: Socket id, currently equals to unique node key.
+ :type node: dict
+ :type socket_type: SocketType
+ :type socket_id: str
+ """
+ node[u"sockets"][socket_type].pop(socket_id)
+
+ @staticmethod
+ def get_node_sockets(node, socket_type=None):
+ """Get node socket files.
+
+ :param node: Node to get sockets from.
+ :param socket_type: Socket type or None for all sockets.
+ :type node: dict
+ :type socket_type: SocketType
+ :returns: Node sockets or None if not found.
+ :rtype: dict
+ """
+ try:
+ if socket_type:
+ return node[u"sockets"][socket_type]
+ return node[u"sockets"]
+ except KeyError:
+ return None
+
+ @staticmethod
+ def clean_sockets_on_all_nodes(nodes):
+ """Remove temporary socket files from topology file.
+
+ :param nodes: SUT nodes.
+ :type node: dict
+ """
+ for node in nodes.values():
+ if u"sockets" in list(node.keys()):
+ # Containers are disconnected and destroyed already.
+ node.pop(u"sockets")