from ssh import SSH
from ipaddress import IPv6Network
-from topology import NodeType
-from topology import Topology
+from topology import NodeType, Topology
from constants import Constants
+from VatExecutor import VatTerminal, VatExecutor
+from robot.api import logger
class IPv6Networks(object):
:type addr: str
:type prefix: str
"""
+ sw_if_index = Topology.get_interface_sw_index(node, interface)
+ vat = VatTerminal(node)
+ vat.vat_terminal_exec_cmd_from_template('add_ip_address.vat',
+ sw_if_index=sw_if_index,
+ address=addr,
+ prefix_length=prefix)
+ vat.vat_terminal_exec_cmd_from_template('set_if_state.vat',
+ sw_if_index=sw_if_index,
+ state='admin-up')
+ vat.vat_terminal_close()
+
ssh = SSH()
ssh.connect(node)
-
- cmd = '{c}'.format(c=Constants.VAT_BIN_NAME)
- cmd_input = 'sw_interface_add_del_address {dev} {ip}/{p}'.format(
- dev=interface, ip=addr, p=prefix)
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, cmd_input)
- if int(ret_code) != 0:
- raise Exception('VPP sw_interface_add_del_address failed on {h}'
- .format(h=node['host']))
-
- cmd_input = 'sw_interface_set_flags {dev} admin-up'.format(
- dev=interface)
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, cmd_input)
- if int(ret_code) != 0:
- raise Exception('VPP sw_interface_set_flags failed on {h}'.format(
- h=node['host']))
+ cmd_input = 'exec show int'
+ (ret_code, stdout, stderr) = ssh.exec_command_sudo(
+ Constants.VAT_BIN_NAME, cmd_input)
+ logger.debug('ret: {0}'.format(ret_code))
+ logger.debug('stdout: {0}'.format(stdout))
+ logger.debug('stderr: {0}'.format(stderr))
@staticmethod
def vpp_del_if_ipv6_addr(node, interface, addr, prefix):
:type addr: str
:type prefix: str
"""
- ssh = SSH()
- ssh.connect(node)
-
- cmd = '{c}'.format(c=Constants.VAT_BIN_NAME)
- cmd_input = 'sw_interface_add_del_address {dev} {ip}/{p} del'.format(
- dev=interface, ip=addr, p=prefix)
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, cmd_input)
- if int(ret_code) != 0:
- raise Exception(
- 'sw_interface_add_del_address failed on {h}'.
- format(h=node['host']))
-
- cmd_input = 'sw_interface_set_flags {dev} admin-down'.format(
- dev=interface)
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, cmd_input)
- if int(ret_code) != 0:
- raise Exception('VPP sw_interface_set_flags failed on {h}'.format(
- h=node['host']))
+ sw_if_index = Topology.get_interface_sw_index(node, interface)
+ vat = VatTerminal(node)
+ vat.vat_terminal_exec_cmd_from_template('del_ip_address.vat',
+ sw_if_index=sw_if_index,
+ address=addr,
+ prefix_length=prefix)
+ vat.vat_terminal_exec_cmd_from_template('set_if_state.vat',
+ sw_if_index=sw_if_index,
+ state='admin-down')
+ vat.vat_terminal_close()
@staticmethod
def vpp_ra_supress_link_layer(node, interface):
:type node: dict
:type interface: str
"""
- ssh = SSH()
- ssh.connect(node)
-
- cmd = '{c}'.format(c=Constants.VAT_BIN_NAME)
- cmd_input = 'exec ip6 nd {0} ra-surpress-link-layer'.format(
- interface)
- (ret_code, _, _) = ssh.exec_command_sudo(cmd, cmd_input)
- if int(ret_code) != 0:
- raise Exception("'{0}' failed on {1}".format(cmd_input,
- node['host']))
+ sw_if_index = Topology.get_interface_sw_index(node, interface)
+ VatExecutor.cmd_from_template(node,
+ 'sw_interface_ip6nd_ra_config.vat',
+ sw_if_id=sw_if_index,
+ param='surpress')
def vpp_all_ra_supress_link_layer(self, nodes):
"""Supress ICMPv6 router advertisement message for link scope address