Code Review
/
csit.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
CSIT-1597 API cleanup: classify
[csit.git]
/
resources
/
libraries
/
python
/
NodePath.py
diff --git
a/resources/libraries/python/NodePath.py
b/resources/libraries/python/NodePath.py
index
d1aa1f7
..
e97bde8
100644
(file)
--- a/
resources/libraries/python/NodePath.py
+++ b/
resources/libraries/python/NodePath.py
@@
-1,4
+1,4
@@
-# Copyright (c) 201
6
Cisco and/or its affiliates.
+# Copyright (c) 201
9
Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@
-13,10
+13,10
@@
"""Path utilities library for nodes in the topology."""
"""Path utilities library for nodes in the topology."""
-from topology import Topology
+from
resources.libraries.python.
topology import Topology
-class NodePath
(object)
:
+class NodePath:
"""Path utilities for nodes in the topology.
:Example:
"""Path utilities for nodes in the topology.
:Example:
@@
-55,16
+55,20
@@
class NodePath(object):
def __init__(self):
self._nodes = []
def __init__(self):
self._nodes = []
+ self._nodes_filter = []
self._links = []
self._path = []
self._path_iter = []
self._links = []
self._path = []
self._path_iter = []
- def append_node(self, node):
+ def append_node(self, node
, filter_list=None
):
"""Append node to the path.
:param node: Node to append to the path.
"""Append node to the path.
:param node: Node to append to the path.
+ :param filter_list: Filter criteria list.
:type node: dict
:type node: dict
+ :type filter_list: list of strings
"""
"""
+ self._nodes_filter.append(filter_list)
self._nodes.append(node)
def append_nodes(self, *nodes):
self._nodes.append(node)
def append_nodes(self, *nodes):
@@
-81,6
+85,7
@@
class NodePath(object):
def clear_path(self):
"""Clear path."""
self._nodes = []
def clear_path(self):
"""Clear path."""
self._nodes = []
+ self._nodes_filter = []
self._links = []
self._path = []
self._path_iter = []
self._links = []
self._path = []
self._path_iter = []
@@
-88,33
+93,42
@@
class NodePath(object):
def compute_path(self, always_same_link=True):
"""Compute path for added nodes.
def compute_path(self, always_same_link=True):
"""Compute path for added nodes.
+ .. note:: First add at least two nodes to the topology.
+
:param always_same_link: If True use always same link between two nodes
:param always_same_link: If True use always same link between two nodes
- in path. If False use different link (if available)
between two
- nodes if one link was used before.
+ in path. If False use different link (if available)
+
between two
nodes if one link was used before.
:type always_same_link: bool
:type always_same_link: bool
-
- .. note:: First add at least two nodes to the topology.
+ :raises RuntimeError: If not enough nodes for path.
"""
nodes = self._nodes
if len(nodes) < 2:
"""
nodes = self._nodes
if len(nodes) < 2:
- raise RuntimeError(
'Not enough nodes to compute path'
)
+ raise RuntimeError(
u"Not enough nodes to compute path"
)
for idx in range(0, len(nodes) - 1):
topo = Topology()
node1 = nodes[idx]
node2 = nodes[idx + 1]
for idx in range(0, len(nodes) - 1):
topo = Topology()
node1 = nodes[idx]
node2 = nodes[idx + 1]
- links = topo.get_active_connecting_links(node1, node2)
+ n1_list = self._nodes_filter[idx]
+ n2_list = self._nodes_filter[idx + 1]
+ links = topo.get_active_connecting_links(
+ node1, node2, filter_list_node1=n1_list,
+ filter_list_node2=n2_list
+ )
if not links:
if not links:
- raise RuntimeError('No link between {0} and {1}'.format(
- node1['host'], node2['host']))
-
- link = None
- l_set = set()
+ raise RuntimeError(
+ f"No link between {node1[u'host']} and {node2[u'host']}"
+ )
if always_same_link:
l_set = set(links).intersection(self._links)
else:
l_set = set(links).difference(self._links)
if always_same_link:
l_set = set(links).intersection(self._links)
else:
l_set = set(links).difference(self._links)
+ if not l_set:
+ raise RuntimeError(
+ f"No free link between {node1[u'host']} and "
+ f"{node2[u'host']}, all links already used"
+ )
if not l_set:
link = links.pop()
if not l_set:
link = links.pop()
@@
-133,60
+147,59
@@
class NodePath(object):
def next_interface(self):
"""Path interface iterator.
def next_interface(self):
"""Path interface iterator.
- :return: Interface and node or None if not next interface.
+ :return
s
: Interface and node or None if not next interface.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path_iter:
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path_iter:
- return (None, None)
- else:
- return self._path_iter.pop()
+ return None, None
+ return self._path_iter.pop()
def first_interface(self):
"""Return first interface on the path.
def first_interface(self):
"""Return first interface on the path.
- :return: Interface and node.
+ :return
s
: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
- raise RuntimeError(
'No path for topology'
)
+ raise RuntimeError(
u"No path for topology"
)
return self._path[0]
def last_interface(self):
"""Return last interface on the path.
return self._path[0]
def last_interface(self):
"""Return last interface on the path.
- :return: Interface and node.
+ :return
s
: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
- raise RuntimeError(
'No path for topology'
)
+ raise RuntimeError(
u"No path for topology"
)
return self._path[-1]
def first_ingress_interface(self):
"""Return first ingress interface on the path.
return self._path[-1]
def first_ingress_interface(self):
"""Return first ingress interface on the path.
- :return: Interface and node.
+ :return
s
: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
- raise RuntimeError(
'No path for topology'
)
+ raise RuntimeError(
u"No path for topology"
)
return self._path[1]
def last_egress_interface(self):
"""Return last egress interface on the path.
return self._path[1]
def last_egress_interface(self):
"""Return last egress interface on the path.
- :return: Interface and node.
+ :return
s
: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
- raise RuntimeError(
'No path for topology'
)
+ raise RuntimeError(
u"No path for topology"
)
return self._path[-2]
return self._path[-2]