@staticmethod
def _get_netmask(prefix_length):
+ """Convert IPv4 network prefix length into IPV4 network mask.
+
+ :param prefix_length: Length of network prefix.
+ :type prefix_length: int
+
+ :return: Network mask.
+ :rtype: str
+ """
+
bits = 0xffffffff ^ (1 << 32 - prefix_length) - 1
return inet_ntoa(pack('>I', bits))
super(Tg, self).__init__(node_info)
def _execute(self, cmd):
+ """Executes the specified command on TG using SSH.
+
+ :param cmd: Command to be executed.
+ :type cmd: str
+
+ :return: Content of stdout and stderr returned by command.
+ :rtype: tuple
+ """
return exec_cmd_no_error(self.node_info, cmd)
def _sudo_execute(self, cmd):
+ """Executes the specified command with sudo on TG using SSH.
+
+ :param cmd: Command to be executed.
+ :type cmd: str
+
+ :return: Content of stdout and stderr returned by command.
+ :rtype: tuple
+ """
return exec_cmd_no_error(self.node_info, cmd, sudo=True)
def set_ip(self, interface, address, prefix_length):
format(network, prefix_length))
def arp_ping(self, destination_address, source_interface):
+ """Execute 'arping' command to send one ARP packet from the TG node.
+
+ :param destination_address: Destination IP address for the ARP packet.
+ :param source_interface: Name of an interface to send ARP packet from.
+ :type destination_address: str
+ :type source_interface: str
+ """
self._sudo_execute('arping -c 1 -I {} {}'.format(source_interface,
destination_address))
sw_if_index=self.get_sw_if_index(interface))
def arp_ping(self, destination_address, source_interface):
+ """Does nothing."""
pass
def flush_ip_addresses(self, interface):
host = port.get('node')
dev = port.get('if')
if host == node['host'] and dev == interface:
- ip = port.get('addr')
- if ip is not None:
- return ip
+ ip_addr = port.get('addr')
+ if ip_addr is not None:
+ return ip_addr
else:
raise Exception(
'Node {n} port {p} IPv4 address is not set'.format(