- Fix PyLint errors
- Fix comments in touched python modules
Change-Id: I26db2d292a41969cf38b9b0bdd49c4fb15349102
Signed-off-by: Tibor Frank <tifrank@cisco.com>
33 files changed:
no-space-check=trailing-comma,dict-separator
# Maximum number of lines in a module
no-space-check=trailing-comma,dict-separator
# Maximum number of lines in a module
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
function-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct method names
function-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct method names
-method-rgx=[a-z_][a-z0-9_]{2,40}$
+method-rgx=[a-z_][a-z0-9_]{2,50}$
# Regular expression which should only match correct instance attribute names
attr-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct instance attribute names
attr-rgx=[a-z_][a-z0-9_]{2,30}$
[DESIGN]
# Maximum number of arguments for function / method
[DESIGN]
# Maximum number of arguments for function / method
# Argument names that match this expression will be ignored. Default to name
# with leading underscore
# Argument names that match this expression will be ignored. Default to name
# with leading underscore
min-public-methods=0
# Maximum number of public methods for a class (see R0904).
min-public-methods=0
# Maximum number of public methods for a class (see R0904).
from resources.libraries.python.VatExecutor import VatExecutor, VatTerminal
from resources.libraries.python.VatExecutor import VatExecutor, VatTerminal
-# pylint: disable=too-many-arguments, invalid-name
-
class Classify(object):
"""Classify utilities."""
class Classify(object):
"""Classify utilities."""
:type node: dict
:type ip_version: str
:type direction: str
:type node: dict
:type ip_version: str
:type direction: str
- :return (table_index, skip_n, match_n)
+ :returns (table_index, skip_n, match_n)
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
:param direction: Direction of traffic - src/dst.
:type node: dict
:type direction: str
:param direction: Direction of traffic - src/dst.
:type node: dict
:type direction: str
- :return (table_index, skip_n, match_n)
+ :returns (table_index, skip_n, match_n)
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
:param hex_mask: Classify hex mask.
:type node: dict
:type hex_mask: str
:param hex_mask: Classify hex mask.
:type node: dict
:type hex_mask: str
- :return (table_index, skip_n, match_n)
+ :returns (table_index, skip_n, match_n)
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
:type ip_version: str
:type protocol: str
:type direction: str
:type ip_version: str
:type protocol: str
:type direction: str
- :return: Classify hex mask.
+ :returns: Classify hex mask.
:rtype : str
:raises ValueError: If protocol is not TCP or UDP.
:raises ValueError: If direction is not source or destination or
:rtype : str
:raises ValueError: If protocol is not TCP or UDP.
:raises ValueError: If direction is not source or destination or
:type hex_mask: str
:type source_port: str
:type destination_port: str
:type hex_mask: str
:type source_port: str
:type destination_port: str
- :return: Classify hex value.
+ :returns: Classify hex value.
:rtype: str
"""
source_port_hex = Classify._port_convert(source_port)
:rtype: str
"""
source_port_hex = Classify._port_convert(source_port)
:param port: TCP/UDP port number.
:type port: str
:param port: TCP/UDP port number.
:type port: str
- :return: TCP/UDP port number in 4-digit hexadecimal format.
+ :returns: TCP/UDP port number in 4-digit hexadecimal format.
:rtype: str
"""
return '{0:04x}'.format(int(port))
:rtype: str
"""
return '{0:04x}'.format(int(port))
:param table_index: Index of a specific classify table.
:type node: dict
:type table_index: int
:param table_index: Index of a specific classify table.
:type node: dict
:type table_index: int
- :return: Classify table settings.
+ :returns: Classify table settings.
:rtype: dict
"""
with VatTerminal(node) as vat:
:rtype: dict
"""
with VatTerminal(node) as vat:
:type node: dict
:type table_index: int
:type session_index: int
:type node: dict
:type table_index: int
:type session_index: int
- :return: List of classify session settings, or a dictionary of settings
+ :returns: List of classify session settings, or a dictionary of settings
for a specific classify session.
:rtype: list or dict
"""
for a specific classify session.
:rtype: list or dict
"""
ssh.connect(node)
(ret_code, stdout, stderr) = \
ssh.connect(node)
(ret_code, stdout, stderr) = \
- ssh.exec_command('sudo -Sn bash {0}/{1}/dut_setup.sh'.format(
- Constants.REMOTE_FW_DIR, Constants.RESOURCES_LIB_SH), timeout=120)
+ ssh.exec_command('sudo -Sn bash {0}/{1}/dut_setup.sh'.
+ format(Constants.REMOTE_FW_DIR,
+ Constants.RESOURCES_LIB_SH), timeout=120)
logger.trace(stdout)
logger.trace(stderr)
if int(ret_code) != 0:
logger.trace(stdout)
logger.trace(stderr)
if int(ret_code) != 0:
if int(ret_code) != 0:
logger.debug('Not possible to get PID of VPP process on node: '
if int(ret_code) != 0:
logger.debug('Not possible to get PID of VPP process on node: '
- '"{1}"'.format(node['host'], stdout + stderr))
+ '{0}\n {1}'.format(node['host'], stdout + stderr))
raise RuntimeError('Not possible to get PID of VPP process on node:'
' {}'.format(node['host']))
raise RuntimeError('Not possible to get PID of VPP process on node:'
' {}'.format(node['host']))
class DpdkUtil(object):
"""Utilities for DPDK."""
class DpdkUtil(object):
"""Utilities for DPDK."""
- #pylint: disable=too-many-locals
@staticmethod
def dpdk_testpmd_start(node, **args):
"""Start DPDK testpmd app on VM node.
@staticmethod
def dpdk_testpmd_start(node, **args):
"""Start DPDK testpmd app on VM node.
@staticmethod
def setup_network_namespace(node, namespace_name, interface_name,
@staticmethod
def setup_network_namespace(node, namespace_name, interface_name,
"""Setup namespace on given node and attach interface and IP to
this namespace. Applicable also on TG node.
:param node: Node to set namespace on.
:param namespace_name: Namespace name.
:param interface_name: Interface name.
"""Setup namespace on given node and attach interface and IP to
this namespace. Applicable also on TG node.
:param node: Node to set namespace on.
:param namespace_name: Namespace name.
:param interface_name: Interface name.
- :param ip_address: IP address of namespace's interface.
+ :param ip_addr: IP address of namespace's interface.
:param prefix: IP address prefix length.
:type node: dict
:type namespace_name: str
:type vhost_if: str
:param prefix: IP address prefix length.
:type node: dict
:type namespace_name: str
:type vhost_if: str
:type prefix: int
"""
cmd = ('ip netns add {0}'.format(namespace_name))
:type prefix: int
"""
cmd = ('ip netns add {0}'.format(namespace_name))
exec_cmd_no_error(node, cmd, sudo=True)
cmd = ('ip netns exec {0} ip addr add {1}/{2} dev {3}'.format(
exec_cmd_no_error(node, cmd, sudo=True)
cmd = ('ip netns exec {0} ip addr add {1}/{2} dev {3}'.format(
- namespace_name, ip_address, prefix, interface_name))
+ namespace_name, ip_addr, prefix, interface_name))
exec_cmd_no_error(node, cmd, sudo=True)
@staticmethod
exec_cmd_no_error(node, cmd, sudo=True)
@staticmethod
exec_cmd_no_error(node, cmd, sudo=True)
@staticmethod
exec_cmd_no_error(node, cmd, sudo=True)
@staticmethod
- def set_linux_interface_ip(node, interface, ip, prefix, namespace=None):
+ def set_linux_interface_ip(node, interface, ip_addr, prefix,
+ namespace=None):
"""Set IP address to interface in linux.
:param node: Node where to execute command.
:param interface: Interface in namespace.
"""Set IP address to interface in linux.
:param node: Node where to execute command.
:param interface: Interface in namespace.
- :param ip: IP to be set on interface.
+ :param ip_addr: IP to be set on interface.
:param prefix: IP prefix.
:param namespace: Execute command in namespace. Optional
:type node: dict
:type interface: str
:param prefix: IP prefix.
:param namespace: Execute command in namespace. Optional
:type node: dict
:type interface: str
:type prefix: int
:type namespace: str
:raises RuntimeError: IP could not be set.
"""
if namespace is not None:
cmd = 'ip netns exec {} ip addr add {}/{} dev {}'.format(
:type prefix: int
:type namespace: str
:raises RuntimeError: IP could not be set.
"""
if namespace is not None:
cmd = 'ip netns exec {} ip addr add {}/{} dev {}'.format(
- namespace, ip, prefix, interface)
+ namespace, ip_addr, prefix, interface)
- cmd = 'ip addr add {}/{} dev {}'.format(ip, prefix, interface)
- (rc, _, stderr) = exec_cmd(node, cmd, timeout=5, sudo=True)
- if rc != 0:
+ cmd = 'ip addr add {}/{} dev {}'.format(ip_addr, prefix, interface)
+ (ret_code, _, stderr) = exec_cmd(node, cmd, timeout=5, sudo=True)
+ if ret_code != 0:
raise RuntimeError(
'Could not set IP for interface, reason:{}'.format(stderr))
raise RuntimeError(
'Could not set IP for interface, reason:{}'.format(stderr))
Example: mask 255.255.0.0 -> prefix length 16
:param network: Network mask or network prefix length.
:type network: str or int
Example: mask 255.255.0.0 -> prefix length 16
:param network: Network mask or network prefix length.
:type network: str or int
- :return: Network mask or network prefix length.
+ :returns: Network mask or network prefix length.
:rtype: str or int
"""
temp_address = "0.0.0.0"
:rtype: str or int
"""
temp_address = "0.0.0.0"
from resources.libraries.python.VatJsonUtil import VatJsonUtil
from resources.libraries.python.VatJsonUtil import VatJsonUtil
-# pylint: disable=too-few-public-methods
class PolicyAction(Enum):
"""Policy actions."""
BYPASS = 'bypass'
class PolicyAction(Enum):
"""Policy actions."""
BYPASS = 'bypass'
class IPsecUtil(object):
"""IPsec utilities."""
class IPsecUtil(object):
"""IPsec utilities."""
- # pylint: disable=too-many-arguments
- # pylint: disable=too-many-locals
-
@staticmethod
def policy_action_bypass():
"""Return policy action bypass.
@staticmethod
def policy_action_bypass():
"""Return policy action bypass.
- :return: PolicyAction enum BYPASS object.
+ :returns: PolicyAction enum BYPASS object.
:rtype: PolicyAction
"""
return PolicyAction.BYPASS
:rtype: PolicyAction
"""
return PolicyAction.BYPASS
def policy_action_discard():
"""Return policy action discard.
def policy_action_discard():
"""Return policy action discard.
- :return: PolicyAction enum DISCARD object.
+ :returns: PolicyAction enum DISCARD object.
:rtype: PolicyAction
"""
return PolicyAction.DISCARD
:rtype: PolicyAction
"""
return PolicyAction.DISCARD
def policy_action_protect():
"""Return policy action protect.
def policy_action_protect():
"""Return policy action protect.
- :return: PolicyAction enum PROTECT object.
+ :returns: PolicyAction enum PROTECT object.
:rtype: PolicyAction
"""
return PolicyAction.PROTECT
:rtype: PolicyAction
"""
return PolicyAction.PROTECT
def crypto_alg_aes_cbc_128():
"""Return encryption algorithm aes-cbc-128.
def crypto_alg_aes_cbc_128():
"""Return encryption algorithm aes-cbc-128.
- :return: CryptoAlg enum AES_CBC_128 object.
+ :returns: CryptoAlg enum AES_CBC_128 object.
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_128
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_128
def crypto_alg_aes_cbc_192():
"""Return encryption algorithm aes-cbc-192.
def crypto_alg_aes_cbc_192():
"""Return encryption algorithm aes-cbc-192.
- :return: CryptoAlg enum AES_CBC_192 objec.
+ :returns: CryptoAlg enum AES_CBC_192 objec.
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_192
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_192
def crypto_alg_aes_cbc_256():
"""Return encryption algorithm aes-cbc-256.
def crypto_alg_aes_cbc_256():
"""Return encryption algorithm aes-cbc-256.
- :return: CryptoAlg enum AES_CBC_256 object.
+ :returns: CryptoAlg enum AES_CBC_256 object.
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_256
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_256
:param crypto_alg: Encryption algorithm.
:type crypto_alg: CryptoAlg
:param crypto_alg: Encryption algorithm.
:type crypto_alg: CryptoAlg
:rtype: int
"""
return crypto_alg.key_len
:rtype: int
"""
return crypto_alg.key_len
:param crypto_alg: Encryption algorithm.
:type crypto_alg: CryptoAlg
:param crypto_alg: Encryption algorithm.
:type crypto_alg: CryptoAlg
- :return: Algorithm scapy name.
+ :returns: Algorithm scapy name.
:rtype: str
"""
return crypto_alg.scapy_name
:rtype: str
"""
return crypto_alg.scapy_name
def integ_alg_sha1_96():
"""Return integrity algorithm SHA1-96.
def integ_alg_sha1_96():
"""Return integrity algorithm SHA1-96.
- :return: IntegAlg enum SHA1_96 object.
+ :returns: IntegAlg enum SHA1_96 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA1_96
:rtype: IntegAlg
"""
return IntegAlg.SHA1_96
def integ_alg_sha_256_128():
"""Return integrity algorithm SHA-256-128.
def integ_alg_sha_256_128():
"""Return integrity algorithm SHA-256-128.
- :return: IntegAlg enum SHA_256_128 object.
+ :returns: IntegAlg enum SHA_256_128 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA_256_128
:rtype: IntegAlg
"""
return IntegAlg.SHA_256_128
def integ_alg_sha_384_192():
"""Return integrity algorithm SHA-384-192.
def integ_alg_sha_384_192():
"""Return integrity algorithm SHA-384-192.
- :return: IntegAlg enum SHA_384_192 object.
+ :returns: IntegAlg enum SHA_384_192 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA_384_192
:rtype: IntegAlg
"""
return IntegAlg.SHA_384_192
def integ_alg_sha_512_256():
"""Return integrity algorithm SHA-512-256.
def integ_alg_sha_512_256():
"""Return integrity algorithm SHA-512-256.
- :return: IntegAlg enum SHA_512_256 object.
+ :returns: IntegAlg enum SHA_512_256 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA_512_256
:rtype: IntegAlg
"""
return IntegAlg.SHA_512_256
:param integ_alg: Integrity algorithm.
:type integ_alg: IntegAlg
:param integ_alg: Integrity algorithm.
:type integ_alg: IntegAlg
:rtype: int
"""
return integ_alg.key_len
:rtype: int
"""
return integ_alg.key_len
:param integ_alg: Integrity algorithm.
:type integ_alg: IntegAlg
:param integ_alg: Integrity algorithm.
:type integ_alg: IntegAlg
- :return: Algorithm scapy name.
+ :returns: Algorithm scapy name.
:rtype: str
"""
return integ_alg.scapy_name
:rtype: str
"""
return integ_alg.scapy_name
:param prefix_length: Length of network prefix.
:type prefix_length: int
:param prefix_length: Length of network prefix.
:type prefix_length: int
-
- :return: Network mask.
+ :returns: Network mask.
:type interface: str
:type address: str
:type prefix_length: int
:type interface: str
:type address: str
:type prefix_length: int
:type prefix_length: int
:type gateway: str
:type interface: str
:type prefix_length: int
:type gateway: str
:type interface: str
:type prefix_length: int
:type gateway: str
:type interface: str
:type prefix_length: int
:type gateway: str
:type interface: str
:param source_interface: Source interface name.
:type destination_address: str
:type source_interface: str
:param source_interface: Source interface name.
:type destination_address: str
:type source_interface: str
:param cmd: Command to be executed.
:type cmd: str
:param cmd: Command to be executed.
:type cmd: str
-
- :return: Content of stdout and stderr returned by command.
+ :returns: Content of stdout and stderr returned by command.
:rtype: tuple
"""
return exec_cmd_no_error(self.node_info, cmd)
:rtype: tuple
"""
return exec_cmd_no_error(self.node_info, cmd)
:param cmd: Command to be executed.
:type cmd: str
:param cmd: Command to be executed.
:type cmd: str
-
- :return: Content of stdout and stderr returned by command.
+ :returns: Content of stdout and stderr returned by command.
:rtype: tuple
"""
return exec_cmd_no_error(self.node_info, cmd, sudo=True)
:rtype: tuple
"""
return exec_cmd_no_error(self.node_info, cmd, sudo=True)
:param interface: Interface name.
:type interface: str
:param interface: Interface name.
:type interface: str
- :return: sw_if_index of the interface or None.
+ :returns: sw_if_index of the interface or None.
:rtype: int
"""
return Topology().get_interface_sw_index(self.node_info, interface)
:rtype: int
"""
return Topology().get_interface_sw_index(self.node_info, interface)
:param args: Parameters to the script.
:type script: str
:type args: dict
:param args: Parameters to the script.
:type script: str
:type args: dict
"""
# TODO: check return value
VatExecutor.cmd_from_template(self.node_info, script, **args)
"""
# TODO: check return value
VatExecutor.cmd_from_template(self.node_info, script, **args)
:param node_info: Dictionary containing information on nodes in topology.
:type node_info: dict
:param node_info: Dictionary containing information on nodes in topology.
:type node_info: dict
- :return: Class instance that is derived from Node.
+ :returns: Class instance that is derived from Node.
"""
if node_info['type'] == NodeType.TG:
return Tg(node_info)
"""
if node_info['type'] == NodeType.TG:
return Tg(node_info)
:param nodes_addr: Available nodes IPv4 addresses.
:type nodes: dict
:type nodes_addr: dict
:param nodes_addr: Available nodes IPv4 addresses.
:type nodes: dict
:type nodes_addr: dict
- :return: Affected interfaces as list of (node, interface) tuples.
+ :returns: Affected interfaces as list of (node, interface) tuples.
:rtype: list
"""
interfaces = []
:rtype: list
"""
interfaces = []
:type node: dict
:type iface_key: str
:type nodes_addr: dict
:type node: dict
:type iface_key: str
:type nodes_addr: dict
+ :returns: IPv4 address.
:rtype: str
"""
interface = Topology.get_interface_name(node, iface_key)
:rtype: str
"""
interface = Topology.get_interface_name(node, iface_key)
:param nodes_addr: Available nodes IPv6 addresses.
:type nodes: dict
:type nodes_addr: dict
:param nodes_addr: Available nodes IPv6 addresses.
:type nodes: dict
:type nodes_addr: dict
- :return: Affected interfaces as list of (node, interface) tuples.
+ :returns: Affected interfaces as list of (node, interface) tuples.
:rtype: list
"""
interfaces = []
:rtype: list
"""
interfaces = []
:type addr: str
:type prefix: str
"""
:type addr: str
:type prefix: str
"""
sw_if_index = Topology.get_interface_sw_index(node, iface_key)
with VatTerminal(node) as vat:
vat.vat_terminal_exec_cmd_from_template('add_ip_address.vat',
sw_if_index = Topology.get_interface_sw_index(node, iface_key)
with VatTerminal(node) as vat:
vat.vat_terminal_exec_cmd_from_template('add_ip_address.vat',
:param nodes_addr: Available nodes IPv6 addresses.
:type link: str
:type nodes_addr: dict
:param nodes_addr: Available nodes IPv6 addresses.
:type link: str
:type nodes_addr: dict
- :return: Link IPv6 address.
+ :returns: Link IPv6 address.
:rtype: str
"""
net = nodes_addr.get(link)
:rtype: str
"""
net = nodes_addr.get(link)
:param nodes_addr: Available nodes IPv6 addresses.
:type link: str
:type nodes_addr: dict
:param nodes_addr: Available nodes IPv6 addresses.
:type link: str
:type nodes_addr: dict
- :return: Link IPv6 address prefix.
+ :returns: Link IPv6 address prefix.
:rtype: int
"""
net = nodes_addr.get(link)
:rtype: int
"""
net = nodes_addr.get(link)
- def vpp_lisp_eid_table_mapping(node, vni, bd=None, vrf=None):
+ def vpp_lisp_eid_table_mapping(node, vni, bd_id=None, vrf=None):
"""
Map LISP VNI to either bridge domain ID, or VRF ID.
:param node: VPP node.
:param vni: Lisp VNI.
"""
Map LISP VNI to either bridge domain ID, or VRF ID.
:param node: VPP node.
:param vni: Lisp VNI.
- :param bd: Bridge domain ID.
+ :param bd_id: Bridge domain ID.
:param vrf: VRF id.
:type node: dict
:type vni: int
:param vrf: VRF id.
:type node: dict
:type vni: int
- if bd:
- bd_or_vrf = 'bd_index {}'.format(bd)
+ if bd_id:
+ bd_or_vrf = 'bd_index {}'.format(bd_id)
else:
bd_or_vrf = 'vrf {}'.format(vrf)
VatExecutor.cmd_from_template(node,
else:
bd_or_vrf = 'vrf {}'.format(vrf)
VatExecutor.cmd_from_template(node,
:param node: VPP node.
:type node: dict
:param node: VPP node.
:type node: dict
- :return: Lisp gpe state.
+ :returns: Lisp gpe state.
retrieved - local, remote, empty string = both.
:type node: dict
:type items_filter: str
retrieved - local, remote, empty string = both.
:type node: dict
:type items_filter: str
- :return: Lisp locator_set data as python list.
+ :returns: Lisp locator_set data as python list.
:param node: VPP node.
:type node: dict
:param node: VPP node.
:type node: dict
- :return: Lisp eid table as python list.
+ :returns: Lisp eid table as python list.
:param node: VPP node.
:type node: dict
:param node: VPP node.
:type node: dict
- :return: Lisp map resolver as python list.
+ :returns: Lisp map resolver as python list.
:param locator_set_number: Generate n locator_set.
:type node: dict
:type locator_set_number: str
:param locator_set_number: Generate n locator_set.
:type node: dict
:type locator_set_number: str
- :return: list of lisp locator_set, list of lisp locator_set expected
+ :returns: list of lisp locator_set, list of lisp locator_set expected
from VAT.
:rtype: tuple
"""
from VAT.
:rtype: tuple
"""
:param locator_set_number: Generate n locator_set.
:type node: dict
:type locator_set_number: str
:param locator_set_number: Generate n locator_set.
:type node: dict
:type locator_set_number: str
- :return: list of lisp locator_set, list of lisp locator_set expected
+ :returns: list of lisp locator_set, list of lisp locator_set expected
from VAT.
:rtype: tuple
"""
from VAT.
:rtype: tuple
"""
locator_set_list.append(locator_set)
locator_set_vat = {"ls_name": l_name,
locator_set_list.append(locator_set)
locator_set_vat = {"ls_name": l_name,
locator_set_list_vat.append(locator_set_vat)
return locator_set_list, locator_set_list_vat
locator_set_list_vat.append(locator_set_vat)
return locator_set_list, locator_set_list_vat
:type psid_offset: int
:type psid_len: int
:type map_t: bool
:type psid_offset: int
:type psid_len: int
:type map_t: bool
- :return: Index of created map domain.
+ :returns: Index of created map domain.
:rtype: int
:raises RuntimeError: If unable to add map domain.
"""
:rtype: int
:raises RuntimeError: If unable to add map domain.
"""
:type port: int
:type psid_len: int
:type psid_offset: int
:type port: int
:type psid_len: int
:type psid_offset: int
:rtype: int
"""
ones = 2**16-1
:rtype: int
"""
ones = 2**16-1
:type ea_bit_len: int
:type psid_len: int
:type psid: int
:type ea_bit_len: int
:type psid_len: int
:type psid: int
- :return: Number representing EA bit field of destination IPv6 address.
+ :returns: Number representing EA bit field of destination IPv6 address.
- v4_suffix_len = ipv4_net._max_prefixlen - ipv4_net.prefixlen
+ v4_suffix_len = ipv4_net.max_prefixlen - ipv4_net.prefixlen
v4_suffix = ipv4_net.network_address._ip ^ ipv4_host._ip
if ipv4_net.prefixlen + ea_bit_len <= 32:
v4_suffix = ipv4_net.network_address._ip ^ ipv4_host._ip
if ipv4_net.prefixlen + ea_bit_len <= 32:
:type dst_ip: ipaddress.IPv4Address
:type ea_bit_len: int
:type psid: int
:type dst_ip: ipaddress.IPv4Address
:type ea_bit_len: int
:type psid: int
- :return: Number representing interface id field of destination IPv6
+ :returns: Number representing interface id field of destination IPv6
address.
:rtype: int
"""
if rule_net.prefixlen + ea_bit_len < 32:
address.
:rtype: int
"""
if rule_net.prefixlen + ea_bit_len < 32:
- v4_suffix_len = rule_net._max_prefixlen - rule_net.prefixlen
+ v4_suffix_len = rule_net.max_prefixlen - rule_net.prefixlen
v4_suffix = rule_net.network_address._ip ^ dst_ip._ip
ea_bits = v4_suffix >> (v4_suffix_len - ea_bit_len)
address = rule_net.network_address._ip >> v4_suffix_len
v4_suffix = rule_net.network_address._ip ^ dst_ip._ip
ea_bits = v4_suffix >> (v4_suffix_len - ea_bit_len)
address = rule_net.network_address._ip >> v4_suffix_len
address |= ea_bits
address <<= 32 - rule_net.prefixlen - ea_bit_len
address <<= 16
address |= ea_bits
address <<= 32 - rule_net.prefixlen - ea_bit_len
address <<= 16
- # psid_field = 0
- # address = address | psid_field
elif rule_net.prefixlen + ea_bit_len == 32:
address = dst_ip._ip << 16
elif rule_net.prefixlen + ea_bit_len == 32:
address = dst_ip._ip << 16
- # psid_field = 0
- # address = address | psid_field
else:
address = dst_ip._ip << 16
address |= psid
else:
address = dst_ip._ip << 16
address |= psid
:type psid_len: int
:type ipv4_dst: str
:type dst_port: int
:type psid_len: int
:type ipv4_dst: str
:type dst_port: int
- :return: Computed IPv6 address.
+ :returns: Computed IPv6 address.
:rtype: str
"""
ipv6_net = ipaddress.ip_network(unicode(ipv6_pfx))
ipv4_net = ipaddress.ip_network(unicode(ipv4_pfx))
ipv4_host = ipaddress.ip_address(unicode(ipv4_dst))
:rtype: str
"""
ipv6_net = ipaddress.ip_network(unicode(ipv6_pfx))
ipv4_net = ipaddress.ip_network(unicode(ipv4_pfx))
ipv4_host = ipaddress.ip_address(unicode(ipv4_dst))
- ipv6_host_len = ipv6_net._max_prefixlen - ipv6_net.prefixlen
- ipv4_host_len = ipv4_net._max_prefixlen - ipv4_net.prefixlen
+ ipv6_host_len = ipv6_net.max_prefixlen - ipv6_net.prefixlen
end_user_v6_pfx_len = ipv6_net.prefixlen + ea_bit_len
psid = Map.get_psid_from_port(dst_port, psid_len, psid_offset)
rule_v6_pfx = ipv6_net.network_address._ip >> ipv6_host_len
ea_bits = Map._make_ea_bits(ipv4_net, ipv4_host, ea_bit_len, psid_len,
psid)
end_user_v6_pfx_len = ipv6_net.prefixlen + ea_bit_len
psid = Map.get_psid_from_port(dst_port, psid_len, psid_offset)
rule_v6_pfx = ipv6_net.network_address._ip >> ipv6_host_len
ea_bits = Map._make_ea_bits(ipv4_net, ipv4_host, ea_bit_len, psid_len,
psid)
interface_id = Map._make_interface_id(ipv4_net, ipv4_host, ea_bit_len,
psid)
interface_id = Map._make_interface_id(ipv4_net, ipv4_host, ea_bit_len,
psid)
:param ipv4_src: IPv4 source address
:type ipv6_pfx: str
:type ipv4_src: str
:param ipv4_src: IPv4 source address
:type ipv6_pfx: str
:type ipv4_src: str
- :return: IPv6 address, combination of IPv6 prefix and IPv4 address.
+ :returns: IPv6 address, combination of IPv6 prefix and IPv4 address.
:rtype: str
"""
ipv6_net = ipaddress.ip_network(unicode(ipv6_pfx))
:rtype: str
"""
ipv6_net = ipaddress.ip_network(unicode(ipv6_pfx))
.. note:: First add at least two nodes to the topology.
"""
nodes = self._nodes
.. note:: First add at least two nodes to the topology.
"""
nodes = self._nodes
- nodes_filer = self._nodes_filter
if len(nodes) < 2:
raise RuntimeError('Not enough nodes to compute path')
if len(nodes) < 2:
raise RuntimeError('Not enough nodes to compute path')
def next_interface(self):
"""Path interface iterator.
def next_interface(self):
"""Path interface iterator.
- :return: Interface and node or None if not next interface.
+ :returns: Interface and node or None if not next interface.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
def first_interface(self):
"""Return first interface on the path.
def first_interface(self):
"""Return first interface on the path.
- :return: Interface and node.
+ :returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
def last_interface(self):
"""Return last interface on the path.
def last_interface(self):
"""Return last interface on the path.
- :return: Interface and node.
+ :returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
def first_ingress_interface(self):
"""Return first ingress interface on the path.
def first_ingress_interface(self):
"""Return first ingress interface on the path.
- :return: Interface and node.
+ :returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
def last_egress_interface(self):
"""Return last egress interface on the path.
def last_egress_interface(self):
"""Return last egress interface on the path.
- :return: Interface and node.
+ :returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
chksum = 0xa607
src = 11.11.11.11
dst = 11.11.11.10
chksum = 0xa607
src = 11.11.11.11
dst = 11.11.11.10
###[ ICMP ]###
type = echo-reply
code = 0
###[ ICMP ]###
type = echo-reply
code = 0
__all__ = ['RxQueue', 'TxQueue', 'Interface', 'create_gratuitous_arp_request',
'auto_pad', 'checksum_equal']
__all__ = ['RxQueue', 'TxQueue', 'Interface', 'create_gratuitous_arp_request',
'auto_pad', 'checksum_equal']
-# TODO: http://stackoverflow.com/questions/320232/ensuring-subprocesses-are-dead-on-exiting-python-program
+# TODO: http://stackoverflow.com/questions/320232/
+# ensuring-subprocesses-are-dead-on-exiting-python-program
class PacketVerifier(object):
class PacketVerifier(object):
:param buf: String representation of incoming packet buffer.
:type buf: str
:param buf: String representation of incoming packet buffer.
:type buf: str
- :return: String representation of first packet in buf.
+ :returns: String representation of first packet in buf.
:rtype: str
"""
pkt_len = 0
:rtype: str
"""
pkt_len = 0
if len(buf) < 60:
return None
if len(buf) < 60:
return None
- # print
- # print buf.__repr__()
- # print Ether(buf).__repr__()
- # print len(Ether(buf))
- # print
try:
ether_type = Ether(buf[0:14]).type
except AttributeError:
try:
ether_type = Ether(buf[0:14]).type
except AttributeError:
:param queue: Queue in which this function will push incoming packets.
:type interface_name: str
:type queue: multiprocessing.Queue
:param queue: Queue in which this function will push incoming packets.
:type interface_name: str
:type queue: multiprocessing.Queue
"""
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, ETH_P_ALL)
sock.bind((interface_name, ETH_P_ALL))
"""
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, ETH_P_ALL)
sock.bind((interface_name, ETH_P_ALL))
:type ignore: list
:type verbose: bool
:type ignore: list
:type verbose: bool
- :return: Ether() initialized object from packet data.
+ :returns: Ether() initialized object from packet data.
:rtype: scapy.Ether
"""
(rlist, _, _) = select.select([self._sock], [], [], timeout)
:rtype: scapy.Ether
"""
(rlist, _, _) = select.select([self._sock], [], [], timeout)
pkt = self._sock.recv(0x7fff)
pkt_pad = auto_pad(pkt)
pkt = self._sock.recv(0x7fff)
pkt_pad = auto_pad(pkt)
- print 'Received packet on {0} of len {1}'.format(self._ifname, len(pkt))
+ print'Received packet on {0} of len {1}'.format(self._ifname, len(pkt))
if verbose:
Ether(pkt).show2()
print
if verbose:
Ether(pkt).show2()
print
:param timeout: Timeout value in seconds.
:type timeout: int
:param timeout: Timeout value in seconds.
:type timeout: int
- :return: Ether() initialized object from packet data.
+ :returns: Ether() initialized object from packet data.
:rtype: scapy.Ether
"""
return self.rxq.recv(timeout, self.sent_packets)
:rtype: scapy.Ether
"""
return self.rxq.recv(timeout, self.sent_packets)
:type chksum1: uint16
:type chksum2: uint16
:type chksum1: uint16
:type chksum2: uint16
- :return: True if checksums are equivalent, False otherwise.
+ :returns: True if checksums are equivalent, False otherwise.
:rtype: boolean
"""
if chksum1 == 0xFFFF:
:rtype: boolean
"""
if chksum1 == 0xFFFF:
from resources.libraries.python.topology import Topology
from resources.libraries.python.topology import Topology
-# pylint: disable=too-few-public-methods
class PolicerRateType(Enum):
"""Policer rate types."""
KBPS = 'kbps'
class PolicerRateType(Enum):
"""Policer rate types."""
KBPS = 'kbps'
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-instance-attributes
-# pylint: disable=too-many-public-methods
class Policer(object):
"""Policer utilities."""
class Policer(object):
"""Policer utilities."""
exceed_action=exceed_action,
violate_action=violate_action,
color_aware=color_aware)
exceed_action=exceed_action,
violate_action=violate_action,
color_aware=color_aware)
- # pylint: enable=no-member
VatJsonUtil.verify_vat_retval(
out[0],
VatJsonUtil.verify_vat_retval(
out[0],
# create classify table
direction = 'src' if self._classify_match_is_src else 'dst'
# create classify table
direction = 'src' if self._classify_match_is_src else 'dst'
- if 6 == ip_address(unicode(self._classify_match_ip)).version:
+ if ip_address(unicode(self._classify_match_ip)).version == 6:
ip_version = 'ip6'
table_type = PolicerClassifyTableType.IP6_TABLE
else:
ip_version = 'ip6'
table_type = PolicerClassifyTableType.IP6_TABLE
else:
class Routing(object):
"""Routing utilities."""
class Routing(object):
"""Routing utilities."""
- # pylint: disable=too-many-arguments
@staticmethod
def vpp_route_add(node, network, prefix_len, gateway=None,
interface=None, use_sw_index=True, resolve_attempts=10,
@staticmethod
def vpp_route_add(node, network, prefix_len, gateway=None,
interface=None, use_sw_index=True, resolve_attempts=10,
:param node: Dictionary created from topology.
:type tarball: str
:type node: dict
:param node: Dictionary created from topology.
:type tarball: str
:type node: dict
"""
logger.console('Copying tarball to {0}'.format(node['host']))
ssh = SSH()
"""
logger.console('Copying tarball to {0}'.format(node['host']))
ssh = SSH()
:param node: Dictionary created from topology.
:type tarball: str
:type node: dict
:param node: Dictionary created from topology.
:type tarball: str
:type node: dict
"""
logger.console('Extracting tarball to {0} on {1}'.format(
con.REMOTE_FW_DIR, node['host']))
"""
logger.console('Extracting tarball to {0} on {1}'.format(
con.REMOTE_FW_DIR, node['host']))
:param args: All parameters needed to setup one node.
:type args: tuple
:param args: All parameters needed to setup one node.
:type args: tuple
- :return: nothing
- :return: True - success, False - error
+ :returns: True - success, False - error
:rtype: bool
"""
tarball, remote_tarball, node = args
:rtype: bool
"""
tarball, remote_tarball, node = args
:param tarball: Path to tarball to upload.
:type tarball: str
:param tarball: Path to tarball to upload.
:type tarball: str
"""
call(split('sh -c "rm {0} > /dev/null 2>&1"'.format(tarball)))
"""
call(split('sh -c "rm {0} > /dev/null 2>&1"'.format(tarball)))
-class SetupFramework(object): # pylint: disable=too-few-public-methods
+class SetupFramework(object):
"""Setup suite run on topology nodes.
Many VAT/CLI based tests need the scripts at remote hosts before executing
"""Setup suite run on topology nodes.
Many VAT/CLI based tests need the scripts at remote hosts before executing
logger.trace('Test framework copied to all topology nodes')
delete_local_tarball(tarball)
logger.console('All nodes are ready')
logger.trace('Test framework copied to all topology nodes')
delete_local_tarball(tarball)
logger.console('All nodes are ready')
# See the License for the specific language governing permissions and
# limitations under the License.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Packet trace library."""
+
from resources.libraries.python.VatExecutor import VatExecutor
from resources.libraries.python.topology import NodeType
class Trace(object):
from resources.libraries.python.VatExecutor import VatExecutor
from resources.libraries.python.topology import NodeType
class Trace(object):
+ """This class provides methods to manipulate the VPP packet trace."""
@staticmethod
def show_packet_trace_on_all_duts(nodes):
@staticmethod
def show_packet_trace_on_all_duts(nodes):
+ """Show VPP packet trace.
+
+ :param nodes: Nodes from which the packet trace will be displayed.
+ :type nodes: list
+ """
for node in nodes.values():
if node['type'] == NodeType.DUT:
vat = VatExecutor()
for node in nodes.values():
if node['type'] == NodeType.DUT:
vat = VatExecutor()
@staticmethod
def clear_packet_trace_on_all_duts(nodes):
@staticmethod
def clear_packet_trace_on_all_duts(nodes):
+ """Clear VPP packet trace.
+
+ :param nodes: Nodes where the packet trace will be cleared.
+ :type nodes: list
+ """
for node in nodes.values():
if node['type'] == NodeType.DUT:
vat = VatExecutor()
vat.execute_script("clear_trace.vat", node, json_out=False)
for node in nodes.values():
if node['type'] == NodeType.DUT:
vat = VatExecutor()
vat.execute_script("clear_trace.vat", node, json_out=False)
tg_instance = BuiltIn().get_library_instance(
'resources.libraries.python.TrafficGenerator')
tg_instance = BuiltIn().get_library_instance(
'resources.libraries.python.TrafficGenerator')
- if tg_instance._node['subtype'] is None:
+ if tg_instance.node['subtype'] is None:
raise RuntimeError('TG subtype not defined')
raise RuntimeError('TG subtype not defined')
- elif tg_instance._node['subtype'] == NodeSubTypeTG.TREX:
+ elif tg_instance.node['subtype'] == NodeSubTypeTG.TREX:
unit_rate = str(rate) + self.get_rate_type_str()
tg_instance.trex_stl_start_remote_exec(self.get_duration(),
unit_rate, frame_size,
unit_rate = str(rate) + self.get_rate_type_str()
tg_instance.trex_stl_start_remote_exec(self.get_duration(),
unit_rate, frame_size,
'resources.libraries.python.TrafficGenerator')
return tg_instance.get_latency_int()
'resources.libraries.python.TrafficGenerator')
return tg_instance.get_latency_int()
class TrafficGenerator(object):
"""Traffic Generator."""
class TrafficGenerator(object):
"""Traffic Generator."""
# T-REX interface order mapping
self._ifaces_reordered = 0
# T-REX interface order mapping
self._ifaces_reordered = 0
+ @property
+ def node(self):
+ """Getter.
+
+ :returns: Traffic generator node.
+ :rtype: dict
+ """
+ return self._node
+
def get_loss(self):
"""Return number of lost packets.
def get_loss(self):
"""Return number of lost packets.
:param node: VPP node.
:param additional_cmds: Additional commands that the vpp should print
:param node: VPP node.
:param additional_cmds: Additional commands that the vpp should print
:type node: dict
:type additional_cmds: tuple
"""
:type node: dict
:type additional_cmds: tuple
"""
def_setting_tb_displayed['Custom Setting: {}'.format(cmd)] = cmd
ssh = SSH()
ssh.connect(node)
def_setting_tb_displayed['Custom Setting: {}'.format(cmd)] = cmd
ssh = SSH()
ssh.connect(node)
- print("=" * 40)
- for key, value in def_setting_tb_displayed.iteritems():
- (_, stdout, _) = ssh.exec_command_sudo('vppctl sh {}'.format(value))
- print("{} : {} \n".format(key, value))
- print(stdout)
- print("=" * 40)
+ for _, value in def_setting_tb_displayed.items():
+ ssh.exec_command_sudo('vppctl sh {}'.format(value))
Clean up VAT JSON output from clutter like vat# prompts and such.
:param json_output: Cluttered JSON output.
Clean up VAT JSON output from clutter like vat# prompts and such.
:param json_output: Cluttered JSON output.
- :return: Cleaned up output JSON string.
+ :returns: Cleaned up output JSON string.
"""
logger.debug("Executing command in VAT terminal: {}".format(cmd))
try:
"""
logger.debug("Executing command in VAT terminal: {}".format(cmd))
try:
- out = self._ssh.interactive_terminal_exec_command(self._tty,
- cmd,
- self.__VAT_PROMPT)
+ out = self._ssh.interactive_terminal_exec_command(self._tty, cmd,
+ self.__VAT_PROMPT)
except:
self._exec_failure = True
raise
except:
self._exec_failure = True
raise
array_start = out.find('[')
array_end = out.rfind(']')
array_start = out.find('[')
array_end = out.rfind(']')
- if -1 == obj_start and -1 == array_start:
+ if obj_start == -1 and array_start == -1:
raise RuntimeError("VAT: no JSON data.")
raise RuntimeError("VAT: no JSON data.")
- if obj_start < array_start or -1 == array_start:
+ if obj_start < array_start or array_start == -1:
start = obj_start
end = obj_end + 1
else:
start = obj_start
end = obj_end + 1
else:
:param node: Node to dump stats table on.
:type node: dict
:param node: Node to dump stats table on.
:type node: dict
"""
with VatTerminal(node) as vat:
vat.vat_terminal_exec_cmd('want_stats enable')
"""
with VatTerminal(node) as vat:
vat.vat_terminal_exec_cmd('want_stats enable')
return None
def vpp_get_ipv4_interface_counter(self, node, interface):
return None
def vpp_get_ipv4_interface_counter(self, node, interface):
+ """
+
+ :param node: Node to get interface IPv4 counter on.
+ :param interface: Interface name.
+ :type node: dict
+ :type interface: str
+ :returns: Interface IPv4 counter.
+ :rtype: int
+ """
return self.vpp_get_ipv46_interface_counter(node, interface, False)
def vpp_get_ipv6_interface_counter(self, node, interface):
return self.vpp_get_ipv46_interface_counter(node, interface, False)
def vpp_get_ipv6_interface_counter(self, node, interface):
+ """
+
+ :param node: Node to get interface IPv6 counter on.
+ :param interface: Interface name.
+ :type node: dict
+ :type interface: str
+ :returns: Interface IPv6 counter.
+ :rtype: int
+ """
return self.vpp_get_ipv46_interface_counter(node, interface, True)
def vpp_get_ipv46_interface_counter(self, node, interface, is_ipv6=True):
return self.vpp_get_ipv46_interface_counter(node, interface, True)
def vpp_get_ipv46_interface_counter(self, node, interface, is_ipv6=True):
:type node: dict
:type interface: str
:type is_ipv6: bool
:type node: dict
:type interface: str
:type is_ipv6: bool
- :return: Interface IPv4/IPv6 counter.
+ :returns: Interface IPv4/IPv6 counter.
:rtype: int
"""
version = 'ip6' if is_ipv6 else 'ip4'
:rtype: int
"""
version = 'ip6' if is_ipv6 else 'ip4'
# See the License for the specific language governing permissions and
# limitations under the License.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Constants used in CSIT."""
+
+ """Constants used in CSIT."""
+
# OpenVPP testing directory location at topology nodes
REMOTE_FW_DIR = '/tmp/openvpp-testing'
# OpenVPP testing directory location at topology nodes
REMOTE_FW_DIR = '/tmp/openvpp-testing'
suffix_dict = {"l2": "eth",
"l3_ip4": "ipv4",
"l3_ip6": "ipv6",
suffix_dict = {"l2": "eth",
"l3_ip4": "ipv4",
"l3_ip6": "ipv6",
try:
suffix = suffix_dict[layer]
except KeyError:
try:
suffix = suffix_dict[layer]
except KeyError:
}
except KeyError:
raise ValueError("Unknown network layer {0}. "
}
except KeyError:
raise ValueError("Unknown network layer {0}. "
- "Valid options are: {1}".format(
- layer, layers.keys()))
+ "Valid options are: {1}".
+ format(layer, layers.keys()))
status_code, resp = HcUtil.put_honeycomb_data(
node, "config_vpp_interfaces", data, path)
status_code, resp = HcUtil.put_honeycomb_data(
node, "config_vpp_interfaces", data, path)
import HoneycombUtil as HcUtil
import HoneycombUtil as HcUtil
-# pylint: disable=too-many-public-methods
-# pylint: disable=too-many-lines
class InterfaceKeywords(object):
"""Keywords for Interface manipulation.
class InterfaceKeywords(object):
"""Keywords for Interface manipulation.
node, super_interface, path, address)
@staticmethod
node, super_interface, path, address)
@staticmethod
- def remove_all_ipv4_addresses_from_sub_interface(node, super_interface, # pylint: disable=invalid-name
+ def remove_all_ipv4_addresses_from_sub_interface(node, super_interface,
identifier):
"""Remove all ipv4 addresses from the specified sub-interface.
identifier):
"""Remove all ipv4 addresses from the specified sub-interface.
:param url_file: URL file. The argument contains only the name of file
without extension, not the full path.
:type url_file: str
:param url_file: URL file. The argument contains only the name of file
without extension, not the full path.
:type url_file: str
- :return: Requested path.
+ :returns: Requested path.
:param path: Path to data we want to find.
:type data: dict
:type path: tuple
:param path: Path to data we want to find.
:type data: dict
:type path: tuple
- :return: Data represented by path.
+ :returns: Data represented by path.
:rtype: str, dict, or list
:raises HoneycombError: If the data has not been found.
"""
:rtype: str, dict, or list
:raises HoneycombError: If the data has not been found.
"""
:param path: Path to data we want to remove.
:type data: dict
:type path: tuple
:param path: Path to data we want to remove.
:type data: dict
:type path: tuple
- :return: Original data without removed part.
+ :returns: Original data without removed part.
:type data: dict
:type path: tuple
:type new_value: str, dict or list
:type data: dict
:type path: tuple
:type new_value: str, dict or list
- :return: Original data with the new value.
+ :returns: Original data with the new value.
:type node: dict
:type url_file: str
:type path: str
:type node: dict
:type url_file: str
:type path: str
- :return: Status code and content of response.
+ :returns: Status code and content of response.
:rtype tuple
"""
base_path = HoneycombUtil.read_path_from_url_file(url_file)
path = base_path + path
status_code, resp = HTTPRequest.get(node, path)
:rtype tuple
"""
base_path = HoneycombUtil.read_path_from_url_file(url_file)
path = base_path + path
status_code, resp = HTTPRequest.get(node, path)
- (status_node, response) = status_code, loads(resp)
if status_code != HTTPCodes.OK:
HoneycombUtil.read_log_tail(node)
return status_code, response
if status_code != HTTPCodes.OK:
HoneycombUtil.read_log_tail(node)
return status_code, response
:type data: dict, str
:type path: str
:type data_representation: DataRepresentation
:type data: dict, str
:type path: str
:type data_representation: DataRepresentation
- :return: Status code and content of response.
+ :returns: Status code and content of response.
:rtype: tuple
:raises HoneycombError: If the given data representation is not defined
in HEADERS.
:rtype: tuple
:raises HoneycombError: If the given data representation is not defined
in HEADERS.
:type data: dict, str
:type data_representation: DataRepresentation
:type timeout: int
:type data: dict, str
:type data_representation: DataRepresentation
:type timeout: int
- :return: Status code and content of response.
+ :returns: Status code and content of response.
:rtype: tuple
:raises HoneycombError: If the given data representation is not defined
in HEADERS.
:rtype: tuple
:raises HoneycombError: If the given data representation is not defined
in HEADERS.
:type node: dict
:type url_file: str
:type path: str
:type node: dict
:type url_file: str
:type path: str
- :return: Status code and content of response.
+ :returns: Status code and content of response.
:param lines: Number of lines to read.
:type node: dict
:type lines: int
:param lines: Number of lines to read.
:type node: dict
:type lines: int
- :return: Last N log lines.
+ :returns: Last N log lines.
"""Keywords used to connect to Honeycomb through Netconf, send messages
and receive replies."""
"""Keywords used to connect to Honeycomb through Netconf, send messages
and receive replies."""
from time import time
import paramiko
from time import time
import paramiko
from robot.api import logger
from interruptingcow import timeout
from robot.api import logger
from interruptingcow import timeout
password=node['honeycomb']['passwd'],
pkey=None,
port=node['honeycomb']['netconf_port'],
password=node['honeycomb']['passwd'],
pkey=None,
port=node['honeycomb']['netconf_port'],
logger.trace('Connect took {0} seconds'.format(time() - start))
logger.debug('New ssh: {0}'.format(client))
logger.trace('Connect took {0} seconds'.format(time() - start))
logger.debug('New ssh: {0}'.format(client))
:type size:int
:type time_out:int
:type err:str
:type size:int
:type time_out:int
:type err:str
- :return: Content of response.
+ :returns: Content of response.
:rtype: str
:raises HoneycombError: If the read process times out.
"""
:rtype: str
:raises HoneycombError: If the read process times out.
"""
break
except socket.timeout:
raise HoneycombError("Socket timeout.",
break
except socket.timeout:
raise HoneycombError("Socket timeout.",
- enable_logging=False
- )
except RuntimeError:
raise HoneycombError(err + " Content of buffer: {0}".format(reply),
except RuntimeError:
raise HoneycombError(err + " Content of buffer: {0}".format(reply),
- enable_logging=False
- )
logger.trace(reply)
return reply.replace(self.delimiter, "")
logger.trace(reply)
return reply.replace(self.delimiter, "")
:param time_out: Timeout value for getting the complete response.
:type size:int
:type time_out:int
:param time_out: Timeout value for getting the complete response.
:type size:int
:type time_out:int
- :return: Content of response.
+ :returns: Content of response.
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+"""
+__init__ file for resources/libraries/python/parsers
+"""
ssh = SSH()
try:
ssh.connect(node)
ssh = SSH()
try:
ssh.connect(node)
- except Exception as err:
+ except SSHException as err:
logger.error("Failed to connect to node" + str(err))
return None, None, None
logger.error("Failed to connect to node" + str(err))
return None, None, None
else:
(ret_code, stdout, stderr) = ssh.exec_command_sudo(cmd,
timeout=timeout)
else:
(ret_code, stdout, stderr) = ssh.exec_command_sudo(cmd,
timeout=timeout)
- except Exception as err:
+ except SSHException as err:
logger.error(err)
return None, None, None
logger.error(err)
return None, None, None
IntField("Sequence Number", 0),
IntField("Observation Domain ID", 0),
ShortField("Set_ID", 0),
IntField("Sequence Number", 0),
IntField("Observation Domain ID", 0),
ShortField("Set_ID", 0),
- ShortField("Set_Length", 0)
- ]
+ ShortField("Set_Length", 0)]
class IPFIXTemplate(Packet):
class IPFIXTemplate(Packet):
fields_desc = [ShortField("Template_ID", 256),
ShortField("nFields", 2),
FieldListField("Template", [], ShortField("type_len", ""),
fields_desc = [ShortField("Template_ID", 256),
ShortField("nFields", 2),
FieldListField("Template", [], ShortField("type_len", ""),
- count_from=lambda p: p.nFields*2)
- ]
+ count_from=lambda p: p.nFields*2)]
from resources.libraries.python.VatExecutor import VatTerminal
from resources.libraries.python.VatExecutor import VatTerminal
-# pylint: disable=too-few-public-methods
class SPAN(object):
"""Class contains methods for setting up SPAN mirroring on DUTs."""
class SPAN(object):
"""Class contains methods for setting up SPAN mirroring on DUTs."""
with VatTerminal(node, json_param=False) as vat:
vat.vat_terminal_exec_cmd_from_template('span_create.vat',
src_sw_if_index=src_if,
with VatTerminal(node, json_param=False) as vat:
vat.vat_terminal_exec_cmd_from_template('span_create.vat',
src_sw_if_index=src_if,
- dst_sw_if_index=dst_if,
- )
+ dst_sw_if_index=dst_if)
@staticmethod
def vpp_get_span_configuration(node):
@staticmethod
def vpp_get_span_configuration(node):
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+"""
+__init__ file for resources/libraries/python/telemetry
+"""
from yaml import load
from robot.api import logger
from yaml import load
from robot.api import logger
-from robot.libraries.BuiltIn import BuiltIn
+from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
from robot.api.deco import keyword
__all__ = ["DICT__nodes", 'Topology']
from robot.api.deco import keyword
__all__ = ["DICT__nodes", 'Topology']
def load_topo_from_yaml():
"""Load topology from file defined in "${TOPOLOGY_PATH}" variable.
def load_topo_from_yaml():
"""Load topology from file defined in "${TOPOLOGY_PATH}" variable.
- :return: Nodes from loaded topology.
+ :returns: Nodes from loaded topology.
"""
try:
topo_path = BuiltIn().get_variable_value("${TOPOLOGY_PATH}")
"""
try:
topo_path = BuiltIn().get_variable_value("${TOPOLOGY_PATH}")
+ except RobotNotRunningError:
return ''
with open(topo_path) as work_file:
return ''
with open(topo_path) as work_file:
try:
if isinstance(iface_key, basestring):
return node['interfaces'][iface_key].get('vpp_sw_index')
try:
if isinstance(iface_key, basestring):
return node['interfaces'][iface_key].get('vpp_sw_index')
- #FIXME: use only iface_key, do not use integer
+ # TODO: use only iface_key, do not use integer
else:
return int(iface_key)
except (KeyError, ValueError):
else:
return int(iface_key)
except (KeyError, ValueError):
| | ... | ${settings['weight']}
| | Vpp Lisp Eid Table Mapping | ${dut_node}
| | ... | ${settings['vni']}
| | ... | ${settings['weight']}
| | Vpp Lisp Eid Table Mapping | ${dut_node}
| | ... | ${settings['vni']}
-| | ... | bd=${settings['bd']}
+| | ... | bd_id=${settings['bd']}
| | Vpp Add Lisp Local Eid | ${dut_node}
| | ... | ${settings['locator_name']}
| | ... | ${settings['vni']}
| | Vpp Add Lisp Local Eid | ${dut_node}
| | ... | ${settings['locator_name']}
| | ... | ${settings['vni']}