-# Copyright (c) 2018 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""Defines nodes and topology structure."""
+import re
+
from collections import Counter
-from yaml import load
+from yaml import safe_load
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
-from robot.api.deco import keyword
-__all__ = ["DICT__nodes", 'Topology', 'NodeType']
+__all__ = [
+ u"DICT__nodes", u"Topology", u"NodeType", u"SocketType", u"NodeSubTypeTG"
+]
def load_topo_from_yaml():
:returns: Nodes from loaded topology.
"""
try:
- topo_path = BuiltIn().get_variable_value("${TOPOLOGY_PATH}")
+ topo_path = BuiltIn().get_variable_value(u"${TOPOLOGY_PATH}")
except RobotNotRunningError:
return ''
with open(topo_path) as work_file:
- return load(work_file.read())['nodes']
+ return safe_load(work_file.read())[u"nodes"]
# pylint: disable=invalid-name
-class NodeType(object):
+class NodeType:
"""Defines node types used in topology dictionaries."""
# Device Under Test (this node has VPP running on it)
- DUT = 'DUT'
+ DUT = u"DUT"
# Traffic Generator (this node has traffic generator on it)
- TG = 'TG'
+ TG = u"TG"
# Virtual Machine (this node running on DUT node)
- VM = 'VM'
+ VM = u"VM"
-class NodeSubTypeTG(object):
+class NodeSubTypeTG:
"""Defines node sub-type TG - traffic generator."""
# T-Rex traffic generator
- TREX = 'TREX'
+ TREX = u"TREX"
# Moongen
- MOONGEN = 'MOONGEN'
+ MOONGEN = u"MOONGEN"
# IxNetwork
- IXNET = 'IXNET'
+ IXNET = u"IXNET"
+
+
+class SocketType:
+ """Defines socket types used in topology dictionaries."""
+ # VPP Socket PAPI
+ PAPI = u"PAPI"
+ # VPP PAPI Stats (legacy option until stats are migrated to Socket PAPI)
+ STATS = u"STATS"
+
DICT__nodes = load_topo_from_yaml()
-class Topology(object):
+class Topology:
"""Topology data manipulation and extraction methods.
Defines methods used for manipulation and extraction of data from
the methods without having filled active topology with internal nodes data.
"""
+ def add_node_item(self, node, value, path):
+ """Add item to topology node.
+
+ :param node: Topology node.
+ :param value: Value to insert.
+ :param path: Path where to insert item.
+ :type node: dict
+ :type value: str
+ :type path: list
+ """
+ if len(path) == 1:
+ node[path[0]] = value
+ return
+ if path[0] not in node:
+ node[path[0]] = dict()
+ elif isinstance(node[path[0]], str):
+ node[path[0]] = dict() if node[path[0]] == u"" \
+ else {node[path[0]]: u""}
+ self.add_node_item(node[path[0]], value, path[1:])
+
@staticmethod
def add_new_port(node, ptype):
"""Add new port to the node to active topology.
max_ports = 1000000
iface = None
for i in range(1, max_ports):
- if node['interfaces'].get(str(ptype) + str(i)) is None:
+ if node[u"interfaces"].get(str(ptype) + str(i)) is None:
iface = str(ptype) + str(i)
- node['interfaces'][iface] = dict()
+ node[u"interfaces"][iface] = dict()
break
return iface
:returns: Nothing
"""
try:
- node['interfaces'].pop(iface_key)
+ node[u"interfaces"].pop(iface_key)
except KeyError:
pass
:type ptype: str
:returns: Nothing
"""
- for if_key in list(node['interfaces']):
+ for if_key in list(node[u"interfaces"]):
if if_key.startswith(str(ptype)):
- node['interfaces'].pop(if_key)
+ node[u"interfaces"].pop(if_key)
@staticmethod
def remove_all_added_ports_on_all_duts_from_topology(nodes):
:type nodes: dict
:returns: Nothing
"""
- port_types = ('subinterface', 'vlan_subif', 'memif', 'tap', 'vhost',
- 'loopback', 'gre_tunnel', 'vxlan_tunnel', 'eth_bond')
+ port_types = (
+ u"subinterface", u"vlan_subif", u"memif", u"tap", u"vhost",
+ u"loopback", u"gre_tunnel", u"vxlan_tunnel", u"eth_bond", u"eth_avf"
+ )
for node_data in nodes.values():
- if node_data['type'] == NodeType.DUT:
+ if node_data[u"type"] == NodeType.DUT:
for ptype in port_types:
Topology.remove_all_ports(node_data, ptype)
+ @staticmethod
+ def remove_all_vif_ports(node):
+ """Remove all Virtual Interfaces on DUT node.
+
+ :param node: Node to remove VIF ports on.
+ :type node: dict
+ :returns: Nothing
+ """
+ reg_ex = re.compile(r"port\d+_vif\d+")
+ for if_key in list(node[u"interfaces"]):
+ if re.match(reg_ex, if_key):
+ node[u"interfaces"].pop(if_key)
+
+ @staticmethod
+ def remove_all_added_vif_ports_on_all_duts_from_topology(nodes):
+ """Remove all added Virtual Interfaces on all DUT nodes in
+ the topology.
+
+ :param nodes: Nodes in the topology.
+ :type nodes: dict
+ :returns: Nothing
+ """
+ for node_data in nodes.values():
+ if node_data[u"type"] == NodeType.DUT:
+ Topology.remove_all_vif_ports(node_data)
+
@staticmethod
def update_interface_sw_if_index(node, iface_key, sw_if_index):
"""Update sw_if_index on the interface from the node.
:type iface_key: str
:type sw_if_index: int
"""
- node['interfaces'][iface_key]['vpp_sw_index'] = int(sw_if_index)
+ node[u"interfaces"][iface_key][u"vpp_sw_index"] = int(sw_if_index)
@staticmethod
def update_interface_name(node, iface_key, name):
:type iface_key: str
:type name: str
"""
- node['interfaces'][iface_key]['name'] = str(name)
+ node[u"interfaces"][iface_key][u"name"] = str(name)
@staticmethod
def update_interface_mac_address(node, iface_key, mac_address):
:type iface_key: str
:type mac_address: str
"""
- node['interfaces'][iface_key]['mac_address'] = str(mac_address)
+ node[u"interfaces"][iface_key][u"mac_address"] = str(mac_address)
+
+ @staticmethod
+ def update_interface_pci_address(node, iface_key, pci_address):
+ """Update pci_address on the interface from the node.
+
+ :param node: Node to update PCI on.
+ :param iface_key: Topology key of the interface.
+ :param pci_address: PCI address.
+ :type node: dict
+ :type iface_key: str
+ :type pci_address: str
+ """
+ node[u"interfaces"][iface_key][u"pci_address"] = str(pci_address)
+
+ @staticmethod
+ def update_interface_vlan(node, iface_key, vlan):
+ """Update VLAN on the interface from the node.
+
+ :param node: Node to update VLAN on.
+ :param iface_key: Topology key of the interface.
+ :param vlan: VLAN ID.
+ :type node: dict
+ :type iface_key: str
+ :type vlan: str
+ """
+ node[u"interfaces"][iface_key][u"vlan"] = int(vlan)
@staticmethod
def update_interface_vhost_socket(node, iface_key, vhost_socket):
:type iface_key: str
:type vhost_socket: str
"""
- node['interfaces'][iface_key]['vhost_socket'] = str(vhost_socket)
+ node[u"interfaces"][iface_key][u"vhost_socket"] = str(vhost_socket)
@staticmethod
def update_interface_memif_socket(node, iface_key, memif_socket):
:type iface_key: str
:type memif_socket: str
"""
- node['interfaces'][iface_key]['memif_socket'] = str(memif_socket)
+ node[u"interfaces"][iface_key][u"memif_socket"] = str(memif_socket)
@staticmethod
def update_interface_memif_id(node, iface_key, memif_id):
:type iface_key: str
:type memif_id: str
"""
- node['interfaces'][iface_key]['memif_id'] = str(memif_id)
+ node[u"interfaces"][iface_key][u"memif_id"] = str(memif_id)
@staticmethod
def update_interface_memif_role(node, iface_key, memif_role):
:type iface_key: str
:type memif_role: str
"""
- node['interfaces'][iface_key]['memif_role'] = str(memif_role)
+ node[u"interfaces"][iface_key][u"memif_role"] = str(memif_role)
@staticmethod
def update_interface_tap_dev_name(node, iface_key, dev_name):
:type dev_name: str
:returns: Nothing
"""
- node['interfaces'][iface_key]['dev_name'] = str(dev_name)
+ node[u"interfaces"][iface_key][u"dev_name"] = str(dev_name)
@staticmethod
def get_node_by_hostname(nodes, hostname):
:returns: Node dictionary or None if not found.
"""
for node in nodes.values():
- if node['host'] == hostname:
+ if node[u"host"] == hostname:
return node
return None
:returns: Links in the topology.
:rtype: list
"""
- links = []
+ links = list()
for node in nodes.values():
- for interface in node['interfaces'].values():
- link = interface.get('link')
+ for interface in node[u"interfaces"].values():
+ link = interface.get(u"link")
if link is not None:
if link not in links:
links.append(link)
:returns: Interface key from topology file
:rtype: string
"""
- interfaces = node['interfaces']
+ interfaces = node[u"interfaces"]
retval = None
- for if_key, if_val in interfaces.iteritems():
+ for if_key, if_val in interfaces.items():
k_val = if_val.get(key)
if k_val is not None:
if k_val == value:
:returns: Interface key.
:rtype: str
"""
- return Topology._get_interface_by_key_value(node, "name", iface_name)
+ return Topology._get_interface_by_key_value(node, u"name", iface_name)
@staticmethod
def get_interface_by_link_name(node, link_name):
:returns: Interface key of the interface connected to the given link.
:rtype: str
"""
- return Topology._get_interface_by_key_value(node, "link", link_name)
+ return Topology._get_interface_by_key_value(node, u"link", link_name)
def get_interfaces_by_link_names(self, node, link_names):
"""Return dictionary of dictionaries {"interfaceN", interface name}.
links.
:rtype: dict
"""
- retval = {}
- interface_key_tpl = "interface{}"
+ retval = dict()
interface_number = 1
for link_name in link_names:
interface = self.get_interface_by_link_name(node, link_name)
- interface_name = self.get_interface_name(node, interface)
- interface_key = interface_key_tpl.format(str(interface_number))
- retval[interface_key] = interface_name
+ retval[f"interface{str(interface_number)}"] = \
+ self.get_interface_name(node, interface)
interface_number += 1
return retval
@staticmethod
- def get_interface_by_sw_index(node, sw_index):
+ def get_interface_by_sw_index(node, sw_if_index):
"""Return interface name of link on node.
This method returns the interface name associated with a software
interface index assigned to the interface by vpp for a given node.
:param node: The node topology dictionary.
- :param sw_index: Sw_index of the link that a interface is connected to.
+ :param sw_if_index: sw_if_index of the link that a interface is
+ connected to.
:type node: dict
- :type sw_index: int
+ :type sw_if_index: int
:returns: Interface name of the interface connected to the given link.
:rtype: str
"""
- return Topology._get_interface_by_key_value(node, "vpp_sw_index",
- sw_index)
+ return Topology._get_interface_by_key_value(
+ node, u"vpp_sw_index", sw_if_index
+ )
@staticmethod
def get_interface_sw_index(node, iface_key):
"""Get VPP sw_if_index for the interface using interface key.
:param node: Node to get interface sw_if_index on.
- :param iface_key: Interface key from topology file, or sw_index.
+ :param iface_key: Interface key from topology file, or sw_if_index.
:type node: dict
:type iface_key: str/int
:returns: Return sw_if_index or None if not found.
:rtype: int or None
"""
try:
- if isinstance(iface_key, basestring):
- return node['interfaces'][iface_key].get('vpp_sw_index')
+ if isinstance(iface_key, str):
+ return node[u"interfaces"][iface_key].get(u"vpp_sw_index")
# TODO: use only iface_key, do not use integer
return int(iface_key)
except (KeyError, ValueError):
:raises TypeError: If provided interface name is not a string.
"""
try:
- if not isinstance(iface_name, basestring):
- raise TypeError("Interface name must be a string.")
+ if not isinstance(iface_name, str):
+ raise TypeError(u"Interface name must be a string.")
iface_key = Topology.get_interface_by_name(node, iface_name)
- return node['interfaces'][iface_key].get('vpp_sw_index')
+ return node[u"interfaces"][iface_key].get(u"vpp_sw_index")
except (KeyError, ValueError):
return None
:rtype: int
"""
try:
- return node['interfaces'][iface_key].get('mtu')
+ return node[u"interfaces"][iface_key].get(u"mtu")
except KeyError:
return None
:rtype: str
"""
try:
- return node['interfaces'][iface_key].get('name')
+ return node[u"interfaces"][iface_key].get(u"name")
except KeyError:
return None
if isinstance(interface, int):
key = Topology.get_interface_by_sw_index(node, interface)
if key is None:
- raise RuntimeError("Interface with sw_if_index={0} does not "
- "exist in topology.".format(interface))
+ raise RuntimeError(
+ f"Interface with sw_if_index={interface} does not exist "
+ f"in topology."
+ )
elif interface in Topology.get_node_interfaces(node):
key = interface
- elif interface in Topology.get_links({"dut": node}):
+ elif interface in Topology.get_links({u"dut": node}):
key = Topology.get_interface_by_link_name(node, interface)
- elif isinstance(interface, basestring):
+ elif isinstance(interface, str):
key = Topology.get_interface_by_name(node, interface)
if key is None:
- raise RuntimeError("Interface with key, name or link name "
- "\"{0}\" does not exist in topology."
- .format(interface))
+ raise RuntimeError(
+ f"Interface with key, name or link name \"{interface}\" "
+ f"does not exist in topology."
+ )
else:
- raise TypeError("Type of interface argument must be integer"
- " or string.")
+ raise TypeError(
+ u"Type of interface argument must be integer or string."
+ )
return key
@staticmethod
key = Topology.convert_interface_reference_to_key(node, interface)
conversions = {
- "key": lambda x, y: y,
- "name": Topology.get_interface_name,
- "sw_if_index": Topology.get_interface_sw_index
+ u"key": lambda x, y: y,
+ u"name": Topology.get_interface_name,
+ u"sw_if_index": Topology.get_interface_sw_index
}
try:
return conversions[wanted_format](node, key)
except KeyError:
- raise ValueError("Unrecognized return value wanted: {0}."
- "Valid options are key, name, sw_if_index"
- .format(wanted_format))
+ raise ValueError(
+ f"Unrecognized return value wanted: {wanted_format}."
+ f"Valid options are key, name, sw_if_index"
+ )
@staticmethod
def get_interface_numa_node(node, iface_key):
:rtype: int
"""
try:
- return node['interfaces'][iface_key].get('numa_node')
+ return node[u"interfaces"][iface_key].get(u"numa_node")
except KeyError:
return None
numa_list = []
for if_key in iface_keys:
try:
- numa_list.append(node['interfaces'][if_key].get('numa_node'))
+ numa_list.append(node[u"interfaces"][if_key].get(u"numa_node"))
except KeyError:
pass
:returns: Return MAC or None if not found.
"""
try:
- return node['interfaces'][iface_key].get('mac_address')
+ return node[u"interfaces"][iface_key].get(u"mac_address")
except KeyError:
return None
:returns: Return IP4 or None if not found.
"""
try:
- return node['interfaces'][iface_key].get('ip4_address', None)
+ return node[u"interfaces"][iface_key].get(u"ip4_address")
except KeyError:
return None
"""
link_name = None
# get link name where the interface belongs to
- for if_key, if_val in node['interfaces'].iteritems():
- if if_key == 'mgmt':
+ for if_key, if_val in node[u"interfaces"].items():
+ if if_key == u"mgmt":
continue
if if_key == iface_key:
- link_name = if_val['link']
+ link_name = if_val[u"link"]
break
if link_name is None:
# find link
for node_data in nodes_info.values():
# skip self
- if node_data['host'] == node['host']:
+ if node_data[u"host"] == node[u"host"]:
continue
for if_key, if_val \
- in node_data['interfaces'].iteritems():
- if 'link' not in if_val:
+ in node_data[u"interfaces"].items():
+ if u"link" not in if_val:
continue
- if if_val['link'] == link_name:
+ if if_val[u"link"] == link_name:
return node_data, if_key
return None
:returns: Return PCI address or None if not found.
"""
try:
- return node['interfaces'][iface_key].get('pci_address')
+ return node[u"interfaces"][iface_key].get(u"pci_address")
except KeyError:
return None
:returns: Return interface driver or None if not found.
"""
try:
- return node['interfaces'][iface_key].get('driver')
+ return node[u"interfaces"][iface_key].get(u"driver")
+ except KeyError:
+ return None
+
+ @staticmethod
+ def get_interface_vlan(node, iface_key):
+ """Get interface vlan.
+
+ :param node: Node to get interface driver on.
+ :param iface_key: Interface key from topology file.
+ :type node: dict
+ :type iface_key: str
+ :returns: Return interface vlan or None if not found.
+ """
+ try:
+ return node[u"interfaces"][iface_key].get(u"vlan")
except KeyError:
return None
:returns: Return list of keys of all interfaces.
:rtype: list
"""
- return node['interfaces'].keys()
+ return node[u"interfaces"].keys()
@staticmethod
def get_node_link_mac(node, link_name):
"""Return interface mac address by link name.
- :param node: Node to get interface sw_index on.
+ :param node: Node to get interface sw_if_index on.
:param link_name: Link name.
:type node: dict
:type link_name: str
:returns: MAC address string.
:rtype: str
"""
- for port in node['interfaces'].values():
- if port.get('link') == link_name:
- return port.get('mac_address')
+ for port in node[u"interfaces"].values():
+ if port.get(u"link") == link_name:
+ return port.get(u"mac_address")
return None
@staticmethod
:returns: List of link names occupied by the node.
:rtype: None or list of string
"""
- interfaces = node['interfaces']
+ interfaces = node[u"interfaces"]
link_names = []
for interface in interfaces.values():
- if 'link' in interface:
- if (filter_list is not None) and ('model' in interface):
+ if u"link" in interface:
+ if (filter_list is not None) and (u"model" in interface):
for filt in filter_list:
- if filt == interface['model']:
- link_names.append(interface['link'])
- elif (filter_list is not None) and ('model' not in interface):
- logger.trace('Cannot apply filter on interface: {}'
- .format(str(interface)))
+ if filt == interface[u"model"]:
+ link_names.append(interface[u"link"])
+ elif (filter_list is not None) and (u"model" not in interface):
+ logger.trace(
+ f"Cannot apply filter on interface: {str(interface)}"
+ )
else:
- link_names.append(interface['link'])
+ link_names.append(interface[u"link"])
if not link_names:
link_names = None
return link_names
- @keyword('Get active links connecting "${node1}" and "${node2}"')
- def get_active_connecting_links(self, node1, node2,
- filter_list_node1=None,
- filter_list_node2=None):
+ def get_active_connecting_links(
+ self, node1, node2, filter_list_node1=None, filter_list_node2=None):
"""Return list of link names that connect together node1 and node2.
:param node1: Node topology dictionary.
:rtype: list
"""
- logger.trace("node1: {}".format(str(node1)))
- logger.trace("node2: {}".format(str(node2)))
+ logger.trace(f"node1: {str(node1)}")
+ logger.trace(f"node2: {str(node2)}")
node1_links = self._get_node_active_link_names(
- node1,
- filter_list=filter_list_node1)
+ node1, filter_list=filter_list_node1
+ )
node2_links = self._get_node_active_link_names(
- node2,
- filter_list=filter_list_node2)
+ node2, filter_list=filter_list_node2
+ )
connecting_links = None
if node1_links is None:
- logger.error("Unable to find active links for node1")
+ logger.error(u"Unable to find active links for node1")
elif node2_links is None:
- logger.error("Unable to find active links for node2")
+ logger.error(u"Unable to find active links for node2")
else:
connecting_links = list(set(node1_links).intersection(node2_links))
return connecting_links
- @keyword('Get first active connecting link between node "${node1}" and '
- '"${node2}"')
def get_first_active_connecting_link(self, node1, node2):
- """
+ """Get first link connecting the two nodes together.
:param node1: Connected node.
:param node2: Connected node.
"""
connecting_links = self.get_active_connecting_links(node1, node2)
if not connecting_links:
- raise RuntimeError("No links connecting the nodes were found")
+ raise RuntimeError(u"No links connecting the nodes were found")
return connecting_links[0]
- @keyword('Get egress interfaces name on "${node1}" for link with '
- '"${node2}"')
def get_egress_interfaces_name_for_nodes(self, node1, node2):
"""Get egress interfaces on node1 for link with node2.
:returns: Egress interfaces.
:rtype: list
"""
- interfaces = []
+ interfaces = list()
links = self.get_active_connecting_links(node1, node2)
if not links:
- raise RuntimeError('No link between nodes')
- for interface in node1['interfaces'].values():
- link = interface.get('link')
+ raise RuntimeError(u"No link between nodes")
+ for interface in node1[u"interfaces"].values():
+ link = interface.get(u"link")
if link is None:
continue
if link in links:
continue
- name = interface.get('name')
+ name = interface.get(u"name")
if name is None:
continue
interfaces.append(name)
return interfaces
- @keyword('Get first egress interface name on "${node1}" for link with '
- '"${node2}"')
def get_first_egress_interface_for_nodes(self, node1, node2):
"""Get first egress interface on node1 for link with node2.
"""
interfaces = self.get_egress_interfaces_name_for_nodes(node1, node2)
if not interfaces:
- raise RuntimeError('No egress interface for nodes')
+ raise RuntimeError(u"No egress interface for nodes")
return interfaces[0]
- @keyword('Get link data useful in circular topology test from tg "${tgen}"'
- ' dut1 "${dut1}" dut2 "${dut2}"')
def get_links_dict_from_nodes(self, tgen, dut1, dut2):
"""Return link combinations used in tests in circular topology.
tg_traffic_links = [dut1_tg_link, dut2_tg_link]
dut1_bd_links = [dut1_dut2_link, dut1_tg_link]
dut2_bd_links = [dut1_dut2_link, dut2_tg_link]
- topology_links = {'DUT1_DUT2_LINK': dut1_dut2_link,
- 'DUT1_TG_LINK': dut1_tg_link,
- 'DUT2_TG_LINK': dut2_tg_link,
- 'TG_TRAFFIC_LINKS': tg_traffic_links,
- 'DUT1_BD_LINKS': dut1_bd_links,
- 'DUT2_BD_LINKS': dut2_bd_links}
+ topology_links = {
+ u"DUT1_DUT2_LINK": dut1_dut2_link,
+ u"DUT1_TG_LINK": dut1_tg_link,
+ u"DUT2_TG_LINK": dut2_tg_link,
+ u"TG_TRAFFIC_LINKS": tg_traffic_links,
+ u"DUT1_BD_LINKS": dut1_bd_links,
+ u"DUT2_BD_LINKS": dut2_bd_links
+ }
return topology_links
@staticmethod
:returns: True if node is type of TG, otherwise False.
:rtype: bool
"""
- return node['type'] == NodeType.TG
+ return node[u"type"] == NodeType.TG
@staticmethod
def get_node_hostname(node):
:returns: Hostname or IP address.
:rtype: str
"""
- return node['host']
+ return node[u"host"]
@staticmethod
def get_node_arch(node):
:rtype: str
"""
try:
- return node['arch']
+ return node[u"arch"]
except KeyError:
- node['arch'] = 'x86_64'
- return 'x86_64'
+ node[u"arch"] = u"x86_64"
+ return u"x86_64"
@staticmethod
def get_cryptodev(node):
:rtype: str
"""
try:
- return node['cryptodev']
+ return node[u"cryptodev"]
except KeyError:
return None
:rtype: str
"""
try:
- return node['uio_driver']
+ return node[u"uio_driver"]
except KeyError:
return None
:returns: Return iface_key or None if not found.
"""
try:
- node['interfaces'][iface_key]['numa_node'] = numa_node_id
+ node[u"interfaces"][iface_key][u"numa_node"] = numa_node_id
return iface_key
except KeyError:
return None
+
+ def add_new_socket(self, node, socket_type, socket_id, socket_path):
+ """Add socket file of specific SocketType and ID to node.
+
+ :param node: Node to add socket on.
+ :param socket_type: Socket type.
+ :param socket_id: Socket id.
+ :param socket_path: Socket absolute path.
+ :type node: dict
+ :type socket_type: SocketType
+ :type socket_id: str
+ :type socket_path: str
+ """
+ path = [u"sockets", socket_type, socket_id]
+ self.add_node_item(node, socket_path, path)
+
+ @staticmethod
+ def get_node_sockets(node, socket_type=None):
+ """Get node socket files.
+
+ :param node: Node to get sockets from.
+ :param socket_type: Socket type or None for all sockets.
+ :type node: dict
+ :type socket_type: SocketType
+ :returns: Node sockets or None if not found.
+ :rtype: dict
+ """
+ try:
+ if socket_type:
+ return node[u"sockets"][socket_type]
+ return node[u"sockets"]
+ except KeyError:
+ return None
+
+ @staticmethod
+ def clean_sockets_on_all_nodes(nodes):
+ """Remove temporary socket files from topology file.
+
+ :param nodes: SUT nodes.
+ :type node: dict
+ """
+ for node in nodes.values():
+ if u"sockets" in list(node.keys()):
+ node.pop(u"sockets")