from ipaddress import IPv6Network
from topology import NodeType, Topology
from constants import Constants
-from VatExecutor import VatTerminal
+from VatExecutor import VatTerminal, VatExecutor
from robot.api import logger
def __init__(self):
pass
- def nodes_setup_ipv6_addresses(self, nodes, nodes_addr):
- """Setup IPv6 addresses on all VPP nodes in topology.
+ def nodes_set_ipv6_addresses(self, nodes, nodes_addr):
+ """Set IPv6 addresses on all VPP nodes in topology.
:param nodes: Nodes of the test topology.
:param nodes_addr: Available nodes IPv6 adresses.
:type nodes: dict
:type nodes_addr: dict
+ :return: affected interfaces as list of (node, interface) tuples
+ :rtype: list
"""
+
+ interfaces = []
+
for net in nodes_addr.values():
for port in net['ports'].values():
host = port.get('node')
self.vpp_set_if_ipv6_addr(node, port['if'], port['addr'],
net['prefix'])
+ interfaces.append((node, port['if']))
+
+ return interfaces
def nodes_clear_ipv6_addresses(self, nodes, nodes_addr):
"""Remove IPv6 addresses from all VPP nodes in topology.
:type prefix: str
"""
sw_if_index = Topology.get_interface_sw_index(node, interface)
- vat = VatTerminal(node)
- vat.vat_terminal_exec_cmd_from_template('add_ip_address.vat',
- sw_if_index=sw_if_index,
- address=addr,
- prefix_length=prefix)
- vat.vat_terminal_exec_cmd_from_template('set_if_state.vat',
- sw_if_index=sw_if_index,
- state='admin-up')
- vat.vat_terminal_close()
+ with VatTerminal(node) as vat:
+ vat.vat_terminal_exec_cmd_from_template('add_ip_address.vat',
+ sw_if_index=sw_if_index,
+ address=addr,
+ prefix_length=prefix)
+ vat.vat_terminal_exec_cmd_from_template('set_if_state.vat',
+ sw_if_index=sw_if_index,
+ state='admin-up')
ssh = SSH()
ssh.connect(node)
:type prefix: str
"""
sw_if_index = Topology.get_interface_sw_index(node, interface)
- vat = VatTerminal(node)
- vat.vat_terminal_exec_cmd_from_template('del_ip_address.vat',
- sw_if_index=sw_if_index,
- address=addr,
- prefix_length=prefix)
- vat.vat_terminal_exec_cmd_from_template('set_if_state.vat',
- sw_if_index=sw_if_index,
- state='admin-down')
- vat.vat_terminal_close()
+ with VatTerminal(node) as vat:
+ vat.vat_terminal_exec_cmd_from_template('del_ip_address.vat',
+ sw_if_index=sw_if_index,
+ address=addr,
+ prefix_length=prefix)
+ vat.vat_terminal_exec_cmd_from_template('set_if_state.vat',
+ sw_if_index=sw_if_index,
+ state='admin-down')
@staticmethod
def vpp_ra_supress_link_layer(node, interface):
:type node: dict
:type interface: str
"""
- ssh = SSH()
- ssh.connect(node)
-
- cmd = '{c}'.format(c=Constants.VAT_BIN_NAME)
- cmd_input = 'exec ip6 nd {0} ra-surpress-link-layer'.format(
- interface)
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, cmd_input)
- if int(ret_code) != 0:
- raise Exception("'{0}' failed on {1}".format(cmd_input,
- node['host']))
+ sw_if_index = Topology.get_interface_sw_index(node, interface)
+ VatExecutor.cmd_from_template(node,
+ 'sw_interface_ip6nd_ra_config.vat',
+ sw_if_id=sw_if_index,
+ param='surpress')
def vpp_all_ra_supress_link_layer(self, nodes):
"""Supress ICMPv6 router advertisement message for link scope address