- :type interface: str
- :return: nothing
- """
- pass
-
- @abstractmethod
- def flush_ip_addresses(self, interface):
- """Flush all IPv4 addresses from specified interface
- :param interface: interface name
- :type interface: str
- :return: nothing
- """
- pass
-
- @abstractmethod
- def ping(self, destination_address, source_interface):
- """Send an ICMP request to destination node
- :param destination_address: address to send the ICMP request
- :param source_interface:
- :type destination_address: str
- :type source_interface: str
- :return: nothing
- """
- pass
-
-
-class Tg(IPv4Node):
- """Traffic generator node"""
- def __init__(self, node_info):
- super(Tg, self).__init__(node_info)
-
- def _execute(self, cmd):
- return ssh.exec_cmd_no_error(self.node_info, cmd)
-
- def _sudo_execute(self, cmd):
- return ssh.exec_cmd_no_error(self.node_info, cmd, sudo=True)
-
- def set_ip(self, interface, address, prefix_length):
- cmd = 'ip -4 addr flush dev {}'.format(interface)
- self._sudo_execute(cmd)
- cmd = 'ip addr add {}/{} dev {}'.format(address, prefix_length,
- interface)
- self._sudo_execute(cmd)
-
- # TODO: not ipv4-specific, move to another class
- def set_interface_state(self, interface, state):
- cmd = 'ip link set {} {}'.format(interface, state)
- self._sudo_execute(cmd)
-
- def set_route(self, network, prefix_length, gateway, interface):
- netmask = self._get_netmask(prefix_length)
- cmd = 'route add -net {} netmask {} gw {}'.\
- format(network, netmask, gateway)
- self._sudo_execute(cmd)
-
- def unset_route(self, network, prefix_length, gateway, interface):
- self._sudo_execute('ip route delete {}/{}'.
- format(network, prefix_length))
-
- def arp_ping(self, destination_address, source_interface):
- self._sudo_execute('arping -c 1 -I {} {}'.format(source_interface,
- destination_address))
-
- def ping(self, destination_address, source_interface):
- self._execute('ping -c 1 -w 5 -I {} {}'.format(source_interface,
- destination_address))
-
- def flush_ip_addresses(self, interface):
- self._sudo_execute('ip addr flush dev {}'.format(interface))
-
-
-class Dut(IPv4Node):
- """Device under test"""
- def __init__(self, node_info):
- super(Dut, self).__init__(node_info)
-
- def get_sw_if_index(self, interface):
- """Get sw_if_index of specified interface from current node
- :param interface: interface name
- :type interface: str
- :return: sw_if_index of 'int' type
- """
- return Topology().get_interface_sw_index(self.node_info, interface)
-
- def exec_vat(self, script, **args):
- """Wrapper for VAT executor.
- :param script: script to execute
- :param args: parameters to the script
- :type script: str
- :type args: dict
- :return: nothing
- """
- # TODO: check return value
- VatExecutor.cmd_from_template(self.node_info, script, **args)
-
- def set_ip(self, interface, address, prefix_length):
- self.exec_vat('add_ip_address.vat',
- sw_if_index=self.get_sw_if_index(interface),
- address=address, prefix_length=prefix_length)
-
- def set_interface_state(self, interface, state):
- if state == 'up':
- state = 'admin-up link-up'
- elif state == 'down':
- state = 'admin-down link-down'
- else:
- raise Exception('Unexpected interface state: {}'.format(state))
-
- self.exec_vat('set_if_state.vat',
- sw_if_index=self.get_sw_if_index(interface), state=state)
-
- def set_route(self, network, prefix_length, gateway, interface):
- sw_if_index = self.get_sw_if_index(interface)
- self.exec_vat('add_route.vat',
- network=network, prefix_length=prefix_length,
- gateway=gateway, sw_if_index=sw_if_index)
-
- def unset_route(self, network, prefix_length, gateway, interface):
- self.exec_vat('del_route.vat', network=network,
- prefix_length=prefix_length, gateway=gateway,
- sw_if_index=self.get_sw_if_index(interface))
-
- def arp_ping(self, destination_address, source_interface):
- pass
-
- def flush_ip_addresses(self, interface):
- self.exec_vat('flush_ip_addresses.vat',
- sw_if_index=self.get_sw_if_index(interface))
-
- def ping(self, destination_address, source_interface):
- pass
-
-
-def get_node(node_info):
- """Creates a class instance derived from Node based on type.
- :param node_info: dictionary containing information on nodes in topology
- :return: Class instance that is derived from Node
- """
- if node_info['type'] == NodeType.TG:
- return Tg(node_info)
- elif node_info['type'] == NodeType.DUT:
- return Dut(node_info)
- else:
- raise NotImplementedError('Node type "{}" unsupported!'.
- format(node_info['type']))
-
-
-def get_node_hostname(node_info):
- """Get string identifying specifed node.
- :param node_info: Node in the topology.
- :type node_info: Dict
- :return: String identifying node.
- """
- return node_info['host']
-
-
-class IPv4Util(object):
- """Implements keywords for IPv4 tests."""
-
- ADDRESSES = {} # holds configured IPv4 addresses
- PREFIXES = {} # holds configured IPv4 addresses' prefixes
- SUBNETS = {} # holds configured IPv4 addresses' subnets
-
- """
- Helper dictionary used when setting up ipv4 addresses in topology
-
- Example value:
- 'link1': { b'port1': {b'addr': b'192.168.3.1'},
- b'port2': {b'addr': b'192.168.3.2'},
- b'prefix': 24,
- b'subnet': b'192.168.3.0'}
- """
- topology_helper = None
-
- @staticmethod
- def next_address(subnet):
- """Get next unused IPv4 address from a subnet
- :param subnet: holds available IPv4 addresses
- :return: tuple (ipv4_address, prefix_length)
- """
- for i in range(1, 4):
- # build a key and try to get it from address dictionary
- interface = 'port{}'.format(i)
- if interface in subnet:
- addr = subnet[interface]['addr']
- del subnet[interface]
- return addr, subnet['prefix']
- raise Exception('Not enough ipv4 addresses in subnet')
-
- @staticmethod
- def next_network(nodes_addr):
- """Get next unused network from dictionary
- :param nodes_addr: dictionary of available networks
- :return: dictionary describing an IPv4 subnet with addresses
- """
- assert_not_equal(len(nodes_addr), 0, 'Not enough networks')
- _, subnet = nodes_addr.popitem()
- return subnet
-
- @staticmethod
- def configure_ipv4_addr_on_node(node, nodes_addr):
- """Configure IPv4 address for all interfaces on a node in topology
- :param node: dictionary containing information about node
- :param nodes_addr: dictionary containing IPv4 addresses
- :return:
- """
- for interface, interface_data in node['interfaces'].iteritems():
- if interface == 'mgmt':
- continue
- if interface_data['link'] not in IPv4Util.topology_helper:
- IPv4Util.topology_helper[interface_data['link']] = \
- IPv4Util.next_network(nodes_addr)
-
- network = IPv4Util.topology_helper[interface_data['link']]
- address, prefix = IPv4Util.next_address(network)
-
- get_node(node).set_ip(interface_data['name'], address, prefix)
- key = (get_node_hostname(node), interface_data['name'])
- IPv4Util.ADDRESSES[key] = address
- IPv4Util.PREFIXES[key] = prefix
- IPv4Util.SUBNETS[key] = network['subnet']
-
- @staticmethod
- def nodes_setup_ipv4_addresses(nodes_info, nodes_addr):
- """Configure IPv4 addresses on all non-management interfaces for each
- node in nodes_info
- :param nodes_info: dictionary containing information on all nodes
- in topology
- :param nodes_addr: dictionary containing IPv4 addresses
- :return: nothing
- """
- IPv4Util.topology_helper = {}
- # make a deep copy of nodes_addr because of modifications
- nodes_addr_copy = copy.deepcopy(nodes_addr)
- for _, node in nodes_info.iteritems():
- IPv4Util.configure_ipv4_addr_on_node(node, nodes_addr_copy)
-
- @staticmethod
- def nodes_clear_ipv4_addresses(nodes):
- """Clear all addresses from all nodes in topology
- :param nodes: dictionary containing information on all nodes
- :return: nothing
- """
- for _, node in nodes.iteritems():
- for interface, interface_data in node['interfaces'].iteritems():
- if interface == 'mgmt':
- continue
- IPv4Util.flush_ip_addresses(interface_data['name'], node)
-
- # TODO: not ipv4-specific, move to another class
- @staticmethod
- @keyword('Node "${node}" interface "${interface}" is in "${state}" state')
- def set_interface_state(node, interface, state):
- """See IPv4Node.set_interface_state for more information.
- :param node:
- :param interface:
- :param state:
- :return:
- """
- log.debug('Node {} interface {} is in {} state'.format(
- get_node_hostname(node), interface, state))
- get_node(node).set_interface_state(interface, state)
-
- @staticmethod
- @keyword('Node "${node}" interface "${port}" has IPv4 address '
- '"${address}" with prefix length "${prefix_length}"')
- def set_interface_address(node, interface, address, length):
- """See IPv4Node.set_ip for more information.
- :param node:
- :param interface:
- :param address:
- :param length:
- :return:
- """
- log.debug('Node {} interface {} has IPv4 address {} with prefix '
- 'length {}'.format(get_node_hostname(node), interface,
- address, length))
- get_node(node).set_ip(interface, address, int(length))
- hostname = get_node_hostname(node)
- IPv4Util.ADDRESSES[hostname, interface] = address
- IPv4Util.PREFIXES[hostname, interface] = int(length)
- # TODO: Calculate subnet from ip address and prefix length.
- # IPv4Util.SUBNETS[hostname, interface] =
-
- @staticmethod
- @keyword('From node "${node}" interface "${port}" ARP-ping '
- 'IPv4 address "${ip_address}"')
- def arp_ping(node, interface, ip_address):
- log.debug('From node {} interface {} ARP-ping IPv4 address {}'.
- format(get_node_hostname(node), interface, ip_address))
- get_node(node).arp_ping(ip_address, interface)
-
- @staticmethod
- @keyword('Node "${node}" routes to IPv4 network "${network}" with prefix '
- 'length "${prefix_length}" using interface "${interface}" via '
- '"${gateway}"')
- def set_route(node, network, prefix_length, interface, gateway):
- """See IPv4Node.set_route for more information.
- :param node:
- :param network:
- :param prefix_length:
- :param interface:
- :param gateway:
- :return: