+ @staticmethod
+ def add_node_item(node, value, path):
+ """Add item to topology node.
+
+ :param node: Topology node.
+ :param value: Value to insert.
+ :param path: Path where to insert item.
+ :type node: dict
+ :type value: str
+ :type path: list
+ """
+ if len(path) == 1:
+ node[path[0]] = value
+ return
+ if path[0] not in node:
+ node[path[0]] = dict()
+ elif isinstance(node[path[0]], str):
+ node[path[0]] = dict() if node[path[0]] == u"" \
+ else {node[path[0]]: u""}
+ Topology.add_node_item(node[path[0]], value, path[1:])
+
+ @staticmethod
+ def add_new_port(node, ptype):
+ """Add new port to the node to active topology.
+
+ :param node: Node to add new port on.
+ :param ptype: Port type, used as key prefix.
+ :type node: dict
+ :type ptype: str
+ :returns: Port key or None
+ :rtype: string or None
+ """
+ max_ports = 1000000
+ iface = None
+ for i in range(1, max_ports):
+ if node[u"interfaces"].get(str(ptype) + str(i)) is None:
+ iface = str(ptype) + str(i)
+ node[u"interfaces"][iface] = dict()
+ break
+ return iface
+
+ @staticmethod
+ def remove_port(node, iface_key):
+ """Remove required port from active topology.
+
+ :param node: Node to remove port on.
+ :param: iface_key: Topology key of the interface.
+ :type node: dict
+ :type iface_key: str
+ :returns: Nothing
+ """
+ try:
+ node[u"interfaces"].pop(iface_key)
+ except KeyError:
+ pass
+
+ @staticmethod
+ def remove_all_ports(node, ptype):
+ """Remove all ports with ptype as prefix.
+
+ :param node: Node to remove ports on.
+ :param: ptype: Port type, used as key prefix.
+ :type node: dict
+ :type ptype: str
+ :returns: Nothing
+ """
+ for if_key in list(node[u"interfaces"]):
+ if if_key.startswith(str(ptype)):
+ node[u"interfaces"].pop(if_key)
+
+ @staticmethod
+ def remove_all_added_ports_on_all_duts_from_topology(nodes):
+ """Remove all added ports on all DUT nodes in the topology.
+
+ :param nodes: Nodes in the topology.
+ :type nodes: dict
+ :returns: Nothing
+ """
+ port_types = (
+ u"subinterface", u"vlan_subif", u"memif", u"tap", u"vhost",
+ u"loopback", u"gre_tunnel", u"vxlan_tunnel", u"eth_bond",
+ u"eth_avf", u"eth_rdma", u"geneve_tunnel", u"eth_af_xdp",
+ u"gtpu_tunnel"
+ )
+
+ for node_data in nodes.values():
+ if node_data[u"type"] == NodeType.DUT:
+ for ptype in port_types:
+ Topology.remove_all_ports(node_data, ptype)
+
+ @staticmethod
+ def remove_all_vif_ports(node):
+ """Remove all Virtual Interfaces on DUT node.
+
+ :param node: Node to remove VIF ports on.
+ :type node: dict
+ :returns: Nothing
+ """
+ reg_ex = re.compile(r"port\d+_vif\d+")
+ for if_key in list(node[u"interfaces"]):
+ if re.match(reg_ex, if_key):
+ node[u"interfaces"].pop(if_key)
+
+ @staticmethod
+ def remove_all_added_vif_ports_on_all_duts_from_topology(nodes):
+ """Remove all added Virtual Interfaces on all DUT nodes in
+ the topology.
+
+ :param nodes: Nodes in the topology.
+ :type nodes: dict
+ :returns: Nothing
+ """
+ for node_data in nodes.values():
+ if node_data[u"type"] == NodeType.DUT:
+ Topology.remove_all_vif_ports(node_data)
+
+ @staticmethod
+ def update_interface_sw_if_index(node, iface_key, sw_if_index):
+ """Update sw_if_index on the interface from the node.
+
+ :param node: Node to update sw_if_index on.
+ :param iface_key: Topology key of the interface.
+ :param sw_if_index: Internal index to store.
+ :type node: dict
+ :type iface_key: str
+ :type sw_if_index: int
+ """
+ node[u"interfaces"][iface_key][u"vpp_sw_index"] = int(sw_if_index)
+
+ @staticmethod
+ def update_interface_name(node, iface_key, name):
+ """Update name on the interface from the node.
+
+ :param node: Node to update name on.
+ :param iface_key: Topology key of the interface.
+ :param name: Interface name to store.
+ :type node: dict
+ :type iface_key: str
+ :type name: str
+ """
+ node[u"interfaces"][iface_key][u"name"] = str(name)
+
+ @staticmethod
+ def update_interface_mac_address(node, iface_key, mac_address):
+ """Update mac_address on the interface from the node.
+
+ :param node: Node to update MAC on.
+ :param iface_key: Topology key of the interface.
+ :param mac_address: MAC address.
+ :type node: dict
+ :type iface_key: str
+ :type mac_address: str
+ """
+ node[u"interfaces"][iface_key][u"mac_address"] = str(mac_address)
+
+ @staticmethod
+ def update_interface_pci_address(node, iface_key, pci_address):
+ """Update pci_address on the interface from the node.
+
+ :param node: Node to update PCI on.
+ :param iface_key: Topology key of the interface.
+ :param pci_address: PCI address.
+ :type node: dict
+ :type iface_key: str
+ :type pci_address: str
+ """
+ node[u"interfaces"][iface_key][u"pci_address"] = str(pci_address)
+
+ @staticmethod
+ def update_interface_vlan(node, iface_key, vlan):
+ """Update VLAN on the interface from the node.
+
+ :param node: Node to update VLAN on.
+ :param iface_key: Topology key of the interface.
+ :param vlan: VLAN ID.
+ :type node: dict
+ :type iface_key: str
+ :type vlan: str
+ """
+ node[u"interfaces"][iface_key][u"vlan"] = int(vlan)
+
+ @staticmethod
+ def update_interface_vhost_socket(node, iface_key, vhost_socket):
+ """Update vhost socket name on the interface from the node.
+
+ :param node: Node to update socket name on.
+ :param iface_key: Topology key of the interface.
+ :param vhost_socket: Path to named socket on node.
+ :type node: dict
+ :type iface_key: str
+ :type vhost_socket: str
+ """
+ node[u"interfaces"][iface_key][u"vhost_socket"] = str(vhost_socket)
+
+ @staticmethod
+ def update_interface_memif_socket(node, iface_key, memif_socket):
+ """Update memif socket name on the interface from the node.
+
+ :param node: Node to update socket name on.
+ :param iface_key: Topology key of the interface.
+ :param memif_socket: Path to named socket on node.
+ :type node: dict
+ :type iface_key: str
+ :type memif_socket: str
+ """
+ node[u"interfaces"][iface_key][u"memif_socket"] = str(memif_socket)
+
+ @staticmethod
+ def update_interface_memif_id(node, iface_key, memif_id):
+ """Update memif ID on the interface from the node.
+
+ :param node: Node to update memif ID on.
+ :param iface_key: Topology key of the interface.
+ :param memif_id: Memif interface ID.
+ :type node: dict
+ :type iface_key: str
+ :type memif_id: str
+ """
+ node[u"interfaces"][iface_key][u"memif_id"] = str(memif_id)
+
+ @staticmethod
+ def update_interface_memif_role(node, iface_key, memif_role):
+ """Update memif role on the interface from the node.
+
+ :param node: Node to update memif role on.
+ :param iface_key: Topology key of the interface.
+ :param memif_role: Memif role.
+ :type node: dict
+ :type iface_key: str
+ :type memif_role: str
+ """
+ node[u"interfaces"][iface_key][u"memif_role"] = str(memif_role)
+
+ @staticmethod
+ def update_interface_tap_dev_name(node, iface_key, dev_name):
+ """Update device name on the tap interface from the node.
+
+ :param node: Node to update tap device name on.
+ :param iface_key: Topology key of the interface.
+ :param dev_name: Device name of the tap interface.
+ :type node: dict
+ :type iface_key: str
+ :type dev_name: str
+ :returns: Nothing
+ """
+ node[u"interfaces"][iface_key][u"dev_name"] = str(dev_name)