-# Copyright (c) 2016 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@keyword('From node "${node}" interface "${port}" ARP-ping '
'IPv4 address "${ip_address}"')
def arp_ping(node, interface, ip_address):
+ """Send an ARP ping from the specified node.
+
+ :param node: Node in topology.
+ :param ip_address: Destination IP address for the ARP packet.
+ :param interface: Name of an interface to send the ARP packet from.
+ :type node: dict
+ :type ip_address: str
+ :type interface: str
+ """
log.debug('From node {} interface {} ARP-ping IPv4 address {}'.
format(Topology.get_node_hostname(node),
interface, ip_address))
:type node: dict
:type port: str
:type nodes_addr: dict
- :return: IPv4 prefix length.
+ :returns: IPv4 prefix length.
:rtype: int
"""
for net in nodes_addr.values():
- for p in net['ports'].values():
- if p['node'] == node['host'] and p['if'] == port:
+ for net_port in net['ports'].values():
+ if net_port['node'] == node['host'] and net_port['if'] == port:
return net['prefix']
raise Exception('Subnet not found for node {n} port {p}'.
:type node: dict
:type port: int
:type nodes_addr: dict
- :return: IPv4 subnet.
+ :returns: IPv4 subnet.
:rtype: str
"""
for net in nodes_addr.values():
- for p in net['ports'].values():
- if p['node'] == node['host'] and p['if'] == port:
+ for net_port in net['ports'].values():
+ if net_port['node'] == node['host'] and net_port['if'] == port:
return net['net_addr']
raise Exception('Subnet not found for node {n} port {p}'.
def flush_ip_addresses(port, node):
"""See IPv4Node.flush_ip_addresses for more information.
- :param port:
- :param node:
- :return:
+ :param port: FIXME
+ :param node: FIXME
+ :returns: FIXME
"""
get_node(node).flush_ip_addresses(port)
:param nodes_addr: Available nodes IPv4 addresses.
:type link: str
:type nodes_addr: dict
- :return: Link IPv4 address.
+ :returns: Link IPv4 address.
:rtype: str
"""
net = nodes_addr.get(link)
:param nodes_addr: Available nodes IPv4 addresses.
:type link: str
:type nodes_addr: dict
- :return: Link IPv4 address prefix.
+ :returns: Link IPv4 address prefix.
:rtype: int
"""
net = nodes_addr.get(link)
interface, ping_count, destination)
else:
cmd = 'ping -c{0} {1}'.format(ping_count, destination)
- rc, stdout, stderr = exec_cmd(node, cmd, sudo=True)
- if rc != 0:
+ ret_code, _, _ = exec_cmd(node, cmd, sudo=True)
+ if ret_code != 0:
raise RuntimeError("Ping Not Successful")
@staticmethod
- def set_linux_interface_arp(node, interface, ip, mac, namespace=None):
+ def set_linux_interface_arp(node, interface, ip_addr, mac, namespace=None):
"""Set arp on interface in linux.
:param node: Node where to execute command.
:param interface: Interface in namespace.
- :param ip: IP for arp.
+ :param ip_addr: IP address for ARP entry.
:param mac: MAC address.
:param namespace: Execute command in namespace. Optional
:type node: dict
:type interface: str
- :type ip: str
+ :type ip_addr: str
:type mac: str
:type namespace: str
:raises RuntimeError: Could not set ARP properly.
"""
if namespace is not None:
cmd = 'ip netns exec {} arp -i {} -s {} {}'.format(
- namespace, interface, ip, mac)
+ namespace, interface, ip_addr, mac)
else:
- cmd = 'arp -i {} -s {} {}'.format(interface, ip, mac)
- rc, _, stderr = exec_cmd(node, cmd, sudo=True)
- if rc != 0:
+ cmd = 'arp -i {} -s {} {}'.format(interface, ip_addr, mac)
+ ret_code, _, stderr = exec_cmd(node, cmd, sudo=True)
+ if ret_code != 0:
raise RuntimeError("Arp set not successful, reason:{}".
format(stderr))