# limitations under the License.
from robot.api import logger
-from topology import NodeType
-from ssh import SSH
-from constants import Constants
+from resources.libraries.python.topology import NodeType
+from resources.libraries.python.ssh import SSH
+from resources.libraries.python.constants import Constants
-class DUTSetup(object):
- def start_vpp_service_on_all_duts(self, nodes):
+class DUTSetup(object):
+ @staticmethod
+ def start_vpp_service_on_all_duts(nodes):
"""Start up the VPP service on all nodes."""
ssh = SSH()
for node in nodes.values():
if node['type'] == NodeType.DUT:
ssh.connect(node)
(ret_code, stdout, stderr) = \
- ssh.exec_command_sudo('service vpp restart')
+ ssh.exec_command_sudo('service vpp restart')
if 0 != int(ret_code):
logger.debug('stdout: {0}'.format(stdout))
logger.debug('stderr: {0}'.format(stderr))
raise Exception('DUT {0} failed to start VPP service'.
- format(node['host']))
+ format(node['host']))
- def setup_all_duts(self, nodes):
+ @staticmethod
+ def setup_all_duts(nodes):
"""Prepare all DUTs in given topology for test execution."""
for node in nodes.values():
if node['type'] == NodeType.DUT:
- self.setup_dut(node)
+ DUTSetup.setup_dut(node)
- def setup_dut(self, node):
+ @staticmethod
+ def setup_dut(node):
ssh = SSH()
ssh.connect(node)
logger.trace(stderr)
if 0 != int(ret_code):
logger.debug('DUT {0} setup script failed: "{1}"'.
- format(node['host'], stdout + stderr))
+ format(node['host'], stdout + stderr))
raise Exception('DUT test setup script failed at node {}'.
- format(node['host']))
+ format(node['host']))
else:
raise ValueError("Unknown search result type")
-
def linear_search(self, start_rate, traffic_type):
"""Linear search of rate with loss below acceptance criteria.
:param start_rate: Initial rate.
:param traffic_type: Traffic profile.
:type start_rate: float
- :param traffic_type: str
+ :type traffic_type: str
:return: nothing
"""
res = self._get_res_based_on_search_type(res)
if self._search_linear_direction == SearchDirection.BOTTOM_UP:
- # loss occured and it was above acceptance criteria
+ # loss occurred and it was above acceptance criteria
if not res:
# if this is first run then we didn't find drop rate
if prev_rate is None:
raise RuntimeError("Unknown search result")
elif self._search_linear_direction == SearchDirection.TOP_DOWN:
- # loss occured, decrease rate
+ # loss occurred, decrease rate
if not res:
prev_rate = rate
rate -= self._rate_linear_step
res = self._get_res_based_on_search_type(res)
- # loss occured and it was above acceptance criteria
+ # loss occurred and it was above acceptance criteria
if not res:
self.binary_search(b_min, rate, traffic_type)
# there was no loss / loss below acceptance criteria
:type ip_addr: str
:type port: str or int
:type path: str
- :return: full url
+ :return: Full url.
:rtype: str
"""
return "http://{ip}:{port}{path}".format(ip=ip_addr, port=port,
:type path: str
:type enable_logging: bool
:type kwargs: dict
- :return: Status code and content of response
+ :return: Status code and content of response.
:rtype: tuple
:raises HTTPRequestError: If
1. it is not possible to connect,
with keywords like "Wait until keyword succeeds". So you can disable
logging by setting enable_logging to False.
- :param msg: Message to be displayed and logged
+ :param msg: Message to be displayed and logged.
:param enable_logging: When True, logging is enabled, otherwise
logging is disabled.
:type msg: str
:param response: JSON formatted string.
:param path: Path to navigate down the data structure.
- :type response: string
+ :type response: str
:type path: tuple
:return: JSON dictionary/list tree.
:rtype: list
"""Common IP utilities library."""
-from ssh import SSH
-from constants import Constants
+from resources.libraries.python.ssh import SSH
+from resources.libraries.python.constants import Constants
class IPUtil(object):
"""Common IP utilities"""
- def __init__(self):
- pass
-
@staticmethod
def vpp_ip_probe(node, interface, addr):
"""Run ip probe on VPP node.
- Args:
- node (Dict): VPP node.
- interface (str): Interface name
- addr (str): IPv4/IPv6 address
+ :param node: VPP node.
+ :param interface: Interface name.
+ :param addr: IPv4/IPv6 address.
+ :type node: dict
+ :type interface: str
+ :type addr: str
"""
ssh = SSH()
ssh.connect(node)
"""Robot framework variable file.
- Create dictionary variable nodes_ipv4_addr of IPv4 addresses from
- available networks.
+Create dictionary variable nodes_ipv4_addr of IPv4 addresses from
+available networks.
"""
+
from ipaddress import IPv4Network
from resources.libraries.python.topology import Topology
"""IPv4 network generator."""
def __init__(self, networks):
"""
- :param networks: list of strings containing IPv4 subnet
- with prefix length
+ :param networks: List of strings containing IPv4 subnet
+ with prefix length.
+ :type networks: list
"""
self._networks = list()
for network in networks:
def next_network(self):
"""
- :return: next network in form (IPv4Network, subnet)
+ :return: Next network in form (IPv4Network, subnet).
:raises: StopIteration if there are no more elements.
"""
if len(self._networks):
def get_variables(nodes, networks=IPV4_NETWORKS[:]):
"""Special robot framework method that returns dictionary nodes_ipv4_addr,
- mapping of node and interface name to IPv4 adddress.
+ mapping of node and interface name to IPv4 address.
- :param nodes: Nodes of the test topology.
- :param networks: list of available IPv4 networks
- :type nodes: dict
- :type networks: list
+ :param nodes: Nodes of the test topology.
+ :param networks: List of available IPv4 networks.
+ :type nodes: dict
+ :type networks: list
- .. note::
- Robot framework calls it automatically.
+ .. note::
+ Robot framework calls it automatically.
"""
topo = Topology()
links = topo.get_links(nodes)
from socket import inet_ntoa
from struct import pack
from abc import ABCMeta, abstractmethod
+
from robot.api.deco import keyword
-import resources.libraries.python.ssh as ssh
+from resources.libraries.python.ssh import exec_cmd_no_error
from resources.libraries.python.Routing import Routing
-from resources.libraries.python.InterfaceUtil import InterfaceUtil
from resources.libraries.python.topology import NodeType, Topology
from resources.libraries.python.VatExecutor import VatExecutor
@abstractmethod
def set_ip(self, interface, address, prefix_length):
- """Configure IPv4 address on interface
+ """Configure IPv4 address on interface.
- :param interface: interface name
- :param address:
- :param prefix_length:
+ :param interface: Interface name.
+ :param address: IPv4 address.
+ :param prefix_length: IPv4 prefix length.
:type interface: str
:type address: str
:type prefix_length: int
@abstractmethod
def set_route(self, network, prefix_length, gateway, interface):
- """Configure IPv4 route
+ """Configure IPv4 route.
- :param network: network IPv4 address
- :param prefix_length: mask length
- :param gateway: IPv4 address of the gateway
- :param interface: interface name
+ :param network: Network IPv4 address.
+ :param prefix_length: IPv4 prefix length.
+ :param gateway: IPv4 address of the gateway.
+ :param interface: Interface name.
:type network: str
:type prefix_length: int
:type gateway: str
@abstractmethod
def unset_route(self, network, prefix_length, gateway, interface):
- """Remove specified IPv4 route
+ """Remove specified IPv4 route.
- :param network: network IPv4 address
- :param prefix_length: mask length
- :param gateway: IPv4 address of the gateway
- :param interface: interface name
+ :param network: Network IPv4 address.
+ :param prefix_length: IPv4 prefix length.
+ :param gateway: IPv4 address of the gateway.
+ :param interface: Interface name.
:type network: str
:type prefix_length: int
:type gateway: str
@abstractmethod
def flush_ip_addresses(self, interface):
- """Flush all IPv4 addresses from specified interface
+ """Flush all IPv4 addresses from specified interface.
- :param interface: interface name
+ :param interface: Interface name.
:type interface: str
:return: nothing
"""
@abstractmethod
def ping(self, destination_address, source_interface):
- """Send an ICMP request to destination node
+ """Send an ICMP request to destination node.
- :param destination_address: address to send the ICMP request
- :param source_interface:
+ :param destination_address: Address to send the ICMP request.
+ :param source_interface: Source interface name.
:type destination_address: str
:type source_interface: str
:return: nothing
super(Tg, self).__init__(node_info)
def _execute(self, cmd):
- return ssh.exec_cmd_no_error(self.node_info, cmd)
+ return exec_cmd_no_error(self.node_info, cmd)
def _sudo_execute(self, cmd):
- return ssh.exec_cmd_no_error(self.node_info, cmd, sudo=True)
+ return exec_cmd_no_error(self.node_info, cmd, sudo=True)
def set_ip(self, interface, address, prefix_length):
cmd = 'ip -4 addr flush dev {}'.format(interface)
super(Dut, self).__init__(node_info)
def get_sw_if_index(self, interface):
- """Get sw_if_index of specified interface from current node
+ """Get sw_if_index of specified interface from current node.
- :param interface: interface name
+ :param interface: Interface name.
:type interface: str
- :return: sw_if_index of 'int' type
+ :return: sw_if_index of the interface or None.
+ :rtype: int
"""
return Topology().get_interface_sw_index(self.node_info, interface)
def exec_vat(self, script, **args):
"""Wrapper for VAT executor.
- :param script: script to execute
- :param args: parameters to the script
+ :param script: Script to execute.
+ :param args: Parameters to the script.
:type script: str
:type args: dict
:return: nothing
def set_route(self, network, prefix_length, gateway, interface):
Routing.vpp_route_add(self.node_info,
- network=network, prefix_len=prefix_length,
- gateway=gateway, interface=interface)
+ network=network, prefix_len=prefix_length,
+ gateway=gateway, interface=interface)
def unset_route(self, network, prefix_length, gateway, interface):
self.exec_vat('del_route.vat', network=network,
def get_node(node_info):
"""Creates a class instance derived from Node based on type.
- :param node_info: dictionary containing information on nodes in topology
- :return: Class instance that is derived from Node
+ :param node_info: Dictionary containing information on nodes in topology.
+ :type node_info: dict
+ :return: Class instance that is derived from Node.
"""
if node_info['type'] == NodeType.TG:
return Tg(node_info)
def vpp_nodes_set_ipv4_addresses(nodes, nodes_addr):
"""Set IPv4 addresses on all VPP nodes in topology.
- :param nodes: Nodes of the test topology.
- :param nodes_addr: Available nodes IPv4 adresses.
- :type nodes: dict
- :type nodes_addr: dict
- :return: affected interfaces as list of (node, interface) tuples
- :rtype: list
+ :param nodes: Nodes of the test topology.
+ :param nodes_addr: Available nodes IPv4 addresses.
+ :type nodes: dict
+ :type nodes_addr: dict
+ :return: Affected interfaces as list of (node, interface) tuples.
+ :rtype: list
"""
-
interfaces = []
for net in nodes_addr.values():
for port in net['ports'].values():
'from "${nodes_addr}"')
def get_ip_addr(node, interface, nodes_addr):
"""Return IPv4 address of the node port.
+
:param node: Node in the topology.
:param interface: Interface name of the node.
- :param nodes_addr: Nodes IPv4 adresses.
+ :param nodes_addr: Nodes IPv4 addresses.
:type node: dict
:type interface: str
:type nodes_addr: dict
- :return: IPv4 address string
+ :return: IPv4 address.
+ :rtype: str
"""
for net in nodes_addr.values():
for port in net['ports'].values():
:param nodes_info: Dictionary containing information on all nodes
in topology.
- :param nodes_addr: Nodes IPv4 adresses.
+ :param nodes_addr: Nodes IPv4 addresses.
:type nodes_info: dict
:type nodes_addr: dict
"""
if node['type'] == NodeType.TG:
continue
for interface, interface_data in node['interfaces'].iteritems():
- if interface == 'mgmt':
- continue
interface_name = interface_data['name']
adj_node, adj_int = Topology.\
get_adjacent_node_and_interface(nodes_info, node,
ip_address = IPv4Setup.get_ip_addr(adj_node, adj_int['name'],
nodes_addr)
mac_address = adj_int['mac_address']
- get_node(node).set_arp(interface_name, ip_address, mac_address)
\ No newline at end of file
+ get_node(node).set_arp(interface_name, ip_address, mac_address)
from robot.api.deco import keyword
from resources.libraries.python.topology import Topology
-from resources.libraries.python.topology import NodeType
-from resources.libraries.python.TrafficScriptExecutor\
- import TrafficScriptExecutor
from resources.libraries.python.IPv4Setup import get_node
get_node(node).arp_ping(ip_address, interface)
@staticmethod
- def set_interface_address(node, interface, address, length):
+ def set_interface_address(node, interface, address, prefix_length):
"""See IPv4Node.set_ip for more information.
:param node: Node where IP address should be set to.
- :param interface: Interface name
- :param address: IP address
- :param length: prefix length
+ :param interface: Interface name.
+ :param address: IP address.
+ :param prefix_length: Prefix length.
:type node: dict
:type interface: str
:type address: str
- :type length: int
+ :type prefix_length: int
"""
log.debug('Node {} interface {} has IPv4 address {} with prefix '
'length {}'.format(Topology.get_node_hostname(node),
- interface, address, length))
- get_node(node).set_ip(interface, address, int(length))
+ interface, address, prefix_length))
+ get_node(node).set_ip(interface, address, int(prefix_length))
@staticmethod
@keyword('Node "${node}" routes to IPv4 network "${network}" with prefix '
def set_route(node, network, prefix_length, interface, gateway):
"""See IPv4Node.set_route for more information.
- :param node:
- :param network:
- :param prefix_length:
- :param interface:
- :param gateway:
- :return:
+ :param node: Node where IP address should be set to.
+ :param network: IP network.
+ :param prefix_length: Prefix length.
+ :param interface: Interface name.
+ :param gateway: Gateway.
+ :type node: dict
+ :type network: str
+ :type prefix_length: int
+ :type interface: str
+ :type gateway: str
"""
log.debug('Node {} routes to network {} with prefix length {} '
'via {} interface {}'.format(Topology.get_node_hostname(node),
:param node: Node dictionary.
:param port: Interface name.
- :return: IPv4 prefix length
+ :param nodes_addr: Available nodes IPv4 addresses.
+ :type node: dict
+ :type port: str
+ :type nodes_addr: dict
+ :return: IPv4 prefix length.
+ :rtype: int
"""
for net in nodes_addr.values():
for p in net['ports'].values():
:param node: Node dictionary.
:param port: Interface name.
- :return: IPv4 subnet of 'str' type
+ :param nodes_addr: Available nodes IPv4 addresses.
+ :type node: dict
+ :type port: int
+ :type nodes_addr: dict
+ :return: IPv4 subnet.
+ :rtype: str
"""
for net in nodes_addr.values():
for p in net['ports'].values():
"""Get link IPv4 address.
:param link: Link name.
- :param nodes_addr: Available nodes IPv4 adresses.
+ :param nodes_addr: Available nodes IPv4 addresses.
:type link: str
:type nodes_addr: dict
:return: Link IPv4 address.
"""Get link IPv4 address prefix.
:param link: Link name.
- :param nodes_addr: Available nodes IPv4 adresses.
+ :param nodes_addr: Available nodes IPv4 addresses.
:type link: str
:type nodes_addr: dict
:return: Link IPv4 address prefix.
"""Robot framework variable file.
- Create dictionary variable nodes_ipv6_addr with IPv6 adresses from available
- networks.
+Create dictionary variable nodes_ipv6_addr with IPv6 addresses from available
+networks.
"""
-from IPv6Setup import IPv6Networks
-from topology import Topology
+from resources.libraries.python.IPv6Setup import IPv6Networks
+from resources.libraries.python.topology import Topology
# Default list of available IPv6 networks
IPV6_NETWORKS = ['3ffe:{0:04x}::/64'.format(i) for i in range(1, 100)]
def get_variables(nodes, networks=IPV6_NETWORKS):
"""Special robot framework method that returns dictionary nodes_ipv6_addr,
- mapping of node and interface name to IPv6 adddress.
+ mapping of node and interface name to IPv6 address.
- :param nodes: Nodes of the test topology.
- :param networks: list of available IPv6 networks
- :type nodes: dict
- :type networks: list
+ :param nodes: Nodes of the test topology.
+ :param networks: List of available IPv6 networks.
+ :type nodes: dict
+ :type networks: list
- .. note::
- Robot framework calls it automatically.
+ .. note::
+ Robot framework calls it automatically.
"""
topo = Topology()
links = topo.get_links(nodes)
"""Library to set up IPv6 in topology."""
-from ssh import SSH
-from ipaddress import IPv6Network
-from topology import NodeType, Topology
-from constants import Constants
-from VatExecutor import VatTerminal, VatExecutor
from robot.api import logger
+from ipaddress import IPv6Network
+
+from resources.libraries.python.ssh import SSH
+from resources.libraries.python.topology import NodeType, Topology
+from resources.libraries.python.constants import Constants
+from resources.libraries.python.VatExecutor import VatTerminal, VatExecutor
class IPv6Networks(object):
"""IPv6 network iterator.
- :param networks: List of the available IPv6 networks.
- :type networks: list
+ :param networks: List of the available IPv6 networks.
+ :type networks: list
"""
def __init__(self, networks):
self._networks = list()
raise Exception('No IPv6 networks')
def next_network(self):
- """Get the next elemnt of the iterator.
+ """Get the next element of the iterator.
- :return: IPv6 network.
- :rtype: IPv6Network object
- :raises: StopIteration if there is no more elements.
+ :return: IPv6 network.
+ :rtype: IPv6Network object
+ :raises: StopIteration if there is no more elements.
"""
if len(self._networks):
return self._networks.pop()
def nodes_set_ipv6_addresses(self, nodes, nodes_addr):
"""Set IPv6 addresses on all VPP nodes in topology.
- :param nodes: Nodes of the test topology.
- :param nodes_addr: Available nodes IPv6 adresses.
- :type nodes: dict
- :type nodes_addr: dict
- :return: affected interfaces as list of (node, interface) tuples
- :rtype: list
+ :param nodes: Nodes of the test topology.
+ :param nodes_addr: Available nodes IPv6 addresses.
+ :type nodes: dict
+ :type nodes_addr: dict
+ :return: Affected interfaces as list of (node, interface) tuples.
+ :rtype: list
"""
-
interfaces = []
for net in nodes_addr.values():
net['prefix'])
interfaces.append((node, port['if']))
-
return interfaces
+
def nodes_clear_ipv6_addresses(self, nodes, nodes_addr):
"""Remove IPv6 addresses from all VPP nodes in topology.
- :param nodes: Nodes of the test topology.
- :param nodes_addr: Available nodes IPv6 adresses.
- :type nodes: dict
- :type nodes_addr: dict
- """
+ :param nodes: Nodes of the test topology.
+ :param nodes_addr: Available nodes IPv6 addresses.
+ :type nodes: dict
+ :type nodes_addr: dict
+ """
for net in nodes_addr.values():
for port in net['ports'].values():
host = port.get('node')
def linux_set_if_ipv6_addr(node, interface, addr, prefix):
"""Set IPv6 address on linux host.
- :param node: Linux node.
- :param interface: Node interface.
- :param addr: IPv6 address.
- :param prefix: IPv6 address prefix.
- :type node: dict
- :type interface: str
- :type addr: str
- :type prefix: str
+ :param node: Linux node.
+ :param interface: Node interface.
+ :param addr: IPv6 address.
+ :param prefix: IPv6 address prefix.
+ :type node: dict
+ :type interface: str
+ :type addr: str
+ :type prefix: str
"""
ssh = SSH()
ssh.connect(node)
def linux_del_if_ipv6_addr(node, interface, addr, prefix):
"""Delete IPv6 address on linux host.
- :param node: Linux node.
- :param interface: Node interface.
- :param addr: IPv6 address.
- :param prefix: IPv6 address prefix.
- :type node: dict
- :type interface: str
- :type addr: str
- :type prefix: str
+ :param node: Linux node.
+ :param interface: Node interface.
+ :param addr: IPv6 address.
+ :param prefix: IPv6 address prefix.
+ :type node: dict
+ :type interface: str
+ :type addr: str
+ :type prefix: str
"""
ssh = SSH()
ssh.connect(node)
def vpp_set_if_ipv6_addr(node, interface, addr, prefix):
"""Set IPv6 address on VPP.
- :param node: VPP node.
- :param interface: Node interface.
- :param addr: IPv6 address.
- :param prefix: IPv6 address prefix.
- :type node: dict
- :type interface: str
- :type addr: str
- :type prefix: str
+ :param node: VPP node.
+ :param interface: Node interface.
+ :param addr: IPv6 address.
+ :param prefix: IPv6 address prefix.
+ :type node: dict
+ :type interface: str
+ :type addr: str
+ :type prefix: str
"""
sw_if_index = Topology.get_interface_sw_index(node, interface)
with VatTerminal(node) as vat:
def vpp_del_if_ipv6_addr(node, interface, addr, prefix):
"""Delete IPv6 address on VPP.
- :param node: VPP node.
- :param interface: Node interface.
- :param addr: IPv6 address.
- :param prefix: IPv6 address prefix.
- :type node: dict
- :type interface: str
- :type addr: str
- :type prefix: str
+ :param node: VPP node.
+ :param interface: Node interface.
+ :param addr: IPv6 address.
+ :param prefix: IPv6 address prefix.
+ :type node: dict
+ :type interface: str
+ :type addr: str
+ :type prefix: str
"""
sw_if_index = Topology.get_interface_sw_index(node, interface)
with VatTerminal(node) as vat:
state='admin-down')
@staticmethod
- def vpp_ra_supress_link_layer(node, interface):
- """Supress ICMPv6 router advertisement message for link scope address
+ def vpp_ra_suppress_link_layer(node, interface):
+ """Suppress ICMPv6 router advertisement message for link scope address.
- :param node: VPP node.
- :param interface: Interface name.
- :type node: dict
- :type interface: str
+ :param node: VPP node.
+ :param interface: Interface name.
+ :type node: dict
+ :type interface: str
"""
sw_if_index = Topology.get_interface_sw_index(node, interface)
VatExecutor.cmd_from_template(node,
sw_if_id=sw_if_index,
param='surpress')
- def vpp_all_ra_supress_link_layer(self, nodes):
- """Supress ICMPv6 router advertisement message for link scope address
- on all VPP nodes in the topology
+ def vpp_all_ra_suppress_link_layer(self, nodes):
+ """Suppress ICMPv6 router advertisement message for link scope address
+ on all VPP nodes in the topology.
- :param nodes: Nodes of the test topology.
- :type nodes: dict
+ :param nodes: Nodes of the test topology.
+ :type nodes: dict
"""
for node in nodes.values():
if node['type'] == NodeType.TG:
continue
for port_k, port_v in node['interfaces'].items():
- if port_k == 'mgmt':
- continue
if_name = port_v.get('name')
if if_name is None:
continue
- self.vpp_ra_supress_link_layer(node, if_name)
+ self.vpp_ra_suppress_link_layer(node, if_name)
@staticmethod
def get_link_address(link, nodes_addr):
"""Get link IPv6 address.
:param link: Link name.
- :param nodes_addr: Available nodes IPv6 adresses.
+ :param nodes_addr: Available nodes IPv6 addresses.
:type link: str
:type nodes_addr: dict
:return: Link IPv6 address.
"""Get link IPv6 address prefix.
:param link: Link name.
- :param nodes_addr: Available nodes IPv6 adresses.
+ :param nodes_addr: Available nodes IPv6 addresses.
:type link: str
:type nodes_addr: dict
:return: Link IPv6 address prefix.
"""IPv6 utilities library."""
import re
-from ssh import SSH
+
+from resources.libraries.python.ssh import SSH
class IPv6Util(object):
"""IPv6 utilities"""
- def __init__(self):
- pass
-
@staticmethod
def ipv6_ping(src_node, dst_addr, count=3, data_size=56, timeout=1):
"""IPv6 ping.
- Args:
- src_node (Dict): Node where ping run.
- dst_addr (str): Destination IPv6 address.
- count (Optional[int]): Number of echo requests.
- data_size (Optional[int]): Number of the data bytes.
- timeout (Optional[int]): Time to wait for a response, in seconds.
-
- Returns:
- Number of lost packets.
+ :param src_node: Node where ping run.
+ :param dst_addr: Destination IPv6 address.
+ :param count: Number of echo requests. (Optional)
+ :param data_size: Number of the data bytes. (Optional)
+ :param timeout: Time to wait for a response, in seconds. (Optional)
+ :type src_node: dict
+ :type dst_addr: str
+ :type count: int
+ :type data_size: int
+ :type timeout: int
+ :return: Number of lost packets.
+ :rtype: int
"""
ssh = SSH()
ssh.connect(src_node)
size=56, timeout=1):
"""Send IPv6 ping to the node port.
- Args:
- nodes_ip (Dict): Nodes IPv6 adresses.
- src_node (Dict): Node where ping run.
- dst_node (Dict): Destination node.
- port (str): Port on the destination node.
- cnt (Optional[int]): Number of echo requests.
- size (Optional[int]): Number of the data bytes.
- timeout (Optional[int]): Time to wait for a response, in seconds.
-
- Returns:
- Number of lost packets.
+ :param nodes_ip: Nodes IPv6 addresses.
+ :param src_node: Node where ping run.
+ :param dst_node: Destination node.
+ :param port: Port on the destination node.
+ :param cnt: Number of echo requests. (Optional)
+ :param size: Number of the data bytes. (Optional)
+ :param timeout: Time to wait for a response, in seconds. (Optional)
+ :type nodes_ip: dict
+ :type src_node: dict
+ :type dst_node: dict
+ :type port: str
+ :type cnt: int
+ :type size: int
+ :type timeout: int
+ :return: Number of lost packets.
+ :rtype: int
"""
dst_ip = IPv6Util.get_node_port_ipv6_address(dst_node, port, nodes_ip)
return IPv6Util.ipv6_ping(src_node, dst_ip, cnt, size, timeout)
def get_node_port_ipv6_address(node, interface, nodes_addr):
"""Return IPv6 address of the node port.
- Args:
- node (Dict): Node in the topology.
- interface (str): Interface name of the node.
- nodes_addr (Dict): Nodes IPv6 adresses.
-
- Returns:
- IPv6 address string.
+ :param node: Node in the topology.
+ :param interface: Interface name of the node.
+ :param nodes_addr: Nodes IPv6 addresses.
+ :type node: dict
+ :type interface: str
+ :type nodes_addr: dict
+ :return: IPv6 address string.
+ :rtype: str
"""
for net in nodes_addr.values():
for port in net['ports'].values():
Function can be used for DUTs as well as for TGs.
- :param node: node where the interface is
- :param interface: interface name or sw_if_index
- :param state: one of 'up' or 'down'
+ :param node: Node where the interface is.
+ :param interface: Interface name or sw_if_index.
+ :param state: One of 'up' or 'down'.
:type node: dict
:type interface: str or int
:type state: str
Function can be used only for TGs.
- :param node: node where the interface is
- :param interface: interface name
- :param mtu: MTU to set
+ :param node: Node where the interface is.
+ :param interface: Interface name.
+ :param mtu: MTU to set.
:type node: dict
:type interface: str
:type mtu: int
Function can be used only for TGs.
- :param node: node where to set default MTU
+ :param node: Node where to set default MTU.
:type node: dict
:return: nothing
"""
"""Wait until all interfaces with admin-up are in link-up state.
:param node: Node to wait on.
- :param timeout: Waiting timeout in seconds (optional, default 10s)
+ :param timeout: Waiting timeout in seconds (optional, default 10s).
:type node: dict
:type timeout: int
:raises: RuntimeError if the timeout period value has elapsed.
"""Get all interface data from a VPP node. If a name or
sw_interface_index is provided, return only data for the matching
interface.
+
:param node: VPP node to get interface data from.
:param interface: Numeric index or name string of a specific interface.
:type node: dict
@staticmethod
def update_vpp_interface_data_on_node(node):
- """Update vpp generated interface data for a given node in DICT__nodes
+ """Update vpp generated interface data for a given node in DICT__nodes.
Updates interface names, software if index numbers and any other details
generated specifically by vpp that are unknown before testcase run.
devices using vpp_api_test, and pairing known information from topology
(mac address/pci address of interface) to state from VPP.
- :param node: Node selected from DICT__nodes
+ :param node: Node selected from DICT__nodes.
:type node: dict
"""
vat_executor = VatExecutor()
"""L2 Utilities Library."""
from robot.api.deco import keyword
+
from resources.libraries.python.topology import Topology
from resources.libraries.python.VatExecutor import VatExecutor, VatTerminal
from resources.libraries.python.ssh import exec_cmd_no_error
forward {forward} learn {learn} arp-term {arp_term}" VAT command on
the node.
- :param node: node where we wish to crate the l2 bridge domain
- :param bd_id: bridge domain index number
- :param flood: enable flooding
- :param uu_flood: enable uu_flood
- :param forward: enable forwarding
- :param learn: enable mac address learning to fib
- :param arp_term: enable arp_termination
+ :param node: Node where we wish to crate the l2 bridge domain.
+ :param bd_id: Bridge domain index number.
+ :param flood: Enable flooding.
+ :param uu_flood: Enable uu_flood.
+ :param forward: Enable forwarding.
+ :param learn: Enable mac address learning to fib.
+ :param arp_term: Enable arp_termination.
:type node: dict
:type bd_id: int
:type flood: bool
Get SW IF ID and add it to the bridge domain.
- :param node: node where we want to execute the command that does this
- :param interface: interface name
- :param bd_id: bridge domain index number to add Interface name to
- :param shg: split horizon group
+ :param node: Node where we want to execute the command that does this.
+ :param interface: Interface name.
+ :param bd_id: Bridge domain index number to add Interface name to.
+ :param shg: Split horizon group.
:type node: dict
:type interface: str
:type bd_id: int
Execute the "sw_interface_set_l2_bridge sw_if_index {sw_if_index}
bd_id {bd_id} shg {shg} enable" VAT command on the given node.
- :param node: node where we want to execute the command that does this
- :param sw_if_index: interface index
- :param bd_id: bridge domain index number to add SW IF ID to
- :param shg: split horizon group
+ :param node: Node where we want to execute the command that does this.
+ :param sw_if_index: Interface index.
+ :param bd_id: Bridge domain index number to add SW IF ID to.
+ :param shg: Split horizon group.
:type node: dict
:type sw_if_index: int
:type bd_id: int
def create_bridge_domain_vat_dict(node, link_names, bd_id):
"""Create dictionary that can be used in l2 bridge domain template.
- :param node: node data dictionary
- :param link_names: list of names of links the bridge domain should be
- connecting
- :param bd_id: bridge domain index number
- :type node: dict
- :type link_names: list
- :return: dictionary used to generate l2 bridge domain VAT configuration
- from template file
The resulting dictionary looks like this:
'interface1': interface name of first interface
'interface2': interface name of second interface
'bd_id': bridge domain index
+
+ :param node: Node data dictionary.
+ :param link_names: List of names of links the bridge domain should be
+ connecting.
+ :param bd_id: Bridge domain index number.
+ :type node: dict
+ :type link_names: list
+ :return: Dictionary used to generate l2 bridge domain VAT configuration
+ from template file.
+ :rtype: dict
"""
bd_dict = Topology().get_interfaces_by_link_names(node, link_names)
bd_dict['bd_id'] = bd_id
def vpp_setup_bidirectional_cross_connect(node, interface1, interface2):
"""Create bidirectional cross-connect between 2 interfaces on vpp node.
- :param node: Node to add bidirectional cross-connect
- :param interface1: first interface name or sw_if_index
- :param interface2: second interface name or sw_if_index
+ :param node: Node to add bidirectional cross-connect.
+ :param interface1: First interface name or sw_if_index.
+ :param interface2: Second interface name or sw_if_index.
:type node: dict
:type interface1: str or int
:type interface2: str or int
"""Path utilities library for nodes in the topology."""
-from topology import Topology
+from resources.libraries.python.topology import Topology
class NodePath(object):
"""Compute path for added nodes.
:param always_same_link: If True use always same link between two nodes
- in path. If False use different link (if available) between two
- nodes if one link was used before.
+ in path. If False use different link (if available) between two
+ nodes if one link was used before.
:type always_same_link: bool
.. note:: First add at least two nodes to the topology.
raise RuntimeError('No link between {0} and {1}'.format(
node1['host'], node2['host']))
- link = None
- l_set = set()
-
if always_same_link:
l_set = set(links).intersection(self._links)
else:
l_set = set(links).difference(self._links)
if not l_set:
raise RuntimeError(
- 'No free link between {0} and {1}, all links already ' \
+ 'No free link between {0} and {1}, all links already '
'used'.format(node1['host'], node2['host']))
if not l_set:
.. note:: Call compute_path before.
"""
if not self._path_iter:
- return (None, None)
+ return None, None
else:
return self._path_iter.pop()
"""
+import os
import socket
import select
-import os
-import time
-from multiprocessing import Queue, Process
+
from scapy.all import ETH_P_IP, ETH_P_IPV6, ETH_P_ALL, ETH_P_ARP
from scapy.all import Ether, ARP, Packet
from scapy.layers.inet6 import IPv6
# TODO: http://stackoverflow.com/questions/320232/ensuring-subprocesses-are-dead-on-exiting-python-program
+
class PacketVerifier(object):
"""Base class for TX and RX queue objects for packet verifier."""
def __init__(self, interface_name):
Takes string as input and looks for first whole packet in it.
If it finds one, it returns substring from the buf parameter.
- :param buf: string representation of incoming packet buffer.
- :type buf: string
+ :param buf: String representation of incoming packet buffer.
+ :type buf: str
:return: String representation of first packet in buf.
- :rtype: string
+ :rtype: str
"""
pkt_len = 0
This function is meant to be run in separate subprocess and is in tight
loop reading raw packets from interface passed as parameter.
- :param interace_name: Name of interface to read packets from.
+ :param interface_name: Name of interface to read packets from.
:param queue: Queue in which this function will push incoming packets.
- :type interface_name: string
+ :type interface_name: str
:type queue: multiprocessing.Queue
:return: None
"""
function to access them.
:param interface_name: Which interface to bind to.
- :type interface_name: string
+ :type interface_name: str
"""
-
def __init__(self, interface_name):
PacketVerifier.__init__(self, interface_name)
- #self._queue = Queue()
- #self._proc = Process(target=packet_reader, args=(interface_name,
- # self._queue))
- #self._proc.daemon = True
- #self._proc.start()
- #time.sleep(2)
-
def recv(self, timeout=3, ignore=None):
"""Read next received packet.
arrives in given timeout queue.Empty exception will be risen.
:param timeout: How many seconds to wait for next packet.
+ :param ignore: Packet list that should be ignored.
:type timeout: int
+ :type ignore: list
:return: Ether() initialized object from packet data.
:rtype: scapy.Ether
"""
-
- #pkt = self._queue.get(True, timeout=timeout)
(rlist, _, _) = select.select([self._sock], [], [], timeout)
if self._sock not in rlist:
return None
This object is used to send packets over RAW socket on a interface.
:param interface_name: Which interface to send packets from.
- :type interface_name: string
+ :type interface_name: str
"""
def __init__(self, interface_name):
PacketVerifier.__init__(self, interface_name)
def create_gratuitous_arp_request(src_mac, src_ip):
- """Creates scapy representation of gratuitous ARP request"""
+ """Creates scapy representation of gratuitous ARP request."""
return (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
ARP(psrc=src_ip, hwsrc=src_mac, pdst=src_ip))
"""QEMU utilities library."""
+from time import time, sleep
import json
import re
-from time import time, sleep
+
from robot.api import logger
+
from resources.libraries.python.ssh import SSH
from resources.libraries.python.constants import Constants
from resources.libraries.python.topology import NodeType
"""Set node to run QEMU on.
:param node: Node to run QEMU on.
- :param node: dict
+ :type node: dict
"""
self._node = node
self._ssh = SSH()
def qemu_start(self):
"""Start QEMU and wait until VM boot.
- :return: VM node info
+ :return: VM node info.
:rtype: dict
.. note:: First set at least node to run QEMU on.
.. warning:: Starts only one VM on the node.
"""Routing utilities library."""
-from VatExecutor import VatTerminal
-from topology import Topology
+from resources.libraries.python.VatExecutor import VatTerminal
+from resources.libraries.python.topology import Topology
class Routing(object):
supposed to end up here.
"""
-import shlex
+from shlex import split
from subprocess import Popen, PIPE, call
from multiprocessing import Pool
from tempfile import NamedTemporaryFile
__all__ = ["SetupFramework"]
+
def pack_framework_dir():
"""Pack the testing WS into temp file, return its name."""
tmpfile.close()
proc = Popen(
- shlex.split("tar --exclude-vcs -zcf {0} .".format(file_name)),
+ split("tar --exclude-vcs -zcf {0} .".format(file_name)),
stdout=PIPE, stderr=PIPE)
(stdout, stderr) = proc.communicate()
def copy_tarball_to_node(tarball, node):
"""Copy tarball file from local host to remote node.
- :param tarball: path to tarball to upload
- :param node: dictionary created from topology
- :type tarball: string
+ :param tarball: Path to tarball to upload.
+ :param node: Dictionary created from topology.
+ :type tarball: str
:type node: dict
:return: nothing
"""
Extracts tarball using tar on given node to specific CSIT loocation.
- :param tarball: path to tarball to upload
- :param node: dictionary created from topology
- :type tarball: string
+ :param tarball: Path to tarball to upload.
+ :param node: Dictionary created from topology.
+ :type tarball: str
:type node: dict
:return: nothing
"""
ssh.connect(node)
(ret_code, stdout, stderr) = ssh.exec_command(
'cd {0} && rm -rf env && virtualenv env && . env/bin/activate && '
- 'pip install -r requirements.txt'.format(con.REMOTE_FW_DIR),
- timeout=100)
+ 'pip install -r requirements.txt'
+ .format(con.REMOTE_FW_DIR), timeout=100)
if 0 != ret_code:
logger.error('Virtualenv creation error: {0}'.format(stdout + stderr))
raise Exception('Virtualenv setup failed')
else:
logger.console('Virtualenv created on {0}'.format(node['host']))
+
def setup_node(args):
"""Run all set-up methods for a node.
This method is used as map_async parameter. It receives tuple with all
parameters as passed to map_async function.
- :param args: all parameters needed to setup one node
+ :param args: All parameters needed to setup one node.
:type args: tuple
:return: nothing
"""
create_env_directory_at_node(node)
logger.console('Setup of node {0} done'.format(node['host']))
+
def delete_local_tarball(tarball):
"""Delete local tarball to prevent disk pollution.
- :param tarball: path to tarball to upload
- :type tarball: string
+ :param tarball: Path to tarball to upload.
+ :type tarball: str
:return: nothing
"""
- call(shlex.split('sh -c "rm {0} > /dev/null 2>&1"'.format(tarball)))
+ call(split('sh -c "rm {0} > /dev/null 2>&1"'.format(tarball)))
+
class SetupFramework(object): # pylint: disable=too-few-public-methods
"""Setup suite run on topology nodes.
to all nodes in topology under /tmp/
"""
- def __init__(self):
- pass
-
@staticmethod
def setup_framework(nodes):
"""Pack the whole directory and extract in temp on each node."""
logger.trace(msg)
remote_tarball = "/tmp/{0}".format(basename(tarball))
- # Turn off loggining since we use multiprocessing
+ # Turn off logging since we use multiprocessing
log_level = BuiltIn().set_log_level('NONE')
params = ((tarball, remote_tarball, node) for node in nodes.values())
pool = Pool(processes=len(nodes))
logger.info('Results: {0}'.format(result.get()))
- # Turn on loggining
+ # Turn on logging
BuiltIn().set_log_level(log_level)
logger.trace('Test framework copied to all topology nodes')
delete_local_tarball(tarball)
"""TG Setup library."""
-from topology import NodeType
-from InterfaceUtil import InterfaceUtil
+from resources.libraries.python.topology import NodeType
+from resources.libraries.python.InterfaceUtil import InterfaceUtil
class TGSetup(object):
from resources.libraries.python.VatExecutor import VatExecutor
from resources.libraries.python.topology import NodeType
+
class Trace(object):
@staticmethod
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn
-from robot.api.deco import keyword
from resources.libraries.python.ssh import SSH
from resources.libraries.python.topology import NodeType
__all__ = ['TrafficGenerator', 'TGDropRateSearchImpl']
+
class TGDropRateSearchImpl(DropRateSearch):
- """Drop Rate Search implementation"""
+ """Drop Rate Search implementation."""
def __init__(self):
super(TGDropRateSearchImpl, self).__init__()
def measure_loss(self, rate, frame_size, loss_acceptance,
loss_acceptance_type, traffic_type):
- #we need instance of TrafficGenerator instantiated by Robot Framework
- #to be able to use trex_stateless_remote_exec method
- tg_instance = BuiltIn().get_library_instance('resources.libraries.python.TrafficGenerator')
+ # we need instance of TrafficGenerator instantiated by Robot Framework
+ # to be able to use trex_stateless_remote_exec method
+ tg_instance = BuiltIn().get_library_instance(
+ 'resources.libraries.python.TrafficGenerator')
if tg_instance._node['subtype'] is None:
raise Exception('TG subtype not defined')
elif tg_instance._node['subtype'] == NodeSubTypeTG.TREX:
unit_rate = str(rate) + self.get_rate_type_str()
- tg_instance.trex_stateless_remote_exec(self.get_duration(), unit_rate,
- frame_size, traffic_type)
+ tg_instance.trex_stateless_remote_exec(self.get_duration(),
+ unit_rate, frame_size,
+ traffic_type)
- #TODO:getters for tg_instance and loss_acceptance_type
- logger.trace("comparing: {} < {} ".format(tg_instance._loss, loss_acceptance))
+ # TODO: getters for tg_instance and loss_acceptance_type
+ logger.trace("comparing: {} < {} ".format(tg_instance._loss,
+ loss_acceptance))
if float(tg_instance._loss) > float(loss_acceptance):
return False
else:
else:
raise NotImplementedError("TG subtype not supported")
+
class TrafficGenerator(object):
- """Traffic Generator"""
+ """Traffic Generator."""
- #use one instance of TrafficGenerator for all tests in test suite
+ # use one instance of TrafficGenerator for all tests in test suite
ROBOT_LIBRARY_SCOPE = 'TEST SUITE'
def __init__(self):
self._sent = None
self._received = None
self._node = None
- #T-REX interface order mapping
+ # T-REX interface order mapping
self._ifaces_reordered = 0
def initialize_traffic_generator(self, tg_node, tg_if1, tg_if2,
dut1_node, dut1_if1, dut1_if2,
dut2_node, dut2_if1, dut2_if2,
test_type):
- """TG initialization
- :param tg_node: Traffic generator node
- :param tg_if1: TG - name of first interface
- :param tg_if2: TG - name of second interface
- :param dut1_node: DUT1 node
- :param dut1_if1: DUT1 - name of first interface
- :param dut1_if2: DUT1 - name of second interface
- :param dut2_node: DUT2 node
- :param dut2_if1: DUT2 - name of first interface
- :param dut2_if2: DUT2 - name of second interface
- :test_type: 'L2' or 'L3' - src/dst MAC address
+ """TG initialization.
+
+ :param tg_node: Traffic generator node.
+ :param tg_if1: TG - name of first interface.
+ :param tg_if2: TG - name of second interface.
+ :param dut1_node: DUT1 node.
+ :param dut1_if1: DUT1 - name of first interface.
+ :param dut1_if2: DUT1 - name of second interface.
+ :param dut2_node: DUT2 node.
+ :param dut2_if1: DUT2 - name of first interface.
+ :param dut2_if2: DUT2 - name of second interface.
+ :test_type: 'L2' or 'L3' - src/dst MAC address.
:type tg_node: dict
:type tg_if1: str
:type tg_if2: str
:type test_type: str
:return: nothing
"""
-
trex_path = "/opt/trex-core-1.91"
topo = Topology()
@staticmethod
def teardown_traffic_generator(node):
- """TG teardown
- :param node: Traffic generator node
+ """TG teardown.
+
+ :param node: Traffic generator node.
:type node: dict
:return: nothing
"""
-
if node['type'] != NodeType.TG:
raise Exception('Node type is not a TG')
if node['subtype'] == NodeSubTypeTG.TREX:
def trex_stateless_remote_exec(self, duration, rate, framesize,
traffic_type):
- """Execute stateless script on remote node over ssh
+ """Execute stateless script on remote node over ssh.
- :param node: remote node
- :param traffic_type: Traffic profile
- :type node: dict
+ :param traffic_type: Traffic profile.
:type traffic_type: str
"""
ssh = SSH()
logger.trace(stdout)
logger.trace(stderr)
- #last line from console output
+ # last line from console output
line = stdout.splitlines()[-1]
self._result = line
def send_traffic_on(self, node, duration, rate,
framesize, traffic_type):
- """Send traffic from all configured interfaces on TG
- :param node: Dictionary containing TG information
- :param duration: Duration of test traffic generation in seconds
- :param rate: Offered load per interface (e.g. 1%, 3gbps, 4mpps, ...)
- :param framesize: Frame size (L2) in Bytes
- :param traffic_type: Traffic profile
+ """Send traffic from all configured interfaces on TG.
+
+ :param node: Dictionary containing TG information.
+ :param duration: Duration of test traffic generation in seconds.
+ :param rate: Offered load per interface (e.g. 1%, 3gbps, 4mpps, ...).
+ :param framesize: Frame size (L2) in Bytes.
+ :param traffic_type: Traffic profile.
:type node: dict
:type duration: str
:type rate: str
:type framesize: str
:type traffic_type: str
- :return: TG output
+ :return: TG output.
:rtype: str
"""
-
if node['type'] != NodeType.TG:
raise Exception('Node type is not a TG')
return self._result
- def no_traffic_loss_occured(self):
- """Fail is loss occured in traffic run
+ def no_traffic_loss_occurred(self):
+ """Fail is loss occurred in traffic run.
+
:return: nothing
"""
-
if self._loss is None:
raise Exception('The traffic generation has not been issued')
if self._loss != '0':
- raise Exception('Traffic loss occured: {0}'.format(self._loss))
+ raise Exception('Traffic loss occurred: {0}'.format(self._loss))
and '--rx_if'. You can provide more arguments. All arguments have string
representation of the value.
- :param more_args: List of aditional arguments (optional).
+ :param more_args: List of additional arguments (optional).
:type more_args: list
:Example:
"""Traffic script executor library."""
-from constants import Constants
-from ssh import SSH
from robot.api import logger
+from resources.libraries.python.constants import Constants
+from resources.libraries.python.ssh import SSH
+
__all__ = ['TrafficScriptExecutor']
def _escape(string):
"""Escape quotation mark and dollar mark for shell command.
- :param string: String to escape.
- :type string: str
- :return: Escaped string.
- :rtype: str
+ :param string: String to escape.
+ :type string: str
+ :return: Escaped string.
+ :rtype: str
"""
return string.replace('"', '\\"').replace("$", "\\$")
timeout=10):
"""Run traffic script on the TG node.
- :param script_file_name: Traffic script name
- :param node: Node to run traffic script on.
- :param script_args: Traffic scripts arguments.
- :param timeout: Timeout (optional).
- :type script_file_name: str
- :type node: dict
- :type script_args: str
- :type timeout: int
+ :param script_file_name: Traffic script name.
+ :param node: Node to run traffic script on.
+ :param script_args: Traffic scripts arguments.
+ :param timeout: Timeout (optional).
+ :type script_file_name: str
+ :type node: dict
+ :type script_args: str
+ :type timeout: int
"""
logger.trace("{}".format(timeout))
ssh = SSH()
def traffic_script_gen_arg(rx_if, tx_if, src_mac, dst_mac, src_ip, dst_ip):
"""Generate traffic script basic arguments string.
- :param rx_if: Interface that receives traffic.
- :param tx_if: Interface that sends traffic.
- :param src_mac: Source MAC address.
- :param dst_mac: Destination MAC address.
- :param src_ip: Source IP address.
- :param dst_ip: Destination IP address.
- :type rx_if: str
- :type tx_if: str
- :type src_mac: str
- :type dst_mac: str
- :type src_ip: str
- :type dst_ip: str
- :return: Traffic script arguments string.
- :rtype: str
+ :param rx_if: Interface that receives traffic.
+ :param tx_if: Interface that sends traffic.
+ :param src_mac: Source MAC address.
+ :param dst_mac: Destination MAC address.
+ :param src_ip: Source IP address.
+ :param dst_ip: Destination IP address.
+ :type rx_if: str
+ :type tx_if: str
+ :type src_mac: str
+ :type dst_mac: str
+ :type src_ip: str
+ :type dst_ip: str
+ :return: Traffic script arguments string.
+ :rtype: str
"""
- args = '--rx_if {0} --tx_if {1} --src_mac {2} --dst_mac {3} --src_ip' \
- ' {4} --dst_ip {5}'.format(rx_if, tx_if, src_mac, dst_mac, src_ip,
- dst_ip)
+ args = ('--rx_if {0} --tx_if {1} --src_mac {2} --dst_mac {3} --src_ip'
+ ' {4} --dst_ip {5}').format(rx_if, tx_if, src_mac, dst_mac,
+ src_ip, dst_ip)
return args
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from ssh import SSH
-from robot.api import logger
-from constants import Constants
+
import json
+from robot.api import logger
+
+from resources.libraries.python.ssh import SSH
+from resources.libraries.python.constants import Constants
+
+
__all__ = ['VatExecutor']
def cleanup_vat_json_output(json_output):
- """Return VAT json output cleaned from VAT clutter.
+ """Return VAT JSON output cleaned from VAT clutter.
+
+ Clean up VAT JSON output from clutter like vat# prompts and such.
- Clean up VAT json output from clutter like vat# prompts and such
- :param json_output: cluttered json output.
- :return: cleaned up output json string
+ :param json_output: Cluttered JSON output.
+ :return: Cleaned up output JSON string.
"""
retval = json_output
def execute_script(self, vat_name, node, timeout=10, json_out=True):
"""Copy local_path script to node, execute it and return result.
- :param vat_name: name of the vat script file. Only the file name of
- the script is required, the resources path is prepended
- automatically.
- :param node: node to execute the VAT script on.
- :param timeout: seconds to allow the script to run.
- :param json_out: require json output.
+ :param vat_name: Name of the vat script file. Only the file name of
+ the script is required, the resources path is prepended automatically.
+ :param node: Node to execute the VAT script on.
+ :param timeout: Seconds to allow the script to run.
+ :param json_out: Require JSON output.
:return: (rc, stdout, stderr) tuple.
"""
@staticmethod
def cmd_from_template(node, vat_template_file, **vat_args):
"""Execute VAT script on specified node. This method supports
- script templates with parameters
- :param node: node in topology on witch the script is executed
- :param vat_template_file: template file of VAT script
- :param vat_args: arguments to the template file
- :return: list of json objects returned by VAT
+ script templates with parameters.
+
+ :param node: Node in topology on witch the script is executed.
+ :param vat_template_file: Template file of VAT script.
+ :param vat_args: Arguments to the template file.
+ :return: List of JSON objects returned by VAT.
"""
with VatTerminal(node) as vat:
return vat.vat_terminal_exec_cmd_from_template(vat_template_file,
**vat_args)
- @staticmethod
- def copy_config_to_remote(node, local_path, remote_path):
- # TODO: will be removed once v4 is merged to master.
- """Copies vat configuration file to node
-
- :param node: Remote node on which to copy the VAT configuration file
- :param local_path: path of the VAT script on local device that launches
- test cases.
- :param remote_path: path on remote node where to copy the VAT
- configuration script file
- """
- ssh = SSH()
- ssh.connect(node)
- logger.trace("Removing old file {}".format(remote_path))
- ssh.exec_command_sudo("rm -f {}".format(remote_path))
- ssh.scp(local_path, remote_path)
-
class VatTerminal(object):
- """VAT interactive terminal
+ """VAT interactive terminal.
- :param node: Node to open VAT terminal on.
- :param json_param: Defines if outputs from VAT are in JSON format.
- Default is True.
- :type node: dict
- :type json_param: bool
+ :param node: Node to open VAT terminal on.
+ :param json_param: Defines if outputs from VAT are in JSON format.
+ Default is True.
+ :type node: dict
+ :type json_param: bool
"""
def vat_terminal_exec_cmd(self, cmd):
"""Execute command on the opened VAT terminal.
- :param cmd: Command to be executed.
+ :param cmd: Command to be executed.
- :return: Command output in python representation of JSON format or
- None if not in JSON mode.
+ :return: Command output in python representation of JSON format or
+ None if not in JSON mode.
"""
logger.debug("Executing command in VAT terminal: {}".format(cmd))
out = self._ssh.interactive_terminal_exec_command(self._tty,
def vat_terminal_exec_cmd_from_template(self, vat_template_file, **args):
"""Execute VAT script from a file.
- :param vat_template_file: template file name of a VAT script
- :param args: dictionary of parameters for VAT script
- :return: list of json objects returned by VAT
+
+ :param vat_template_file: Template file name of a VAT script.
+ :param args: Dictionary of parameters for VAT script.
+ :return: List of JSON objects returned by VAT.
"""
file_path = '{}/{}'.format(Constants.RESOURCES_TPL_VAT,
vat_template_file)
"""Return interface dictionary from interface_list by MAC address.
Extracts interface dictionary from all of the interfaces in interfaces
- list parsed from json according to mac_address of the interface.
+ list parsed from JSON according to mac_address of the interface.
:param interfaces_list: Interfaces parsed from JSON.
:param mac_address: MAC address of interface we are looking for.
@staticmethod
def update_vpp_interface_data_from_json(node, interface_dump_json):
- """Update vpp node data in node__DICT from json interface dump.
+ """Update vpp node data in node__DICT from JSON interface dump.
This method updates vpp interface names and sw if indexes according to
interface MAC addresses found in interface_dump_json.
"""VPP Configuration File Generator library"""
+import re
+import time
+
from robot.api import logger
from resources.libraries.python.ssh import SSH
from resources.libraries.python.topology import NodeType
from resources.libraries.python.topology import Topology
-import re
-import time
-
__all__ = ['VppConfigGenerator']
#
def add_pci_device(self, node, pci_device=None):
"""Add PCI device configuration for node.
- :param node: DUT node
+ :param node: DUT node.
:param pci_device: PCI device (format 0000:00:00.0 or 00:00.0).
If none given, all PCI devices for this node as per topology will be
added.
:type node: dict
- :type pci_device: string
+ :type pci_device: str
:return: nothing
"""
if node['type'] != NodeType.DUT:
# Specific device was given.
hostname = Topology.get_node_hostname(node)
- pattern = re.compile("^[0-9A-Fa-f]{4}:[0-9A-Fa-f]{2}:"\
- "[0-9A-Fa-f]{2}\\.[0-9A-Fa-f]$")
+ pattern = re.compile("^[0-9A-Fa-f]{4}:[0-9A-Fa-f]{2}:"
+ "[0-9A-Fa-f]{2}\\.[0-9A-Fa-f]$")
if not pattern.match(pci_device):
- raise ValueError('PCI address {} to be added to host {} '\
- 'is not in valid format xxxx:xx:xx.x'.\
- format(pci_device, hostname))
+ raise ValueError('PCI address {} to be added to host {} '
+ 'is not in valid format xxxx:xx:xx.x'.
+ format(pci_device, hostname))
- if not hostname in self._nodeconfig:
+ if hostname not in self._nodeconfig:
self._nodeconfig[hostname] = {}
- if not 'pci_addrs' in self._nodeconfig[hostname]:
+ if 'pci_addrs' not in self._nodeconfig[hostname]:
self._nodeconfig[hostname]['pci_addrs'] = []
self._nodeconfig[hostname]['pci_addrs'].append(pci_device)
- logger.debug('Adding PCI device {1} to {0}'.format(hostname,\
- pci_device))
+ logger.debug('Adding PCI device {1} to {0}'.format(hostname,
+ pci_device))
def add_cpu_config(self, node, cpu_config):
"""Add CPU configuration for node.
- :param node: DUT node
- :param cpu_config: CPU configuration option, as a string
+ :param node: DUT node.
+ :param cpu_config: CPU configuration option, as a string.
:type node: dict
- :type cpu_config: string
+ :type cpu_config: str
:return: nothing
"""
if node['type'] != NodeType.DUT:
raise ValueError('Node type is not a DUT')
hostname = Topology.get_node_hostname(node)
- if not hostname in self._nodeconfig:
+ if hostname not in self._nodeconfig:
self._nodeconfig[hostname] = {}
- if not 'cpu_config' in self._nodeconfig[hostname]:
+ if 'cpu_config' not in self._nodeconfig[hostname]:
self._nodeconfig[hostname]['cpu_config'] = []
self._nodeconfig[hostname]['cpu_config'].append(cpu_config)
- logger.debug('Adding {} to hostname {} CPU config'.format(hostname, \
- cpu_config))
+ logger.debug('Adding {} to hostname {} CPU config'.format(hostname,
+ cpu_config))
def add_socketmem_config(self, node, socketmem_config):
"""Add Socket Memory configuration for node.
- :param node: DUT node
- :param socketmem_config: Socket Memory configuration option, as a string
+ :param node: DUT node.
+ :param socketmem_config: Socket Memory configuration option,
+ as a string.
:type node: dict
- :type cpu_config: string
+ :type socketmem_config: str
:return: nothing
"""
if node['type'] != NodeType.DUT:
raise ValueError('Node type is not a DUT')
hostname = Topology.get_node_hostname(node)
- if not hostname in self._nodeconfig:
+ if hostname not in self._nodeconfig:
self._nodeconfig[hostname] = {}
self._nodeconfig[hostname]['socketmem_config'] = socketmem_config
- logger.debug('Setting hostname {} Socket Memory config to {}'.\
- format(hostname, socketmem_config))
+ logger.debug('Setting hostname {} Socket Memory config to {}'.
+ format(hostname, socketmem_config))
def add_heapsize_config(self, node, heapsize_config):
"""Add Heap Size configuration for node.
- :param node: DUT node
- :param heapsize_config: Heap Size configuration, as a string
+ :param node: DUT node.
+ :param heapsize_config: Heap Size configuration, as a string.
:type node: dict
- :type cpu_config: string
+ :type heapsize_config: str
:return: nothing
"""
if node['type'] != NodeType.DUT:
raise ValueError('Node type is not a DUT')
hostname = Topology.get_node_hostname(node)
- if not hostname in self._nodeconfig:
+ if hostname not in self._nodeconfig:
self._nodeconfig[hostname] = {}
self._nodeconfig[hostname]['heapsize_config'] = heapsize_config
- logger.debug('Setting hostname {} Heap Size config to {}'.\
- format(hostname, heapsize_config))
+ logger.debug('Setting hostname {} Heap Size config to {}'.
+ format(hostname, heapsize_config))
def add_rss_config(self, node, rss_config):
"""Add RSS configuration for node.
- :param node: DUT node
- :param rss_config: RSS configuration, as a string
+ :param node: DUT node.
+ :param rss_config: RSS configuration, as a string.
:type node: dict
- :type rss_config: string
+ :type rss_config: str
:return: nothing
"""
if node['type'] != NodeType.DUT:
def remove_all_pci_devices(self, node):
"""Remove PCI device configuration from node.
- :param node: DUT node
- :type: node: dict
+ :param node: DUT node.
+ :type node: dict
:return: nothing
"""
if node['type'] != NodeType.DUT:
hostname = Topology.get_node_hostname(node)
if hostname in self._nodeconfig:
self._nodeconfig[hostname]['pci_addrs'] = []
- logger.debug('Clearing all PCI devices for hostname {}.'.\
- format(hostname))
+ logger.debug('Clearing all PCI devices for hostname {}.'.
+ format(hostname))
def remove_all_cpu_config(self, node):
"""Remove CPU configuration from node.
- :param node: DUT node
- :type: node: dict
+ :param node: DUT node.
+ :type node: dict
:return: nothing
"""
if node['type'] != NodeType.DUT:
hostname = Topology.get_node_hostname(node)
if hostname in self._nodeconfig:
self._nodeconfig[hostname]['cpu_config'] = []
- logger.debug('Clearing all CPU config for hostname {}.'.\
- format(hostname))
+ logger.debug('Clearing all CPU config for hostname {}.'.
+ format(hostname))
def remove_socketmem_config(self, node):
"""Remove Socket Memory configuration from node.
- :param node: DUT node
- :type: node: dict
+ :param node: DUT node.
+ :type node: dict
:return: nothing
"""
if node['type'] != NodeType.DUT:
hostname = Topology.get_node_hostname(node)
if hostname in self._nodeconfig:
self._nodeconfig[hostname].pop('socketmem_config', None)
- logger.debug('Clearing Socket Memory config for hostname {}.'.\
- format(hostname))
+ logger.debug('Clearing Socket Memory config for hostname {}.'.
+ format(hostname))
def remove_heapsize_config(self, node):
"""Remove Heap Size configuration from node.
- :param node: DUT node
- :type: node: dict
+ :param node: DUT node.
+ :type node: dict
:return: nothing
"""
if node['type'] != NodeType.DUT:
hostname = Topology.get_node_hostname(node)
if hostname in self._nodeconfig:
self._nodeconfig[hostname].pop('heapsize_config', None)
- logger.debug('Clearing Heap Size config for hostname {}.'.\
- format(hostname))
+ logger.debug('Clearing Heap Size config for hostname {}.'.
+ format(hostname))
def remove_rss_config(self, node):
"""Remove RSS configuration from node.
- :param node: DUT node
- :type: node: dict
+ :param node: DUT node.
+ :type node: dict
:return: nothing
"""
if node['type'] != NodeType.DUT:
Use data from calls to this class to form a startup.conf file and
replace /etc/vpp/startup.conf with it on node.
- :param node: DUT node
- :param waittime: time to wait for VPP to restart (default 5 seconds)
- :param retries: number of times (default 12) to re-try waiting
+ :param node: DUT node.
+ :param waittime: Time to wait for VPP to restart (default 5 seconds).
+ :param retries: Number of times (default 12) to re-try waiting.
:type node: dict
:type waittime: int
:type retries: int
heapsizeconfig=heapsizeconfig,
rssconfig=rssconfig)
- logger.debug('Writing VPP config to host {}: "{}"'.format(hostname,\
- vppconfig))
+ logger.debug('Writing VPP config to host {}: "{}"'.format(hostname,
+ vppconfig))
ssh = SSH()
ssh.connect(node)
# a sudo'd outut ("sudo echo xxx > /path/to/file") does not
# work on most platforms...
(ret, stdout, stderr) = \
- ssh.exec_command('echo "{0}" | sudo tee {1}'.\
- format(vppconfig, VPP_CONFIG_FILENAME))
+ ssh.exec_command('echo "{0}" | sudo tee {1}'.
+ format(vppconfig, VPP_CONFIG_FILENAME))
if ret != 0:
- logger.debug('Writing config file failed to node {}'.\
- format(hostname))
+ logger.debug('Writing config file failed to node {}'.
+ format(hostname))
logger.debug('stdout: {}'.format(stdout))
logger.debug('stderr: {}'.format(stderr))
- raise RuntimeError('Writing config file failed to node {}'.\
- format(hostname))
+ raise RuntimeError('Writing config file failed to node {}'.
+ format(hostname))
# Instead of restarting, we'll do separate start and stop
# actions. This way we don't care whether VPP was running
(ret, stdout, stderr) = \
ssh.exec_command('sudo initctl start {}'.format(VPP_SERVICE_NAME))
if ret != 0:
- logger.debug('Restarting VPP failed on node {}'.\
- format(hostname))
+ logger.debug('Restarting VPP failed on node {}'.
+ format(hostname))
logger.debug('stdout: {}'.format(stdout))
logger.debug('stderr: {}'.format(stderr))
- raise RuntimeError('Restarting VPP failed on node {}'.\
- format(hostname))
+ raise RuntimeError('Restarting VPP failed on node {}'.
+ format(hostname))
# Sleep <waittime> seconds, up to <retry> times,
# and verify if VPP is running.
# healthy or not, or a call that waits (up to a defined length
# of time) and returns immediately if VPP is or becomes healthy.
(ret, stdout, stderr) = \
- ssh.exec_command('echo show hardware-interfaces | '\
- 'nc 0 5002')
+ ssh.exec_command('echo show hardware-interfaces | '
+ 'nc 0 5002')
if ret == 0:
vpp_is_running = True
else:
- logger.debug('VPP not yet running, {} retries left'.\
- format(retries_left))
+ logger.debug('VPP not yet running, {} retries left'.
+ format(retries_left))
if retries_left == 0:
- raise RuntimeError('VPP failed to restart on node {}'.\
- format(hostname))
- logger.debug('VPP interfaces found on node {}'.\
- format(stdout))
+ raise RuntimeError('VPP failed to restart on node {}'.
+ format(hostname))
+ logger.debug('VPP interfaces found on node {}'.
+ format(stdout))
"""VPP counters utilities library."""
import time
-from topology import NodeType, Topology
-from VatExecutor import VatExecutor, VatTerminal
+
from robot.api import logger
+from resources.libraries.python.topology import NodeType, Topology
+from resources.libraries.python.VatExecutor import VatExecutor, VatTerminal
+
class VppCounters(object):
"""VPP counters utilities."""
def vpp_nodes_clear_interface_counters(self, nodes):
"""Clear interface counters on all VPP nodes in topology.
- :param nodes: Nodes in topology.
- :type nodes: dict
+ :param nodes: Nodes in topology.
+ :type nodes: dict
"""
for node in nodes.values():
if node['type'] == NodeType.DUT:
@staticmethod
def vpp_show_errors_verbose(node):
- """Run "show errors verbose" debug CLI command
+ """Run "show errors verbose" debug CLI command.
- :param node: Node to run command on
+ :param node: Node to run command on.
:type node: dict
"""
vat = VatExecutor()
@staticmethod
def vpp_show_runtime_verbose(node):
- """Run "show runtime" debug CLI command
+ """Run "show runtime" debug CLI command.
- :param node: Node to run command on
+ :param node: Node to run command on.
:type node: dict
"""
vat = VatExecutor()
@staticmethod
def vpp_show_hardware_detail(node):
- """Run "show hardware-interfaces detail" debug CLI command
+ """Run "show hardware-interfaces detail" debug CLI command.
- :param node: Node to run command on
+ :param node: Node to run command on.
:type node: dict
"""
vat = VatExecutor()
def vpp_clear_interface_counters(node):
"""Clear interface counters on VPP node.
- :param node: Node to clear interface counters on.
- :type node: dict
+ :param node: Node to clear interface counters on.
+ :type node: dict
"""
vat = VatExecutor()
vat.execute_script('clear_interface.vat', node)
def vpp_dump_stats_table(self, node):
"""Dump stats table on VPP node.
- :param node: Node to dump stats table on.
- :type node: dict
- :return: Stats table.
+ :param node: Node to dump stats table on.
+ :type node: dict
+ :return: Stats table.
"""
with VatTerminal(node) as vat:
vat.vat_terminal_exec_cmd('want_stats enable')
return self.vpp_get_ipv46_interface_counter(node, interface, True)
def vpp_get_ipv46_interface_counter(self, node, interface, is_ipv6=True):
- """Return interface IPv4/IPv6 counter
-
- :param node: Node to get interface IPv4/IPv6 counter on.
- :param interface: Interface name.
- :type node: dict
- :type interface: str
- :return: Interface IPv4/IPv6 counter.
- :param is_ipv6: specify IP version
- :type is_ipv6: bool
- :rtype: int
+ """Return interface IPv4/IPv6 counter.
+
+ :param node: Node to get interface IPv4/IPv6 counter on.
+ :param interface: Interface name.
+ :param is_ipv6: Specify IP version.
+ :type node: dict
+ :type interface: str
+ :type is_ipv6: bool
+ :return: Interface IPv4/IPv6 counter.
+ :rtype: int
"""
version = 'ip6' if is_ipv6 else 'ip4'
topo = Topology()
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Used to parse Json files or Json data strings to dictionaries"""
+"""Used to parse JSON files or JSON data strings to dictionaries"""
import json
class JsonParser(object):
- """Parses Json data string or files containing Json data strings"""
+ """Parses JSON data string or files containing JSON data strings"""
def __init__(self):
pass
@staticmethod
def parse_data(json_data):
- """Return list parsed from json data string.
+ """Return list parsed from JSON data string.
- Translates json data into list of values/dictionaries/lists
- :param json_data: data in json format
- :return: json data parsed as python list
+ Translates JSON data into list of values/dictionaries/lists.
+
+ :param json_data: Data in JSON format.
+ :type json_data: str
+ :return: JSON data parsed as python list.
+ :rtype: list
"""
parsed_data = json.loads(json_data)
return parsed_data
- def parse_file(self, json_file):
- """Return list parsed from file containing json string.
+ @staticmethod
+ def parse_file(json_file):
+ """Return list parsed from file containing JSON string.
+
+ Translates JSON data found in file into list of
+ values/dictionaries/lists.
- Translates json data found in file into list of
- values/dictionaries/lists
- :param json_file: file with json type data
- :return: json data parsed as python list
+ :param json_file: File with JSON type data.
+ :type json_file: str
+ :return: JSON data parsed as python list.
+ :rtype: list
"""
input_data = open(json_file).read()
- parsed_data = self.parse_data(input_data)
+ parsed_data = JsonParser.parse_data(input_data)
return parsed_data
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+import StringIO
+from time import time
+
import socket
import paramiko
from paramiko import RSAKey
-import StringIO
from scp import SCPClient
-from time import time
-from robot.api import logger
from interruptingcow import timeout
-from robot.utils.asserts import assert_equal, assert_not_equal
+from robot.api import logger
+from robot.utils.asserts import assert_equal
__all__ = ["exec_cmd", "exec_cmd_no_error"]
__existing_connections = {}
def __init__(self):
- pass
+ self._ssh = None
- def _node_hash(self, node):
+ @staticmethod
+ def _node_hash(node):
return hash(frozenset([node['host'], node['port']]))
def connect(self, node):
logger.debug('new ssh: {0}'.format(self._ssh))
logger.debug('Connect peer: {0}'.
- format(self._ssh.get_transport().getpeername()))
+ format(self._ssh.get_transport().getpeername()))
logger.debug('Connections: {0}'.format(str(SSH.__existing_connections)))
def disconnect(self, node):
'of buffer: {0}'.format(stdout))
raise
-
stderr = ""
while True:
buf = chan.recv_stderr(self.__MAX_RECV_BUF)
def exec_command_sudo(self, cmd, cmd_input=None, timeout=10):
"""Execute SSH command with sudo on a new channel on the connected Node.
- :param cmd: Command to be executed.
- :param cmd_input: Input redirected to the command.
- :param timeout: Timeout.
- :return: return_code, stdout, stderr
+ :param cmd: Command to be executed.
+ :param cmd_input: Input redirected to the command.
+ :param timeout: Timeout.
+ :return: return_code, stdout, stderr
- :Example:
+ :Example:
- >>> from ssh import SSH
- >>> ssh = SSH()
- >>> ssh.connect(node)
- >>> #Execute command without input (sudo -S cmd)
- >>> ssh.exec_command_sudo("ifconfig eth0 down")
- >>> #Execute command with input (sudo -S cmd <<< "input")
- >>> ssh.exec_command_sudo("vpp_api_test", "dump_interface_table")
+ >>> from ssh import SSH
+ >>> ssh = SSH()
+ >>> ssh.connect(node)
+ >>> # Execute command without input (sudo -S cmd)
+ >>> ssh.exec_command_sudo("ifconfig eth0 down")
+ >>> # Execute command with input (sudo -S cmd <<< "input")
+ >>> ssh.exec_command_sudo("vpp_api_test", "dump_interface_table")
"""
if cmd_input is None:
command = 'sudo -S {c}'.format(c=cmd)
def interactive_terminal_open(self, time_out=10):
"""Open interactive terminal on a new channel on the connected Node.
- :param time_out: Timeout in seconds.
- :return: SSH channel with opened terminal.
+ :param time_out: Timeout in seconds.
+ :return: SSH channel with opened terminal.
- .. warning:: Interruptingcow is used here, and it uses
- signal(SIGALRM) to let the operating system interrupt program
- execution. This has the following limitations: Python signal
- handlers only apply to the main thread, so you cannot use this
- from other threads. You must not use this in a program that
- uses SIGALRM itself (this includes certain profilers)
+ .. warning:: Interruptingcow is used here, and it uses
+ signal(SIGALRM) to let the operating system interrupt program
+ execution. This has the following limitations: Python signal
+ handlers only apply to the main thread, so you cannot use this
+ from other threads. You must not use this in a program that
+ uses SIGALRM itself (this includes certain profilers)
"""
chan = self._ssh.get_transport().open_session()
chan.get_pty()
raise Exception('Open interactive terminal timeout.')
return chan
- def interactive_terminal_exec_command(self, chan, cmd, prompt,
+ @staticmethod
+ def interactive_terminal_exec_command(chan, cmd, prompt,
time_out=10):
"""Execute command on interactive terminal.
- interactive_terminal_open() method has to be called first!
-
- :param chan: SSH channel with opened terminal.
- :param cmd: Command to be executed.
- :param prompt: Command prompt, sequence of characters used to
- indicate readiness to accept commands.
- :param time_out: Timeout in seconds.
- :return: Command output.
-
- .. warning:: Interruptingcow is used here, and it uses
- signal(SIGALRM) to let the operating system interrupt program
- execution. This has the following limitations: Python signal
- handlers only apply to the main thread, so you cannot use this
- from other threads. You must not use this in a program that
- uses SIGALRM itself (this includes certain profilers)
+ interactive_terminal_open() method has to be called first!
+
+ :param chan: SSH channel with opened terminal.
+ :param cmd: Command to be executed.
+ :param prompt: Command prompt, sequence of characters used to
+ indicate readiness to accept commands.
+ :param time_out: Timeout in seconds.
+ :return: Command output.
+
+ .. warning:: Interruptingcow is used here, and it uses
+ signal(SIGALRM) to let the operating system interrupt program
+ execution. This has the following limitations: Python signal
+ handlers only apply to the main thread, so you cannot use this
+ from other threads. You must not use this in a program that
+ uses SIGALRM itself (this includes certain profilers)
"""
chan.sendall('{c}\n'.format(c=cmd))
buf = ''
tmp = buf.replace(cmd.replace('\n', ''), '')
return tmp.replace(prompt, '')
- def interactive_terminal_close(self, chan):
+ @staticmethod
+ def interactive_terminal_close(chan):
"""Close interactive terminal SSH channel.
- :param: chan: SSH channel to be closed.
+ :param: chan: SSH channel to be closed.
"""
chan.close()
logger.error(e)
return None
- return (ret_code, stdout, stderr)
+ return ret_code, stdout, stderr
+
def exec_cmd_no_error(node, cmd, timeout=None, sudo=False):
"""Convenience function to ssh/exec/return out & err.
+
Verifies that return code is zero.
Returns (stdout, stderr).
(rc, stdout, stderr) = exec_cmd(node, cmd, timeout=timeout, sudo=sudo)
assert_equal(rc, 0, 'Command execution failed: "{}"\n{}'.
format(cmd, stderr))
- return (stdout, stderr)
+ return stdout, stderr
"""Defines nodes and topology structure."""
+from yaml import load
+
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn
from robot.api.deco import keyword
-from yaml import load
__all__ = ["DICT__nodes", 'Topology']
def load_topo_from_yaml():
- """Load topology from file defined in "${TOPOLOGY_PATH}" variable
+ """Load topology from file defined in "${TOPOLOGY_PATH}" variable.
- :return: nodes from loaded topology
+ :return: Nodes from loaded topology.
"""
topo_path = BuiltIn().get_variable_value("${TOPOLOGY_PATH}")
class NodeType(object):
- """Defines node types used in topology dictionaries"""
+ """Defines node types used in topology dictionaries."""
# Device Under Test (this node has VPP running on it)
DUT = 'DUT'
# Traffic Generator (this node has traffic generator on it)
class NodeSubTypeTG(object):
- #T-Rex traffic generator
+ # T-Rex traffic generator
TREX = 'TREX'
# Moongen
MOONGEN = 'MOONGEN'
class Topology(object):
- """Topology data manipulation and extraction methods
+ """Topology data manipulation and extraction methods.
Defines methods used for manipulation and extraction of data from
the used topology.
@staticmethod
def _get_interface_by_key_value(node, key, value):
- """Return node interface name according to key and value
+ """Return node interface name according to key and value.
- :param node: :param node: the node dictionary
- :param key: key by which to select the interface.
- :param value: value that should be found using the key.
+ :param node: The node dictionary.
+ :param key: Key by which to select the interface.
+ :param value: Value that should be found using the key.
:return:
"""
-
interfaces = node['interfaces']
retval = None
for interface in interfaces.values():
def get_interface_by_link_name(self, node, link_name):
"""Return interface name of link on node.
- This method returns the interface name asociated with a given link
+ This method returns the interface name associated with a given link
for a given node.
- :param link_name: name of the link that a interface is connected to.
- :param node: the node topology dictionary
- :return: interface name of the interface connected to the given link
- """
+ :param link_name: Name of the link that a interface is connected to.
+ :param node: The node topology dictionary.
+ :return: Interface name of the interface connected to the given link.
+ :rtype: str
+ """
return self._get_interface_by_key_value(node, "link", link_name)
def get_interfaces_by_link_names(self, node, link_names):
- """Return dictionary of dicitonaries {"interfaceN", interface name}.
+ """Return dictionary of dictionaries {"interfaceN", interface name}.
- This method returns the interface names asociated with given links
+ This method returns the interface names associated with given links
for a given node.
- :param link_names: list of names of the link that a interface is
+
+ :param link_names: List of names of the link that a interface is
connected to.
- :param node: the node topology directory
- :return: dictionary of interface names that are connected to the given
- links
+ :param node: The node topology directory.
+ :return: Dictionary of interface names that are connected to the given
+ links.
+ :rtype: dict
"""
-
retval = {}
interface_key_tpl = "interface{}"
interface_number = 1
def get_interface_by_sw_index(self, node, sw_index):
"""Return interface name of link on node.
- This method returns the interface name asociated with a software index
- assigned to the interface by vpp for a given node.
- :param sw_index: sw_index of the link that a interface is connected to.
- :param node: the node topology dictionary
- :return: interface name of the interface connected to the given link
- """
+ This method returns the interface name associated with a software
+ interface index assigned to the interface by vpp for a given node.
+ :param sw_index: Sw_index of the link that a interface is connected to.
+ :param node: The node topology dictionary.
+ :return: Interface name of the interface connected to the given link.
+ :rtype: str
+ """
return self._get_interface_by_key_value(node, "vpp_sw_index", sw_index)
@staticmethod
link_name = port_data['link']
break
- if link_name is None:
+ if link_name is None:
return None
# find link
link_name = None
# get link name where the interface belongs to
for port_name, port_data in node['interfaces'].iteritems():
- if port_name == 'mgmt':
- continue
if port_data['name'] == interface_name:
link_name = port_data['link']
break
@staticmethod
def get_node_link_mac(node, link_name):
- """Return interface mac address by link name
+ """Return interface mac address by link name.
- :param node: Node to get interface sw_index on
- :param link_name: link name
+ :param node: Node to get interface sw_index on.
+ :param link_name: Link name.
:type node: dict
- :type link_name: string
- :return: mac address string
+ :type link_name: str
+ :return: MAC address string.
+ :rtype: str
"""
for port in node['interfaces'].values():
if port.get('link') == link_name:
@staticmethod
def _get_node_active_link_names(node):
- """Return list of link names that are other than mgmt links
+ """Return list of link names that are other than mgmt links.
- :param node: node topology dictionary
- :return: list of strings that represent link names occupied by the node
+ :param node: Node topology dictionary.
+ :return: List of strings that represent link names occupied by the node.
+ :rtype: list
"""
interfaces = node['interfaces']
link_names = []
@keyword('Get active links connecting "${node1}" and "${node2}"')
def get_active_connecting_links(self, node1, node2):
- """Return list of link names that connect together node1 and node2
+ """Return list of link names that connect together node1 and node2.
- :param node1: node topology dictionary
- :param node2: node topology dictionary
- :return: list of strings that represent connecting link names
+ :param node1: Node topology dictionary.
+ :param node2: Node topology dictionary.
+ :type node1: dict
+ :type node2: dict
+ :return: List of strings that represent connecting link names.
+ :rtype: list
"""
logger.trace("node1: {}".format(str(node1)))
def get_first_active_connecting_link(self, node1, node2):
"""
- :param node1: Connected node
+ :param node1: Connected node.
+ :param node2: Connected node.
:type node1: dict
- :param node2: Connected node
:type node2: dict
- :return: name of link connecting the two nodes together
+ :return: Name of link connecting the two nodes together.
+ :rtype: str
:raises: RuntimeError
"""
-
connecting_links = self.get_active_connecting_links(node1, node2)
if len(connecting_links) == 0:
raise RuntimeError("No links connecting the nodes were found")
For the time being it returns links from the Node path:
TG->DUT1->DUT2->TG
- :param tgen: traffic generator node data
- :param dut1: DUT1 node data
- :param dut2: DUT2 node data
- :type tgen: dict
- :type dut1: dict
- :type dut2: dict
- :return: dictionary of possible link combinations
- the naming convention until changed to something more general is
+ The naming convention until changed to something more general is
implemented is this:
DUT1_DUT2_LINK: link name between DUT! and DUT2
DUT1_TG_LINK: link name between DUT1 and TG
domain on DUT1
DUT2_BD_LINKS: list of link names that will be connected by the bridge
domain on DUT2
+
+ :param tgen: Traffic generator node data.
+ :param dut1: DUT1 node data.
+ :param dut2: DUT2 node data.
+ :type tgen: dict
+ :type dut1: dict
+ :type dut2: dict
+ :return: Dictionary of possible link combinations.
+ :rtype: dict
"""
# TODO: replace with generic function.
dut1_dut2_link = self.get_first_active_connecting_link(dut1, dut2)
@staticmethod
def is_tg_node(node):
- """Find out whether the node is TG
+ """Find out whether the node is TG.
- :param node: node to examine
- :return: True if node is type of TG; False otherwise
+ :param node: Node to examine.
+ :type node: dict
+ :return: True if node is type of TG, otherwise False.
+ :rtype: bool
"""
return node['type'] == NodeType.TG
:param node: Node created from topology.
:type node: dict
- :return: host as 'str' type
+ :return: Hostname or IP address.
+ :rtype: str
"""
return node['host']
| | [Arguments] | ${nodes} | ${nodes_addr}
| | Nodes Clear Ipv6 Addresses | ${nodes} | ${nodes_addr}
-| Vpp nodes ra supress link layer
-| | [Documentation] | Supress ICMPv6 router advertisement message for link scope address
+| Vpp nodes ra suppress link layer
+| | [Documentation] | Suppress ICMPv6 router advertisement message for link scope address
| | [Arguments] | ${nodes}
-| | Vpp All Ra Supress Link Layer | ${nodes}
+| | Vpp All Ra Suppress Link Layer | ${nodes}
| Vpp nodes setup ipv6 routing
| | [Documentation] | Setup routing on all VPP nodes required for IPv6 tests
| | [Arguments] | ${duration} | ${rate} | ${framesize} | ${topology_type}
| | Send traffic on | ${tg} | ${duration}
| | ... | ${rate} | ${framesize} | ${topology_type}
-| | No traffic loss occured
+| | No traffic loss occurred
from resources.libraries.python.ssh import SSH
+
def load_topology(args):
"""Load topology file referenced to by parameter passed to this script.
- :param args: arguments parsed from commandline
+ :param args: Arguments parsed from commandline.
:type args: ArgumentParser().parse_args()
- :return: Python representation of topology yaml
+ :return: Python representation of topology YAML.
:rtype: dict
"""
data = None
return data
+
def ssh_no_error(ssh, cmd):
"""Execute a command over ssh channel, and log and exit if the command
- fials.
+ fails.
- :param ssh: SSH() object connected to a node
- :param cmd: Command line to execute on remote node
+ :param ssh: SSH() object connected to a node.
+ :param cmd: Command line to execute on remote node.
:type ssh: SSH() object
:type cmd: str
- :return: stdout from the SSH command
+ :return: stdout from the SSH command.
:rtype: str
"""
ret, stdo, stde = ssh.exec_command(cmd)
return stdo
+
def update_mac_addresses_for_node(node):
"""For given node loop over all ports with PCI address and look for its MAC
address.
and binds it to linux kernel driver. After the device is bound to specific
linux kernel driver the MAC address is extracted from /sys/bus/pci location
and stored within the node dictionary that was passed to this function.
- :param node: Node from topology
+
+ :param node: Node from topology.
:type node: dict
:return: None
"""
for port_name, port in node['interfaces'].items():
- if not port.has_key('driver'):
+ if 'driver' not in port:
err_msg = '{0} port {1} has no driver element, exiting'.format(
node['host'], port_name)
raise RuntimeError(err_msg)
# Then bind to the 'driver' from topology for given port
cmd = 'echo {0} | sudo tee /sys/bus/pci/drivers/{1}/bind'.\
- format(port['pci_address'], port['driver'])
+ format(port['pci_address'], port['driver'])
ssh_no_error(ssh, cmd)
# Then extract the mac address and store it in the topology
pattern = re.compile("^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
if not pattern.match(mac):
raise RuntimeError('MAC address read from host {0} {1} is in '
- 'bad format "{2}"'.format(node['host'],
- port['pci_address'], mac))
+ 'bad format "{2}"'
+ .format(node['host'], port['pci_address'], mac))
print '{0}: Found MAC address of PCI device {1}: {2}'.format(
node['host'], port['pci_address'], mac)
port['mac_address'] = mac
+
def update_nodes_mac_addresses(topology):
"""Loop over nodes in topology and get mac addresses for all listed ports
based on PCI addresses.
- :param topology: Topology information with nodes
+ :param topology: Topology information with nodes.
:type topology: dict
:return: None
"""
-
for node in topology['nodes'].values():
update_mac_addresses_for_node(node)
+
def dump_updated_topology(topology, args):
"""Writes or prints out updated topology file.
- :param topology: Topology information with nodes
- :param args: arguments parsed from command line
+ :param topology: Topology information with nodes.
+ :param args: Arguments parsed from command line.
:type topology: dict
:type args: ArgumentParser().parse_args()
- :return: 1 if error occured, 0 if successful
+ :return: 1 if error occurred, 0 if successful.
:rtype: int
"""
-
if args.output_file:
if not args.force:
if os.path.isfile(args.output_file):
print yaml.dump(topology, default_flow_style=False)
return 0
+
def main():
"""Main function"""
parser = ArgumentParser()
if __name__ == "__main__":
sys.exit(main())
-
-
| Variables | resources/libraries/python/IPv6NodesAddr.py | ${nodes}
| Force Tags | HW_ENV
| Suite Setup | Run Keywords | Setup ipv6 to all dut in topology | ${nodes} | ${nodes_ipv6_addr}
-| ... | AND | Vpp nodes ra supress link layer | ${nodes}
+| ... | AND | Vpp nodes ra suppress link layer | ${nodes}
| ... | AND | Vpp nodes setup ipv6 routing | ${nodes} | ${nodes_ipv6_addr}
| ... | AND | Setup all TGs before traffic script
| Test Setup | Clear interface counters on all vpp nodes in topology | ${nodes}