+ DUTSetup.py:424 Else clause on loop without a break statement
+ InterfaceUtil.py:400 Else clause on loop without a break statement
+ QemuUtils.py:564 Wrong continued indentation
+ SetupDPDKTest.py: Locally enabling broad-except
+ VatExecutor.py: Catching too general exception Exception
+ ssh.py:95 No exception type(s) specified.
+ HTTPRequest.py: Tolerate HTTPCodes.OK
+ multiple: Drop ":returns: None" from docstrings.
There are still several warnings present:
- R0902(too-many-instance-attributes)
- R0912(too-many-branches)
- R0913(too-many-arguments)
- R0914(too-many-locals)
- R0915(too-many-statements)
- R0401(cyclic-import)
And there are multiple blocks of similar lines,
mainly across various Setup*Test.py files:
- R0801(duplicate-code)
Change-Id: I582575cb52b85d69d268e6374852f6e74bb71052
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
17 files changed:
:type dut_node: dict
:type dut_if1: str
:type dut_if2: str
:type dut_node: dict
:type dut_if1: str
:type dut_if2: str
:raises RuntimeError: If it fails to bind the interfaces to igb_uio.
"""
if dut_node['type'] == NodeType.DUT:
:raises RuntimeError: If it fails to bind the interfaces to igb_uio.
"""
if dut_node['type'] == NodeType.DUT:
:type dut_node: dict
:type dut_if1: str
:type dut_if2: str
:type dut_node: dict
:type dut_if1: str
:type dut_if2: str
:raises RuntimeError: If it fails to cleanup the dpdk.
"""
if dut_node['type'] == NodeType.DUT:
:raises RuntimeError: If it fails to cleanup the dpdk.
"""
if dut_node['type'] == NodeType.DUT:
:type nb_cores: str
:type queue_nums: str
:type jumbo_frames: str
:type nb_cores: str
:type queue_nums: str
:type jumbo_frames: str
:raises RuntimeError: If the script "run_l2fwd.sh" fails.
"""
if dut_node['type'] == NodeType.DUT:
:raises RuntimeError: If the script "run_l2fwd.sh" fails.
"""
if dut_node['type'] == NodeType.DUT:
:type lcores_list: str
:type queue_nums: str
:type jumbo_frames: str
:type lcores_list: str
:type queue_nums: str
:type jumbo_frames: str
"""
if dut_node['type'] == NodeType.DUT:
adj_mac0, adj_mac1 = L3fwdTest.get_adj_mac(nodes_info, dut_node,
"""
if dut_node['type'] == NodeType.DUT:
adj_mac0, adj_mac1 = L3fwdTest.get_adj_mac(nodes_info, dut_node,
def pack_framework_dir():
def pack_framework_dir():
- """Pack the testing WS into temp file, return its name."""
+ """Pack the testing WS into temp file, return its name.
+
+ :raise RuntimeError: If command returns nonzero return code."""
tmpfile = NamedTemporaryFile(suffix=".tgz", prefix="DPDK-testing-")
file_name = tmpfile.name
tmpfile = NamedTemporaryFile(suffix=".tgz", prefix="DPDK-testing-")
file_name = tmpfile.name
return_code = proc.wait()
if return_code != 0:
return_code = proc.wait()
if return_code != 0:
- raise Exception("Could not pack testing framework.")
+ raise RuntimeError("Could not pack testing framework.")
:type tarball: str
:type node: dict
:returns: nothing
:type tarball: str
:type node: dict
:returns: nothing
+ :raise RuntimeError: If command returns nonzero return code.
"""
logger.console('Extracting tarball to {0} on {1}'.format(
con.REMOTE_FW_DIR, node['host']))
"""
logger.console('Extracting tarball to {0} on {1}'.format(
con.REMOTE_FW_DIR, node['host']))
(ret_code, _, stderr) = ssh.exec_command(cmd, timeout=30)
if ret_code != 0:
logger.error('Unpack error: {0}'.format(stderr))
(ret_code, _, stderr) = ssh.exec_command(cmd, timeout=30)
if ret_code != 0:
logger.error('Unpack error: {0}'.format(stderr))
- raise Exception('Failed to unpack {0} at node {1}'.format(
+ raise RuntimeError('Failed to unpack {0} at node {1}'.format(
:param node: Dictionary created from topology, will only install in the TG
:type node: dict
:returns: nothing
:param node: Dictionary created from topology, will only install in the TG
:type node: dict
:returns: nothing
+ :raise RuntimeError: If command returns nonzero return code.
"""
logger.console('Extracting virtualenv, installing requirements.txt '
'on {0}'.format(node['host']))
"""
logger.console('Extracting virtualenv, installing requirements.txt '
'on {0}'.format(node['host']))
.format(con.REMOTE_FW_DIR), timeout=100)
if ret_code != 0:
logger.error('Virtualenv creation error: {0}'.format(stdout + stderr))
.format(con.REMOTE_FW_DIR), timeout=100)
if ret_code != 0:
logger.error('Virtualenv creation error: {0}'.format(stdout + stderr))
- raise Exception('Virtualenv setup failed')
+ raise RuntimeError('Virtualenv setup failed')
else:
logger.console('Virtualenv created on {0}'.format(node['host']))
else:
logger.console('Virtualenv created on {0}'.format(node['host']))
:param node: Dictionary created from topology
:type node: dict
:returns: nothing
:param node: Dictionary created from topology
:type node: dict
:returns: nothing
+ :raise RuntimeError: If command returns nonzero return code.
"""
arch = Topology.get_node_arch(node)
logger.console('Install the DPDK on {0} ({1})'.format(node['host'],
"""
arch = Topology.get_node_arch(node)
logger.console('Install the DPDK on {0} ({1})'.format(node['host'],
if ret_code != 0:
logger.error('Install the DPDK error: {0}'.format(stderr))
if ret_code != 0:
logger.error('Install the DPDK error: {0}'.format(stderr))
- raise Exception('Install the DPDK failed')
+ raise RuntimeError('Install the DPDK failed')
else:
logger.console('Install the DPDK on {0} success!'.format(node['host']))
else:
logger.console('Install the DPDK on {0} success!'.format(node['host']))
-#pylint: disable=broad-except
def setup_node(args):
"""Run all set-up methods for a node.
def setup_node(args):
"""Run all set-up methods for a node.
install_dpdk_test(node)
if node['type'] == NodeType.TG:
create_env_directory_at_node(node)
install_dpdk_test(node)
if node['type'] == NodeType.TG:
create_env_directory_at_node(node)
- except Exception as exc:
+ except RuntimeError as exc:
logger.error("Node setup failed, error:'{0}'".format(exc.message))
return False
else:
logger.console('Setup of node {0} done'.format(node['host']))
return True
logger.error("Node setup failed, error:'{0}'".format(exc.message))
return False
else:
logger.console('Setup of node {0} done'.format(node['host']))
return True
-#pylint: enable=broad-except
def delete_local_tarball(tarball):
"""Delete local tarball to prevent disk pollution.
def delete_local_tarball(tarball):
"""Delete local tarball to prevent disk pollution.
return None
if name == 'Driver:':
return value
return None
if name == 'Driver:':
return value
@staticmethod
def kernel_module_verify(node, module, force_load=False):
@staticmethod
def kernel_module_verify(node, module, force_load=False):
@unique
class HTTPCodes(IntEnum):
"""HTTP status codes"""
@unique
class HTTPCodes(IntEnum):
"""HTTP status codes"""
+ OK = 200 # HTTP standard code name. # pylint: disable=invalid-name
ACCEPTED = 201
UNAUTHORIZED = 401
FORBIDDEN = 403
ACCEPTED = 201
UNAUTHORIZED = 401
FORBIDDEN = 403
:type node: dict
:type pci_addr: str
:type driver: str
:type node: dict
:type pci_addr: str
:type driver: str
:raises RuntimeError: If unbinding from the current driver fails.
:raises RuntimeError: If binding to the new driver fails.
"""
:raises RuntimeError: If unbinding from the current driver fails.
:raises RuntimeError: If binding to the new driver fails.
"""
return None
if name == 'Driver:':
return value if value else None
return None
if name == 'Driver:':
return value if value else None
- else:
- raise RuntimeError('Get interface driver for: {0}'
- .format(pci_addr))
+ raise RuntimeError('Get interface driver for: {0}'
+ .format(pci_addr))
@staticmethod
def tg_set_interfaces_udev_rules(node):
@staticmethod
def tg_set_interfaces_udev_rules(node):
:type sw_if_index: int
:type bd_id: int
:type shg: int
:type sw_if_index: int
:type bd_id: int
:type shg: int
"""
VatExecutor.cmd_from_template(node, "l2_bd_add_sw_if_index.vat",
bd_id=bd_id, sw_if_index=sw_if_index,
"""
VatExecutor.cmd_from_template(node, "l2_bd_add_sw_if_index.vat",
bd_id=bd_id, sw_if_index=sw_if_index,
:param queue: Queue in which this function will push incoming packets.
:type interface_name: str
:type queue: multiprocessing.Queue
:param queue: Queue in which this function will push incoming packets.
:type interface_name: str
:type queue: multiprocessing.Queue
"""
sock = conf.L2listen(iface=interface_name, type=ETH_P_ALL)
"""
sock = conf.L2listen(iface=interface_name, type=ETH_P_ALL)
pid = '-pidfile {}'.format(self._pid_file)
# Run QEMU
pid = '-pidfile {}'.format(self._pid_file)
# Run QEMU
- cmd = '{0} {1} {2} {3} {4} {5} {6} {7} {8} {9} {10}'.format(bin_path,
- self._qemu_opt.get('smp'), mem, ssh_fwd,
+ cmd = '{0} {1} {2} {3} {4} {5} {6} {7} {8} {9} {10}'.format(
+ bin_path, self._qemu_opt.get('smp'), mem, ssh_fwd,
self._qemu_opt.get('options'), drive, qmp, serial, qga, graphic,
pid)
try:
self._qemu_opt.get('options'), drive, qmp, serial, qga, graphic,
pid)
try:
:type if1_adj_mac: str
:type if2_adj_mac: str
:type testtype: str
:type if1_adj_mac: str
:type if2_adj_mac: str
:type testtype: str
:raises RuntimeError: If the script execute fails.
"""
:raises RuntimeError: If the script execute fails.
"""
:param payload_data: the payload data in the packet.
:type payload_data: str
:param payload_data: the payload data in the packet.
:type payload_data: str
:raises RuntimeError: If the vxlan protocol field verify fails.
"""
# get the vxlan packet and check it
:raises RuntimeError: If the vxlan protocol field verify fails.
"""
# get the vxlan packet and check it
:param test_type: the functional test type.
:type payload_data: str
:type test_type: str
:param test_type: the functional test type.
:type payload_data: str
:type test_type: str
:raises RuntimeError: If the vxlangpe and nsh protocol
field verify fails.
"""
:raises RuntimeError: If the vxlangpe and nsh protocol
field verify fails.
"""
:type ether: scapy.Ether
:type frame_size: Integer
:type test_type: str
:type ether: scapy.Ether
:type frame_size: Integer
:type test_type: str
:raises RuntimeError: If the packet field verify fails.
"""
:raises RuntimeError: If the packet field verify fails.
"""
:type file_prefix: str
:type dest_ip: str
:type is_ipv4: bool
:type file_prefix: str
:type dest_ip: str
:type is_ipv4: bool
:raises RuntimeError: If failed to execute udpfwd test on the dut node.
"""
pci_address = Topology.get_interface_pci_addr(dut_node, dut_if)
:raises RuntimeError: If failed to execute udpfwd test on the dut node.
"""
pci_address = Topology.get_interface_pci_addr(dut_node, dut_if)
__LINUX_PROMPT = (":~$ ", "~]$ ", "~]# ")
def __init__(self, node, json_param=True):
__LINUX_PROMPT = (":~$ ", "~]$ ", "~]# ")
def __init__(self, node, json_param=True):
+ """TODO: Should we document this constructor can raise RuntimeError?"""
json_text = ' json' if json_param else ''
self.json = json_param
self._node = node
json_text = ' json' if json_param else ''
self.json = json_param
self._node = node
self._ssh.connect(self._node)
try:
self._tty = self._ssh.interactive_terminal_open()
self._ssh.connect(self._node)
try:
self._tty = self._ssh.interactive_terminal_open()
raise RuntimeError("Cannot open interactive terminal on node {0}".
format(self._node))
raise RuntimeError("Cannot open interactive terminal on node {0}".
format(self._node))
self._tty,
'sudo -S {0}{1}'.format(Constants.VAT_BIN_NAME, json_text),
self.__VAT_PROMPT)
self._tty,
'sudo -S {0}{1}'.format(Constants.VAT_BIN_NAME, json_text),
self.__VAT_PROMPT)
"""Execute command on the opened VAT terminal.
:param cmd: Command to be executed.
"""Execute command on the opened VAT terminal.
:param cmd: Command to be executed.
:returns: Command output in python representation of JSON format or
None if not in JSON mode.
:returns: Command output in python representation of JSON format or
None if not in JSON mode.
+ :raise RuntimeError: If VAT command execution fails.
"""
VatHistory.add_to_vat_history(self._node, cmd)
logger.debug("Executing command in VAT terminal: {0}".format(cmd))
"""
VatHistory.add_to_vat_history(self._node, cmd)
logger.debug("Executing command in VAT terminal: {0}".format(cmd))
out = self._ssh.interactive_terminal_exec_command(self._tty, cmd,
self.__VAT_PROMPT)
self.vat_stdout = out
out = self._ssh.interactive_terminal_exec_command(self._tty, cmd,
self.__VAT_PROMPT)
self.vat_stdout = out
self._exec_failure = True
vpp_pid = get_vpp_pid(self._node)
if vpp_pid:
self._exec_failure = True
vpp_pid = get_vpp_pid(self._node)
if vpp_pid:
format(self._ssh.get_transport().getpeername()))
logger.debug('Connections: {0}'.
format(str(SSH.__existing_connections)))
format(self._ssh.get_transport().getpeername()))
logger.debug('Connections: {0}'.
format(str(SSH.__existing_connections)))
+ except RuntimeError as exc:
if attempts > 0:
self._reconnect(attempts-1)
else:
if attempts > 0:
self._reconnect(attempts-1)
else:
def disconnect(self, node):
"""Close SSH connection to the node.
def disconnect(self, node):
"""Close SSH connection to the node.
def interactive_terminal_open(self, time_out=30):
"""Open interactive terminal on a new channel on the connected Node.
def interactive_terminal_open(self, time_out=30):
"""Open interactive terminal on a new channel on the connected Node.
- :param time_out: Timeout in seconds.
- :returns: SSH channel with opened terminal.
+ FIXME: Convert or document other possible exceptions, such as
+ socket.error or SSHException.
.. warning:: Interruptingcow is used here, and it uses
signal(SIGALRM) to let the operating system interrupt program
.. warning:: Interruptingcow is used here, and it uses
signal(SIGALRM) to let the operating system interrupt program
handlers only apply to the main thread, so you cannot use this
from other threads. You must not use this in a program that
uses SIGALRM itself (this includes certain profilers)
handlers only apply to the main thread, so you cannot use this
from other threads. You must not use this in a program that
uses SIGALRM itself (this includes certain profilers)
+
+ :param time_out: Timeout in seconds.
+ :returns: SSH channel with opened terminal.
+ :raise IOError: If receive attempt results in socket.timeout.
"""
chan = self._ssh.get_transport().open_session()
chan.get_pty()
"""
chan = self._ssh.get_transport().open_session()
chan.get_pty()
break
except socket.timeout:
logger.error('Socket timeout: {0}'.format(buf))
break
except socket.timeout:
logger.error('Socket timeout: {0}'.format(buf))
- raise Exception('Socket timeout: {0}'.format(buf))
+ # TODO: Find out which exception would callers appreciate here.
+ raise IOError('Socket timeout: {0}'.format(buf))
return chan
def interactive_terminal_exec_command(self, chan, cmd, prompt):
return chan
def interactive_terminal_exec_command(self, chan, cmd, prompt):
interactive_terminal_open() method has to be called first!
interactive_terminal_open() method has to be called first!
- :param chan: SSH channel with opened terminal.
- :param cmd: Command to be executed.
- :param prompt: Command prompt, sequence of characters used to
- indicate readiness to accept commands.
- :returns: Command output.
-
.. warning:: Interruptingcow is used here, and it uses
signal(SIGALRM) to let the operating system interrupt program
execution. This has the following limitations: Python signal
handlers only apply to the main thread, so you cannot use this
from other threads. You must not use this in a program that
uses SIGALRM itself (this includes certain profilers)
.. warning:: Interruptingcow is used here, and it uses
signal(SIGALRM) to let the operating system interrupt program
execution. This has the following limitations: Python signal
handlers only apply to the main thread, so you cannot use this
from other threads. You must not use this in a program that
uses SIGALRM itself (this includes certain profilers)
+
+ :param chan: SSH channel with opened terminal.
+ :param cmd: Command to be executed.
+ :param prompt: Command prompt, sequence of characters used to
+ indicate readiness to accept commands.
+ :returns: Command output.
+ :raise IOError: If receive attempt results in socket.timeout.
"""
chan.sendall('{c}\n'.format(c=cmd))
buf = ''
"""
chan.sendall('{c}\n'.format(c=cmd))
buf = ''
except socket.timeout:
logger.error('Socket timeout during execution of command: '
'{0}\nBuffer content:\n{1}'.format(cmd, buf))
except socket.timeout:
logger.error('Socket timeout during execution of command: '
'{0}\nBuffer content:\n{1}'.format(cmd, buf))
- raise Exception('Socket timeout during execution of command: '
- '{0}\nBuffer content:\n{1}'.format(cmd, buf))
+ # TODO: Find out which exception would callers appreciate here.
+ raise IOError('Socket timeout during execution of command: '
+ '{0}\nBuffer content:\n{1}'.format(cmd, buf))
tmp = buf.replace(cmd.replace('\n', ''), '')
for item in prompt:
tmp.replace(item, '')
tmp = buf.replace(cmd.replace('\n', ''), '')
for item in prompt:
tmp.replace(item, '')
def exec_cmd(node, cmd, timeout=600, sudo=False):
"""Convenience function to ssh/exec/return rc, out & err.
def exec_cmd(node, cmd, timeout=600, sudo=False):
"""Convenience function to ssh/exec/return rc, out & err.
+ FIXME: Document :param, :type, :raise and similar.
Returns (rc, stdout, stderr).
"""
if node is None:
Returns (rc, stdout, stderr).
"""
if node is None:
:param node: Node from topology.
:type node: dict
:param node: Node from topology.
:type node: dict
"""
for port_name, port in node['interfaces'].items():
if 'driver' not in port:
"""
for port_name, port in node['interfaces'].items():
if 'driver' not in port:
:param topology: Topology information with nodes.
:type topology: dict
:param topology: Topology information with nodes.
:type topology: dict
"""
for node in topology['nodes'].values():
update_mac_addresses_for_node(node)
"""
for node in topology['nodes'].values():
update_mac_addresses_for_node(node)
:param function: Function to be executed in each thread.
:type function: function
:param function: Function to be executed in each thread.
:type function: function
"""
self.total_ok_rqsts = 0
"""
self.total_ok_rqsts = 0