# See the License for the specific language governing permissions and
# limitations under the License.
+"""Classify utilities library."""
from robot.api import logger
from resources.libraries.python.VatExecutor import VatExecutor, VatTerminal
+# pylint: disable=too-many-arguments, invalid-name
+
class Classify(object):
"""Classify utilities."""
return table_index, skip_n, match_n
@staticmethod
- def vpp_configures_classify_session_l3(node, acl_method, table_index, skip_n,
- match_n, ip_version, direction,
- address):
+ def vpp_configures_classify_session_l3(node, acl_method, table_index,
+ skip_n, match_n, ip_version,
+ direction, address):
"""Configuration of classify session for IP address filtering.
:param node: VPP node to setup classify session.
address=address)
@staticmethod
- def vpp_configures_classify_session_l2(node, acl_method, table_index, skip_n,
- match_n, direction, address):
+ def vpp_configures_classify_session_l2(node, acl_method, table_index,
+ skip_n, match_n, direction, address):
"""Configuration of classify session for MAC address filtering.
:param node: VPP node to setup classify session.
:type address: str
"""
with VatTerminal(node) as vat:
- vat.vat_terminal_exec_cmd_from_template("classify_add_session_l2.vat",
- acl_method=acl_method,
- table_index=table_index,
- skip_n=skip_n,
- match_n=match_n,
- direction=direction,
- address=address)
+ vat.vat_terminal_exec_cmd_from_template(
+ "classify_add_session_l2.vat",
+ acl_method=acl_method,
+ table_index=table_index,
+ skip_n=skip_n,
+ match_n=match_n,
+ direction=direction,
+ address=address)
@staticmethod
def vpp_configures_classify_session_hex(node, acl_method, table_index,
- skip_n, match_n, hex_value):
+ skip_n, match_n, hex_value):
"""Configuration of classify session with hex value.
:param node: VPP node to setup classify session.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""DUT setup library."""
+
from robot.api import logger
from resources.libraries.python.topology import NodeType
class DUTSetup(object):
+ """Contains methods for setting up DUTs."""
@staticmethod
def start_vpp_service_on_all_duts(nodes):
"""Start up the VPP service on all nodes."""
ssh.connect(node)
(ret_code, stdout, stderr) = \
ssh.exec_command_sudo('service vpp restart')
- if 0 != int(ret_code):
+ if int(ret_code) != 0:
logger.debug('stdout: {0}'.format(stdout))
logger.debug('stderr: {0}'.format(stderr))
raise Exception('DUT {0} failed to start VPP service'.
@staticmethod
def setup_dut(node):
+ """Run script over SSH to setup the DUT node.
+
+ :param node: DUT node to set up.
+ :type node: dict
+
+ :raises Exception: If the DUT setup fails.
+ """
ssh = SSH()
ssh.connect(node)
Constants.REMOTE_FW_DIR, Constants.RESOURCES_LIB_SH))
logger.trace(stdout)
logger.trace(stderr)
- if 0 != int(ret_code):
+ if int(ret_code) != 0:
logger.debug('DUT {0} setup script failed: "{1}"'.
format(node['host'], stdout + stderr))
raise Exception('DUT test setup script failed at node {}'.
@staticmethod
def _get_netmask(prefix_length):
+ """Convert IPv4 network prefix length into IPV4 network mask.
+
+ :param prefix_length: Length of network prefix.
+ :type prefix_length: int
+
+ :return: Network mask.
+ :rtype: str
+ """
+
bits = 0xffffffff ^ (1 << 32 - prefix_length) - 1
return inet_ntoa(pack('>I', bits))
super(Tg, self).__init__(node_info)
def _execute(self, cmd):
+ """Executes the specified command on TG using SSH.
+
+ :param cmd: Command to be executed.
+ :type cmd: str
+
+ :return: Content of stdout and stderr returned by command.
+ :rtype: tuple
+ """
return exec_cmd_no_error(self.node_info, cmd)
def _sudo_execute(self, cmd):
+ """Executes the specified command with sudo on TG using SSH.
+
+ :param cmd: Command to be executed.
+ :type cmd: str
+
+ :return: Content of stdout and stderr returned by command.
+ :rtype: tuple
+ """
return exec_cmd_no_error(self.node_info, cmd, sudo=True)
def set_ip(self, interface, address, prefix_length):
format(network, prefix_length))
def arp_ping(self, destination_address, source_interface):
+ """Execute 'arping' command to send one ARP packet from the TG node.
+
+ :param destination_address: Destination IP address for the ARP packet.
+ :param source_interface: Name of an interface to send ARP packet from.
+ :type destination_address: str
+ :type source_interface: str
+ """
self._sudo_execute('arping -c 1 -I {} {}'.format(source_interface,
destination_address))
sw_if_index=self.get_sw_if_index(interface))
def arp_ping(self, destination_address, source_interface):
+ """Does nothing."""
pass
def flush_ip_addresses(self, interface):
host = port.get('node')
dev = port.get('if')
if host == node['host'] and dev == interface:
- ip = port.get('addr')
- if ip is not None:
- return ip
+ ip_addr = port.get('addr')
+ if ip_addr is not None:
+ return ip_addr
else:
raise Exception(
'Node {n} port {p} IPv4 address is not set'.format(
@keyword('From node "${node}" interface "${port}" ARP-ping '
'IPv4 address "${ip_address}"')
def arp_ping(node, interface, ip_address):
+ """Send an ARP ping from the specified node.
+
+ :param node: Node in topology.
+ :param ip_address: Destination IP address for the ARP packet.
+ :param interface: Name of an interface to send the ARP packet from.
+ :type node: dict
+ :type ip_address: str
+ :type interface: str
+ """
log.debug('From node {} interface {} ARP-ping IPv4 address {}'.
format(Topology.get_node_hostname(node),
interface, ip_address))
:rtype: int
"""
for net in nodes_addr.values():
- for p in net['ports'].values():
- if p['node'] == node['host'] and p['if'] == port:
+ for net_port in net['ports'].values():
+ if net_port['node'] == node['host'] and net_port['if'] == port:
return net['prefix']
raise Exception('Subnet not found for node {n} port {p}'.
:rtype: str
"""
for net in nodes_addr.values():
- for p in net['ports'].values():
- if p['node'] == node['host'] and p['if'] == port:
+ for net_port in net['ports'].values():
+ if net_port['node'] == node['host'] and net_port['if'] == port:
return net['net_addr']
raise Exception('Subnet not found for node {n} port {p}'.
interface, ping_count, destination)
else:
cmd = 'ping -c{0} {1}'.format(ping_count, destination)
- rc, stdout, stderr = exec_cmd(node, cmd, sudo=True)
- if rc != 0:
+ ret_code, _, _ = exec_cmd(node, cmd, sudo=True)
+ if ret_code != 0:
raise RuntimeError("Ping Not Successful")
@staticmethod
- def set_linux_interface_arp(node, interface, ip, mac, namespace=None):
+ def set_linux_interface_arp(node, interface, ip_addr, mac, namespace=None):
"""Set arp on interface in linux.
:param node: Node where to execute command.
:param interface: Interface in namespace.
- :param ip: IP for arp.
+ :param ip_addr: IP address for ARP entry.
:param mac: MAC address.
:param namespace: Execute command in namespace. Optional
:type node: dict
:type interface: str
- :type ip: str
+ :type ip_addr: str
:type mac: str
:type namespace: str
:raises RuntimeError: Could not set ARP properly.
"""
if namespace is not None:
cmd = 'ip netns exec {} arp -i {} -s {} {}'.format(
- namespace, interface, ip, mac)
+ namespace, interface, ip_addr, mac)
else:
- cmd = 'arp -i {} -s {} {}'.format(interface, ip, mac)
- rc, _, stderr = exec_cmd(node, cmd, sudo=True)
- if rc != 0:
+ cmd = 'arp -i {} -s {} {}'.format(interface, ip_addr, mac)
+ ret_code, _, stderr = exec_cmd(node, cmd, sudo=True)
+ if ret_code != 0:
raise RuntimeError("Arp set not successful, reason:{}".
format(stderr))
cmd = "ping6 -c {c} -s {s} -W {W} {dst}".format(c=count, s=data_size,
W=timeout,
dst=dst_addr)
- (ret_code, stdout, _) = ssh.exec_command(cmd)
+ (_, stdout, _) = ssh.exec_command(cmd)
regex = re.compile(r'(\d+) packets transmitted, (\d+) received')
match = regex.search(stdout)
host = port.get('node')
dev = port.get('if')
if host == node['host'] and dev == interface:
- ip = port.get('addr')
- if ip is not None:
- return ip
+ ip_addr = port.get('addr')
+ if ip_addr is not None:
+ return ip_addr
else:
raise Exception(
'Node {n} port {p} IPv6 address is not set'.format(
seid=seid,
seid_prefix=seid_prefix,
rloc=rloc)
+
+
class LispAdjacency(object):
"""Class for lisp adjacency API."""
:param node: VPP node.
:param vni: Vni.
:param deid: Destination eid address.
- :param deid_predix: Destination eid address prefix_len.
+ :param deid_prefix: Destination eid address prefix_len.
:param seid: Source eid address.
:param seid_prefix: Source eid address prefix_len.
:type node: dict
:param node: VPP node.
:param vni: Vni.
:param deid: Destination eid address.
- :param deid_predix: Destination eid address prefix_len.
+ :param deid_prefix: Destination eid address prefix_len.
:param seid: Source eid address.
:param seid_prefix: Source eid address prefix_len.
:type node: dict
@staticmethod
def add_lisp_gpe_forward_entry(node, *args):
+ """Not implemented"""
# TODO: Implement when VPP-334 is fixed.
pass
@staticmethod
def del_lisp_gpe_forward_entry(node, *args):
+ """Not implemented"""
# TODO: Implement when VPP-334 is fixed.
pass
"""Linux namespace utilities library."""
-from resources.libraries.python.ssh import exec_cmd_no_error, exec_cmd, SSH
+from resources.libraries.python.ssh import exec_cmd_no_error, exec_cmd
class Namespaces(object):
:raises RuntimeError: Interface could not be attached.
"""
cmd = 'ip link set {0} netns {1}'.format(interface, namespace)
- (rc, _, stderr) = exec_cmd(node, cmd, timeout=5, sudo=True)
- if rc != 0:
+ (ret_code, _, stderr) = exec_cmd(node, cmd, timeout=5, sudo=True)
+ if ret_code != 0:
raise RuntimeError(
'Could not attach interface, reason:{}'.format(stderr))
cmd = 'ip netns exec {} ip link set {} up'.format(
namespace, interface)
- (rc, _, stderr) = exec_cmd(node, cmd, timeout=5, sudo=True)
- if rc != 0:
+ (ret_code, _, stderr) = exec_cmd(node, cmd, timeout=5, sudo=True)
+ if ret_code != 0:
raise RuntimeError(
'Could not set interface state, reason:{}'.format(stderr))
for namespace in self._namespaces:
print "Cleaning namespace {}".format(namespace)
cmd = 'ip netns delete {}'.format(namespace)
- (rc, stdout, stderr) = exec_cmd(node, cmd, timeout=5, sudo=True)
- if rc != 0:
+ (ret_code, _, _) = exec_cmd(node, cmd, timeout=5, sudo=True)
+ if ret_code != 0:
raise RuntimeError('Could not delete namespace')
import select
from scapy.all import ETH_P_IP, ETH_P_IPV6, ETH_P_ALL, ETH_P_ARP
-from scapy.all import Ether, ARP, Packet
+from scapy.all import Ether, ARP
from scapy.layers.inet6 import IPv6
__all__ = ['RxQueue', 'TxQueue', 'Interface', 'create_gratuitous_arp_request',
class Interface(object):
+ """Class for network interfaces. Contains methods for sending and receiving
+ packets."""
def __init__(self, if_name):
+ """Initialize the interface class.
+
+ :param if_name: Name of the interface.
+ :type if_name: str
+ """
self.if_name = if_name
self.sent_packets = []
self.rxq = RxQueue(if_name)
self.txq = TxQueue(if_name)
def send_pkt(self, pkt):
+ """Send the provided packet out the interface."""
self.sent_packets.append(pkt)
self.txq.send(pkt)
def recv_pkt(self, timeout=3):
+ """Read one packet from the interface's receive queue.
+
+ :param timeout: Timeout value in seconds.
+ :type timeout: int
+ :return: Ether() initialized object from packet data.
+ :rtype: scapy.Ether
+ """
return self.rxq.recv(timeout, self.sent_packets)
'{ \\"execute\\": \\"' + cmd + '\\" }" | sudo -S nc -U ' + \
self.__QMP_SOCK
(ret_code, stdout, stderr) = self._ssh.exec_command(qmp_cmd)
- if 0 != int(ret_code):
+ if int(ret_code) != 0:
logger.debug('QMP execute failed {0}'.format(stderr))
- raise RuntimeError('QMP execute "{0}" failed on {1}'.format(cmd,
- self._node['host']))
+ raise RuntimeError('QMP execute "{0}"'
+ ' failed on {1}'.format(cmd, self._node['host']))
logger.trace(stdout)
# Skip capabilities negotiation messages.
out_list = stdout.splitlines()
qga_cmd = 'printf "\xFF" | sudo -S nc ' \
'-q 1 -U ' + self.__QGA_SOCK
(ret_code, stdout, stderr) = self._ssh.exec_command(qga_cmd)
- if 0 != int(ret_code):
+ if int(ret_code) != 0:
logger.debug('QGA execute failed {0}'.format(stderr))
- raise RuntimeError('QGA execute "{0}" failed on {1}'.format(cmd,
- self._node['host']))
+ raise RuntimeError('QGA execute "{0}" '
+ 'failed on {1}'.format(cmd, self._node['host']))
logger.trace(stdout)
if not stdout:
return {}
qga_cmd = 'echo "{ \\"execute\\": \\"' + cmd + '\\" }" | sudo -S nc ' \
'-q 1 -U ' + self.__QGA_SOCK
(ret_code, stdout, stderr) = self._ssh.exec_command(qga_cmd)
- if 0 != int(ret_code):
+ if int(ret_code) != 0:
logger.debug('QGA execute failed {0}'.format(stderr))
- raise RuntimeError('QGA execute "{0}" failed on {1}'.format(cmd,
- self._node['host']))
+ raise RuntimeError('QGA execute "{0}"'
+ ' failed on {1}'.format(cmd, self._node['host']))
logger.trace(stdout)
if not stdout:
return {}
self._node['host']))
# If we do not want to allocate dynamicaly end with error
else:
- raise RuntimeError('Not enough free huge pages: {0}, '
- '{1} MB'.format(huge_free, huge_free * huge_size))
+ raise RuntimeError(
+ 'Not enough free huge pages: {0}, '
+ '{1} MB'.format(huge_free, huge_free * huge_size)
+ )
# Check if huge pages mount point exist
has_huge_mnt = False
(_, output, _) = self._ssh.exec_command('cat /proc/mounts')
# Memory and huge pages
mem = '-object memory-backend-file,id=mem,size={0}M,mem-path={1},' \
'share=on -m {0} -numa node,memdev=mem'.format(
- self._qemu_opt.get('mem_size'), self._qemu_opt.get('huge_mnt'))
+ self._qemu_opt.get('mem_size'), self._qemu_opt.get('huge_mnt'))
# By default check only if hugepages are availbale.
# If 'huge_allocate' is set to true try to allocate as well.
# Setup serial console
serial = '-chardev socket,host=127.0.0.1,port={0},id=gnc0,server,' \
'nowait -device isa-serial,chardev=gnc0'.format(
- self._qemu_opt.get('serial_port'))
+ self._qemu_opt.get('serial_port'))
# Setup QGA via chardev (unix socket) and isa-serial channel
qga = '-chardev socket,path=/tmp/qga.sock,server,nowait,id=qga0 ' \
'-device isa-serial,chardev=qga0'
out = self._qemu_qmp_exec('system_powerdown')
err = out.get('error')
if err is not None:
- raise RuntimeError('QEMU system powerdown failed on {0}, '
- 'error: {1}'.format(self._node['host'], json.dumps(err)))
+ raise RuntimeError(
+ 'QEMU system powerdown failed on {0}, '
+ 'error: {1}'.format(self._node['host'], json.dumps(err))
+ )
def qemu_system_reset(self):
"""Reset the system."""
out = self._qemu_qmp_exec('system_reset')
err = out.get('error')
if err is not None:
- raise RuntimeError('QEMU system reset failed on {0}, '
+ raise RuntimeError(
+ 'QEMU system reset failed on {0}, '
'error: {1}'.format(self._node['host'], json.dumps(err)))
def qemu_kill(self):
return ret.get('status')
else:
err = out.get('error')
- raise RuntimeError('QEMU query-status failed on {0}, '
+ raise RuntimeError(
+ 'QEMU query-status failed on {0}, '
'error: {1}'.format(self._node['host'], json.dumps(err)))
@staticmethod
ssh.exec_command('sudo -Sn bash {0}/{1}/qemu_build.sh'.format(
Constants.REMOTE_FW_DIR, Constants.RESOURCES_LIB_SH), 1000)
logger.trace(stdout)
- if 0 != int(ret_code):
+ if int(ret_code) != 0:
logger.debug('QEMU build failed {0}'.format(stderr))
raise RuntimeError('QEMU build failed on {0}'.format(node['host']))
from resources.libraries.python.topology import Topology
from resources.libraries.python.ssh import exec_cmd_no_error
+
class Routing(object):
"""Routing utilities."""
+ # pylint: disable=too-many-arguments
@staticmethod
def vpp_route_add(node, network, prefix_len, gateway=None,
interface=None, use_sw_index=True, resolve_attempts=10,
where=place)
@staticmethod
- def add_route(node, ip, prefix, gw, namespace=None):
+ def add_route(node, ip_addr, prefix, gateway, namespace=None):
"""Add route in namespace.
:param node: Node where to execute command.
- :param ip: Route destination IP.
+ :param ip_addr: Route destination IP address.
:param prefix: IP prefix.
- :param namespace: Execute command in namespace. Optional
- :param gw: Gateway.
+ :param namespace: Execute command in namespace. Optional.
+ :param gateway: Gateway address.
:type node: dict
- :type ip: str
+ :type ip_addr: str
:type prefix: int
- :type gw: str
+ :type gateway: str
:type namespace: str
"""
if namespace is not None:
cmd = 'ip netns exec {} ip route add {}/{} via {}'.format(
- namespace, ip, prefix, gw)
+ namespace, ip_addr, prefix, gateway)
else:
- cmd = 'ip route add {}/{} via {}'.format(ip, prefix, gw)
+ cmd = 'ip route add {}/{} via {}'.format(ip_addr, prefix, gateway)
exec_cmd_no_error(node, cmd, sudo=True)
logger.debug(stderr)
return_code = proc.wait()
- if 0 != return_code:
+ if return_code != 0:
raise Exception("Could not pack testing framework.")
return file_name
cmd = 'sudo rm -rf {1}; mkdir {1} ; tar -zxf {0} -C {1}; ' \
'rm -f {0}'.format(tarball, con.REMOTE_FW_DIR)
(ret_code, _, stderr) = ssh.exec_command(cmd, timeout=30)
- if 0 != ret_code:
+ if ret_code != 0:
logger.error('Unpack error: {0}'.format(stderr))
raise Exception('Failed to unpack {0} at node {1}'.format(
tarball, node['host']))
'. env/bin/activate && '
'pip install -r requirements.txt'
.format(con.REMOTE_FW_DIR), timeout=100)
- if 0 != ret_code:
+ if ret_code != 0:
logger.error('Virtualenv creation error: {0}'.format(stdout + stderr))
raise Exception('Virtualenv setup failed')
else:
# See the License for the specific language governing permissions and
# limitations under the License.
+"""VAT executor library."""
+
import json
from robot.api import logger
class VatExecutor(object):
+ """Contains methods for executing VAT commands on DUTs."""
def __init__(self):
self._stdout = None
self._stderr = None
Constants.RESOURCES_TPL_VAT,
vat_name)
# TODO this overwrites the output if the vat script has been used twice
- remote_file_out = remote_file_path + ".out"
+ # remote_file_out = remote_file_path + ".out"
cmd = "sudo -S {vat} {json} < {input}".format(
vat=Constants.VAT_BIN_NAME,
# self._delete_files(node, remote_file_path, remote_file_out)
def execute_script_json_out(self, vat_name, node, timeout=10):
+ """Pass all arguments to 'execute_script' method, then cleanup returned
+ json output."""
self.execute_script(vat_name, node, timeout, json_out=True)
self._stdout = cleanup_vat_json_output(self._stdout)
@staticmethod
def _delete_files(node, *files):
+ """Use SSH to delete the specified files on node.
+
+ :param node: Node in topology.
+ :param files: Files to delete.
+ :type node: dict
+ :type files: iterable
+ """
+
ssh = SSH()
ssh.connect(node)
files = " ".join([str(x) for x in files])
ssh.exec_command("rm {0}".format(files))
def script_should_have_failed(self):
+ """Read return code from last executed script and raise exception if the
+ script didn't fail."""
if self._ret_code is None:
raise Exception("First execute the script!")
if self._ret_code == 0:
"Script execution passed, but failure was expected")
def script_should_have_passed(self):
+ """Read return code from last executed script and raise exception if the
+ script failed."""
if self._ret_code is None:
raise Exception("First execute the script!")
if self._ret_code != 0:
"Script execution failed, but success was expected")
def get_script_stdout(self):
+ """Returns value of stdout from last executed script."""
return self._stdout
def get_script_stderr(self):
+ """Returns value of stderr from last executed script."""
return self._stderr
@staticmethod
:type err_msg: str
:raises RuntimeError: If VAT command return value is incorrect.
"""
- if type(vat_out) is dict:
+ if isinstance(vat_out, dict):
retval = vat_out.get('retval')
if retval is not None:
if retval != exp_retval:
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Library for SSH connection management."""
+
import StringIO
from time import time, sleep
from paramiko import RSAKey
from paramiko.ssh_exception import SSHException
from scp import SCPClient
-from interruptingcow import timeout
+from interruptingcow import timeout as icTimeout
from robot.api import logger
from robot.utils.asserts import assert_equal
class SSH(object):
+ """Contains methods for managing and using SSH connections."""
__MAX_RECV_BUF = 10*1024*1024
__existing_connections = {}
@staticmethod
def _node_hash(node):
+ """Get IP address and port hash from node dictionary.
+
+ :param node: Node in topology.
+ :type node: dict
+ :return: IP address and port for the specified node.
+ :rtype: int
+ """
+
return hash(frozenset([node['host'], node['port']]))
def connect(self, node):
pkey = None
if 'priv_key' in node:
pkey = RSAKey.from_private_key(
- StringIO.StringIO(node['priv_key']))
+ StringIO.StringIO(node['priv_key']))
self._ssh = paramiko.SSHClient()
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.close()
def _reconnect(self):
+ """Close the SSH connection and open it again."""
+
node = self._node
self.disconnect(node)
self.connect(node)
"""Execute SSH command on a new channel on the connected Node.
:param cmd: Command to run on the Node.
- :param timeout: Maximal time in seconds to wait while the command is
- done. If is None then wait forever.
+ :param timeout: Maximal time in seconds to wait until the command is
+ done. If set to None then wait forever.
:type cmd: str
:type timeout: int
:return return_code, stdout, stderr
buf = ''
try:
- with timeout(time_out, exception=RuntimeError):
+ with icTimeout(time_out, exception=RuntimeError):
while not buf.endswith(':~$ '):
if chan.recv_ready():
buf = chan.recv(4096)
chan.sendall('{c}\n'.format(c=cmd))
buf = ''
try:
- with timeout(time_out, exception=RuntimeError):
+ with icTimeout(time_out, exception=RuntimeError):
while not buf.endswith(prompt):
if chan.recv_ready():
buf += chan.recv(4096)
ssh = SSH()
try:
ssh.connect(node)
- except Exception, e:
- logger.error("Failed to connect to node" + str(e))
+ except Exception as err:
+ logger.error("Failed to connect to node" + str(err))
return None, None, None
try:
else:
(ret_code, stdout, stderr) = ssh.exec_command_sudo(cmd,
timeout=timeout)
- except Exception, e:
- logger.error(e)
+ except Exception as err:
+ logger.error(err)
return None, None, None
return ret_code, stdout, stderr
Returns (stdout, stderr).
"""
- (rc, stdout, stderr) = exec_cmd(node, cmd, timeout=timeout, sudo=sudo)
- assert_equal(rc, 0, 'Command execution failed: "{}"\n{}'.
+ (ret_code, stdout, stderr) = exec_cmd(node, cmd, timeout=timeout, sudo=sudo)
+ assert_equal(ret_code, 0, 'Command execution failed: "{}"\n{}'.
format(cmd, stderr))
return stdout, stderr
with open(topo_path) as work_file:
return load(work_file.read())['nodes']
+
# pylint: disable=invalid-name
class NodeType(object):
"""Defines node types used in topology dictionaries."""
class NodeSubTypeTG(object):
+ """Defines node sub-type TG - traffic generator."""
# T-Rex traffic generator
TREX = 'TREX'
# Moongen
:rtype: str
"""
ret, stdo, stde = ssh.exec_command(cmd)
- if 0 != ret:
+ if ret != 0:
print 'Command execution failed: "{}"'.format(cmd)
print 'stdout: {0}'.format(stdo)
print 'stderr: {0}'.format(stde)