"""Defines nodes and topology structure."""
+import re
+
from collections import Counter
from yaml import load
from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
from robot.api.deco import keyword
-__all__ = ["DICT__nodes", 'Topology', 'NodeType']
+__all__ = ["DICT__nodes", 'Topology', 'NodeType', 'SocketType']
def load_topo_from_yaml():
# IxNetwork
IXNET = 'IXNET'
+class SocketType(object):
+ """Defines socket types used in topology dictionaries."""
+ # VPP Socket PAPI
+ PAPI = 'PAPI'
+ # VPP PAPI Stats (legacy option until stats are migrated to Socket PAPI)
+ STATS = 'STATS'
+
DICT__nodes = load_topo_from_yaml()
the methods without having filled active topology with internal nodes data.
"""
+ def add_node_item(self, node, value, path):
+ """Add item to topology node.
+
+ :param node: Topology node.
+ :param value: Value to insert.
+ :param path: Path where to insert item.
+ :type node: dict
+ :type value: str
+ :type path: list
+ """
+ if len(path) == 1:
+ node[path[0]] = value
+ return
+ if path[0] not in node:
+ node[path[0]] = {}
+ elif isinstance(node[path[0]], str):
+ node[path[0]] = {} if node[path[0]] == '' \
+ else {node[path[0]]: ''}
+ self.add_node_item(node[path[0]], value, path[1:])
+
@staticmethod
def add_new_port(node, ptype):
"""Add new port to the node to active topology.
"""
port_types = ('subinterface', 'vlan_subif', 'memif', 'tap', 'vhost',
'loopback', 'gre_tunnel', 'vxlan_tunnel', 'eth_bond',
- 'avf')
+ 'eth_avf')
for node_data in nodes.values():
if node_data['type'] == NodeType.DUT:
for ptype in port_types:
Topology.remove_all_ports(node_data, ptype)
+ @staticmethod
+ def remove_all_vif_ports(node):
+ """Remove all Virtual Interfaces on DUT node.
+
+ :param node: Node to remove VIF ports on.
+ :type node: dict
+ :returns: Nothing
+ """
+ reg_ex = re.compile(r'port\d+_vif\d+')
+ for if_key in list(node['interfaces']):
+ if re.match(reg_ex, if_key):
+ node['interfaces'].pop(if_key)
+
+ @staticmethod
+ def remove_all_added_vif_ports_on_all_duts_from_topology(nodes):
+ """Remove all added Virtual Interfaces on all DUT nodes in
+ the topology.
+
+ :param nodes: Nodes in the topology.
+ :type nodes: dict
+ :returns: Nothing
+ """
+ for node_data in nodes.values():
+ if node_data['type'] == NodeType.DUT:
+ Topology.remove_all_vif_ports(node_data)
+
@staticmethod
def update_interface_sw_if_index(node, iface_key, sw_if_index):
"""Update sw_if_index on the interface from the node.
interface index assigned to the interface by vpp for a given node.
:param node: The node topology dictionary.
- :param sw_if_index: sw_if_index of the link that a interface is connected to.
+ :param sw_if_index: sw_if_index of the link that a interface is
+ connected to.
:type node: dict
:type sw_if_index: int
:returns: Interface name of the interface connected to the given link.
return iface_key
except KeyError:
return None
+
+ def add_new_socket(self, node, socket_type, socket_id, socket_path):
+ """Add socket file of specific SocketType and ID to node.
+
+ :param node: Node to add socket on.
+ :param socket_type: Socket type.
+ :param socket_id: Socket id.
+ :param socket_path: Socket absolute path.
+ :type node: dict
+ :type socket_type: SocketType
+ :type socket_id: str
+ :type socket path: str
+ """
+ path = ['sockets', socket_type, socket_id]
+ self.add_node_item(node, socket_path, path)
+
+ @staticmethod
+ def get_node_sockets(node, socket_type=None):
+ """Get node socket files.
+
+ :param node: Node to get sockets from.
+ :param socket_type: Socket type or None for all sockets.
+ :type node: dict
+ :type socket_type: SocketType
+ :returns: Node sockets or None if not found.
+ :rtype: list
+ """
+ try:
+ if socket_type:
+ return node['sockets'][socket_type]
+ return node['sockets']
+ except KeyError:
+ return None
+
+ @staticmethod
+ def clean_sockets_on_all_nodes(nodes):
+ """Remove temporary socket files from topology file.
+
+ :param nodes: SUT nodes.
+ :type node: dict
+ """
+ for node in nodes.values():
+ if 'sockets' in node.keys():
+ node.pop('sockets')