no-space-check=trailing-comma,dict-separator
# Maximum number of lines in a module
-max-module-lines=1000
+max-module-lines=2000
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
function-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct method names
-method-rgx=[a-z_][a-z0-9_]{2,40}$
+method-rgx=[a-z_][a-z0-9_]{2,50}$
# Regular expression which should only match correct instance attribute names
attr-rgx=[a-z_][a-z0-9_]{2,30}$
[DESIGN]
# Maximum number of arguments for function / method
-max-args=10
+max-args=12
# Argument names that match this expression will be ignored. Default to name
# with leading underscore
min-public-methods=0
# Maximum number of public methods for a class (see R0904).
-max-public-methods=50
+max-public-methods=60
[EXCEPTIONS]
from resources.libraries.python.VatExecutor import VatExecutor, VatTerminal
-# pylint: disable=too-many-arguments, invalid-name
-
class Classify(object):
"""Classify utilities."""
:type node: dict
:type ip_version: str
:type direction: str
- :return (table_index, skip_n, match_n)
+ :returns (table_index, skip_n, match_n)
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
:param direction: Direction of traffic - src/dst.
:type node: dict
:type direction: str
- :return (table_index, skip_n, match_n)
+ :returns (table_index, skip_n, match_n)
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
:param hex_mask: Classify hex mask.
:type node: dict
:type hex_mask: str
- :return (table_index, skip_n, match_n)
+ :returns (table_index, skip_n, match_n)
table_index: Classify table index.
skip_n: Number of skip vectors.
match_n: Number of match vectors.
:type ip_version: str
:type protocol: str
:type direction: str
- :return: Classify hex mask.
+ :returns: Classify hex mask.
:rtype : str
:raises ValueError: If protocol is not TCP or UDP.
:raises ValueError: If direction is not source or destination or
:type hex_mask: str
:type source_port: str
:type destination_port: str
- :return: Classify hex value.
+ :returns: Classify hex value.
:rtype: str
"""
source_port_hex = Classify._port_convert(source_port)
:param port: TCP/UDP port number.
:type port: str
- :return: TCP/UDP port number in 4-digit hexadecimal format.
+ :returns: TCP/UDP port number in 4-digit hexadecimal format.
:rtype: str
"""
return '{0:04x}'.format(int(port))
:param table_index: Index of a specific classify table.
:type node: dict
:type table_index: int
- :return: Classify table settings.
+ :returns: Classify table settings.
:rtype: dict
"""
with VatTerminal(node) as vat:
:type node: dict
:type table_index: int
:type session_index: int
- :return: List of classify session settings, or a dictionary of settings
+ :returns: List of classify session settings, or a dictionary of settings
for a specific classify session.
:rtype: list or dict
"""
ssh.connect(node)
(ret_code, stdout, stderr) = \
- ssh.exec_command('sudo -Sn bash {0}/{1}/dut_setup.sh'.format(
- Constants.REMOTE_FW_DIR, Constants.RESOURCES_LIB_SH), timeout=120)
+ ssh.exec_command('sudo -Sn bash {0}/{1}/dut_setup.sh'.
+ format(Constants.REMOTE_FW_DIR,
+ Constants.RESOURCES_LIB_SH), timeout=120)
logger.trace(stdout)
logger.trace(stderr)
if int(ret_code) != 0:
if int(ret_code) != 0:
logger.debug('Not possible to get PID of VPP process on node: '
- '"{1}"'.format(node['host'], stdout + stderr))
+ '{0}\n {1}'.format(node['host'], stdout + stderr))
raise RuntimeError('Not possible to get PID of VPP process on node:'
' {}'.format(node['host']))
class DpdkUtil(object):
"""Utilities for DPDK."""
- #pylint: disable=too-many-locals
@staticmethod
def dpdk_testpmd_start(node, **args):
"""Start DPDK testpmd app on VM node.
@staticmethod
def setup_network_namespace(node, namespace_name, interface_name,
- ip_address, prefix):
+ ip_addr, prefix):
"""Setup namespace on given node and attach interface and IP to
this namespace. Applicable also on TG node.
:param node: Node to set namespace on.
:param namespace_name: Namespace name.
:param interface_name: Interface name.
- :param ip_address: IP address of namespace's interface.
+ :param ip_addr: IP address of namespace's interface.
:param prefix: IP address prefix length.
:type node: dict
:type namespace_name: str
:type vhost_if: str
- :type ip_address: str
+ :type ip_addr: str
:type prefix: int
"""
cmd = ('ip netns add {0}'.format(namespace_name))
exec_cmd_no_error(node, cmd, sudo=True)
cmd = ('ip netns exec {0} ip addr add {1}/{2} dev {3}'.format(
- namespace_name, ip_address, prefix, interface_name))
+ namespace_name, ip_addr, prefix, interface_name))
exec_cmd_no_error(node, cmd, sudo=True)
@staticmethod
exec_cmd_no_error(node, cmd, sudo=True)
@staticmethod
- def set_linux_interface_ip(node, interface, ip, prefix, namespace=None):
+ def set_linux_interface_ip(node, interface, ip_addr, prefix,
+ namespace=None):
"""Set IP address to interface in linux.
:param node: Node where to execute command.
:param interface: Interface in namespace.
- :param ip: IP to be set on interface.
+ :param ip_addr: IP to be set on interface.
:param prefix: IP prefix.
:param namespace: Execute command in namespace. Optional
:type node: dict
:type interface: str
- :type ip: str
+ :type ip_addr: str
:type prefix: int
:type namespace: str
:raises RuntimeError: IP could not be set.
"""
if namespace is not None:
cmd = 'ip netns exec {} ip addr add {}/{} dev {}'.format(
- namespace, ip, prefix, interface)
+ namespace, ip_addr, prefix, interface)
else:
- cmd = 'ip addr add {}/{} dev {}'.format(ip, prefix, interface)
- (rc, _, stderr) = exec_cmd(node, cmd, timeout=5, sudo=True)
- if rc != 0:
+ cmd = 'ip addr add {}/{} dev {}'.format(ip_addr, prefix, interface)
+ (ret_code, _, stderr) = exec_cmd(node, cmd, timeout=5, sudo=True)
+ if ret_code != 0:
raise RuntimeError(
'Could not set IP for interface, reason:{}'.format(stderr))
Example: mask 255.255.0.0 -> prefix length 16
:param network: Network mask or network prefix length.
:type network: str or int
- :return: Network mask or network prefix length.
+ :returns: Network mask or network prefix length.
:rtype: str or int
"""
temp_address = "0.0.0.0"
from resources.libraries.python.VatJsonUtil import VatJsonUtil
-# pylint: disable=too-few-public-methods
class PolicyAction(Enum):
"""Policy actions."""
BYPASS = 'bypass'
class IPsecUtil(object):
"""IPsec utilities."""
- # pylint: disable=too-many-arguments
- # pylint: disable=too-many-locals
-
@staticmethod
def policy_action_bypass():
"""Return policy action bypass.
- :return: PolicyAction enum BYPASS object.
+ :returns: PolicyAction enum BYPASS object.
:rtype: PolicyAction
"""
return PolicyAction.BYPASS
def policy_action_discard():
"""Return policy action discard.
- :return: PolicyAction enum DISCARD object.
+ :returns: PolicyAction enum DISCARD object.
:rtype: PolicyAction
"""
return PolicyAction.DISCARD
def policy_action_protect():
"""Return policy action protect.
- :return: PolicyAction enum PROTECT object.
+ :returns: PolicyAction enum PROTECT object.
:rtype: PolicyAction
"""
return PolicyAction.PROTECT
def crypto_alg_aes_cbc_128():
"""Return encryption algorithm aes-cbc-128.
- :return: CryptoAlg enum AES_CBC_128 object.
+ :returns: CryptoAlg enum AES_CBC_128 object.
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_128
def crypto_alg_aes_cbc_192():
"""Return encryption algorithm aes-cbc-192.
- :return: CryptoAlg enum AES_CBC_192 objec.
+ :returns: CryptoAlg enum AES_CBC_192 objec.
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_192
def crypto_alg_aes_cbc_256():
"""Return encryption algorithm aes-cbc-256.
- :return: CryptoAlg enum AES_CBC_256 object.
+ :returns: CryptoAlg enum AES_CBC_256 object.
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_256
:param crypto_alg: Encryption algorithm.
:type crypto_alg: CryptoAlg
- :return: Key length.
+ :returns: Key length.
:rtype: int
"""
return crypto_alg.key_len
:param crypto_alg: Encryption algorithm.
:type crypto_alg: CryptoAlg
- :return: Algorithm scapy name.
+ :returns: Algorithm scapy name.
:rtype: str
"""
return crypto_alg.scapy_name
def integ_alg_sha1_96():
"""Return integrity algorithm SHA1-96.
- :return: IntegAlg enum SHA1_96 object.
+ :returns: IntegAlg enum SHA1_96 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA1_96
def integ_alg_sha_256_128():
"""Return integrity algorithm SHA-256-128.
- :return: IntegAlg enum SHA_256_128 object.
+ :returns: IntegAlg enum SHA_256_128 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA_256_128
def integ_alg_sha_384_192():
"""Return integrity algorithm SHA-384-192.
- :return: IntegAlg enum SHA_384_192 object.
+ :returns: IntegAlg enum SHA_384_192 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA_384_192
def integ_alg_sha_512_256():
"""Return integrity algorithm SHA-512-256.
- :return: IntegAlg enum SHA_512_256 object.
+ :returns: IntegAlg enum SHA_512_256 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA_512_256
:param integ_alg: Integrity algorithm.
:type integ_alg: IntegAlg
- :return: Key length.
+ :returns: Key length.
:rtype: int
"""
return integ_alg.key_len
:param integ_alg: Integrity algorithm.
:type integ_alg: IntegAlg
- :return: Algorithm scapy name.
+ :returns: Algorithm scapy name.
:rtype: str
"""
return integ_alg.scapy_name
:param prefix_length: Length of network prefix.
:type prefix_length: int
-
- :return: Network mask.
+ :returns: Network mask.
:rtype: str
"""
:type interface: str
:type address: str
:type prefix_length: int
- :return: nothing
+ :returns: nothing
"""
pass
:type prefix_length: int
:type gateway: str
:type interface: str
- :return: nothing
+ :returns: nothing
"""
pass
:type prefix_length: int
:type gateway: str
:type interface: str
- :return: nothing
+ :returns: nothing
"""
pass
:param source_interface: Source interface name.
:type destination_address: str
:type source_interface: str
- :return: nothing
+ :returns: nothing
"""
pass
:param cmd: Command to be executed.
:type cmd: str
-
- :return: Content of stdout and stderr returned by command.
+ :returns: Content of stdout and stderr returned by command.
:rtype: tuple
"""
return exec_cmd_no_error(self.node_info, cmd)
:param cmd: Command to be executed.
:type cmd: str
-
- :return: Content of stdout and stderr returned by command.
+ :returns: Content of stdout and stderr returned by command.
:rtype: tuple
"""
return exec_cmd_no_error(self.node_info, cmd, sudo=True)
:param interface: Interface name.
:type interface: str
- :return: sw_if_index of the interface or None.
+ :returns: sw_if_index of the interface or None.
:rtype: int
"""
return Topology().get_interface_sw_index(self.node_info, interface)
:param args: Parameters to the script.
:type script: str
:type args: dict
- :return: nothing
+ :returns: nothing
"""
# TODO: check return value
VatExecutor.cmd_from_template(self.node_info, script, **args)
:param node_info: Dictionary containing information on nodes in topology.
:type node_info: dict
- :return: Class instance that is derived from Node.
+ :returns: Class instance that is derived from Node.
"""
if node_info['type'] == NodeType.TG:
return Tg(node_info)
:param nodes_addr: Available nodes IPv4 addresses.
:type nodes: dict
:type nodes_addr: dict
- :return: Affected interfaces as list of (node, interface) tuples.
+ :returns: Affected interfaces as list of (node, interface) tuples.
:rtype: list
"""
interfaces = []
:type node: dict
:type iface_key: str
:type nodes_addr: dict
- :return: IPv4 address.
+ :returns: IPv4 address.
:rtype: str
"""
interface = Topology.get_interface_name(node, iface_key)
:param nodes_addr: Available nodes IPv6 addresses.
:type nodes: dict
:type nodes_addr: dict
- :return: Affected interfaces as list of (node, interface) tuples.
+ :returns: Affected interfaces as list of (node, interface) tuples.
:rtype: list
"""
interfaces = []
:type addr: str
:type prefix: str
"""
- topo = Topology()
sw_if_index = Topology.get_interface_sw_index(node, iface_key)
with VatTerminal(node) as vat:
vat.vat_terminal_exec_cmd_from_template('add_ip_address.vat',
:param nodes_addr: Available nodes IPv6 addresses.
:type link: str
:type nodes_addr: dict
- :return: Link IPv6 address.
+ :returns: Link IPv6 address.
:rtype: str
"""
net = nodes_addr.get(link)
:param nodes_addr: Available nodes IPv6 addresses.
:type link: str
:type nodes_addr: dict
- :return: Link IPv6 address prefix.
+ :returns: Link IPv6 address prefix.
:rtype: int
"""
net = nodes_addr.get(link)
"""
@staticmethod
- def vpp_lisp_eid_table_mapping(node, vni, bd=None, vrf=None):
+ def vpp_lisp_eid_table_mapping(node, vni, bd_id=None, vrf=None):
"""
Map LISP VNI to either bridge domain ID, or VRF ID.
:param node: VPP node.
:param vni: Lisp VNI.
- :param bd: Bridge domain ID.
+ :param bd_id: Bridge domain ID.
:param vrf: VRF id.
:type node: dict
:type vni: int
- :type bd: int
+ :type bd_id: int
:type vrf: int
"""
- if bd:
- bd_or_vrf = 'bd_index {}'.format(bd)
+ if bd_id:
+ bd_or_vrf = 'bd_index {}'.format(bd_id)
else:
bd_or_vrf = 'vrf {}'.format(vrf)
VatExecutor.cmd_from_template(node,
:param node: VPP node.
:type node: dict
- :return: Lisp gpe state.
+ :returns: Lisp gpe state.
:rtype: list
"""
retrieved - local, remote, empty string = both.
:type node: dict
:type items_filter: str
- :return: Lisp locator_set data as python list.
+ :returns: Lisp locator_set data as python list.
:rtype: list
"""
:param node: VPP node.
:type node: dict
- :return: Lisp eid table as python list.
+ :returns: Lisp eid table as python list.
:rtype: list
"""
:param node: VPP node.
:type node: dict
- :return: Lisp map resolver as python list.
+ :returns: Lisp map resolver as python list.
:rtype: list
"""
:param locator_set_number: Generate n locator_set.
:type node: dict
:type locator_set_number: str
- :return: list of lisp locator_set, list of lisp locator_set expected
+ :returns: list of lisp locator_set, list of lisp locator_set expected
from VAT.
:rtype: tuple
"""
:param locator_set_number: Generate n locator_set.
:type node: dict
:type locator_set_number: str
- :return: list of lisp locator_set, list of lisp locator_set expected
+ :returns: list of lisp locator_set, list of lisp locator_set expected
from VAT.
:rtype: tuple
"""
locator_set_list.append(locator_set)
locator_set_vat = {"ls_name": l_name,
- "ls_index": num}
+ "ls_index": num}
locator_set_list_vat.append(locator_set_vat)
return locator_set_list, locator_set_list_vat
:type psid_offset: int
:type psid_len: int
:type map_t: bool
- :return: Index of created map domain.
+ :returns: Index of created map domain.
:rtype: int
:raises RuntimeError: If unable to add map domain.
"""
:type port: int
:type psid_len: int
:type psid_offset: int
-
- :return: PSID.
+ :returns: PSID.
:rtype: int
"""
ones = 2**16-1
:type ea_bit_len: int
:type psid_len: int
:type psid: int
- :return: Number representing EA bit field of destination IPv6 address.
+ :returns: Number representing EA bit field of destination IPv6 address.
:rtype: int
"""
- v4_suffix_len = ipv4_net._max_prefixlen - ipv4_net.prefixlen
+ v4_suffix_len = ipv4_net.max_prefixlen - ipv4_net.prefixlen
v4_suffix = ipv4_net.network_address._ip ^ ipv4_host._ip
if ipv4_net.prefixlen + ea_bit_len <= 32:
:type dst_ip: ipaddress.IPv4Address
:type ea_bit_len: int
:type psid: int
- :return: Number representing interface id field of destination IPv6
+ :returns: Number representing interface id field of destination IPv6
address.
:rtype: int
"""
if rule_net.prefixlen + ea_bit_len < 32:
- v4_suffix_len = rule_net._max_prefixlen - rule_net.prefixlen
+ v4_suffix_len = rule_net.max_prefixlen - rule_net.prefixlen
v4_suffix = rule_net.network_address._ip ^ dst_ip._ip
ea_bits = v4_suffix >> (v4_suffix_len - ea_bit_len)
address = rule_net.network_address._ip >> v4_suffix_len
address |= ea_bits
address <<= 32 - rule_net.prefixlen - ea_bit_len
address <<= 16
- # psid_field = 0
- # address = address | psid_field
elif rule_net.prefixlen + ea_bit_len == 32:
address = dst_ip._ip << 16
- # psid_field = 0
- # address = address | psid_field
else:
address = dst_ip._ip << 16
address |= psid
:type psid_len: int
:type ipv4_dst: str
:type dst_port: int
- :return: Computed IPv6 address.
+ :returns: Computed IPv6 address.
:rtype: str
"""
ipv6_net = ipaddress.ip_network(unicode(ipv6_pfx))
ipv4_net = ipaddress.ip_network(unicode(ipv4_pfx))
ipv4_host = ipaddress.ip_address(unicode(ipv4_dst))
- ipv6_host_len = ipv6_net._max_prefixlen - ipv6_net.prefixlen
- ipv4_host_len = ipv4_net._max_prefixlen - ipv4_net.prefixlen
+ ipv6_host_len = ipv6_net.max_prefixlen - ipv6_net.prefixlen
end_user_v6_pfx_len = ipv6_net.prefixlen + ea_bit_len
psid = Map.get_psid_from_port(dst_port, psid_len, psid_offset)
rule_v6_pfx = ipv6_net.network_address._ip >> ipv6_host_len
ea_bits = Map._make_ea_bits(ipv4_net, ipv4_host, ea_bit_len, psid_len,
psid)
- subnet_id = 0
interface_id = Map._make_interface_id(ipv4_net, ipv4_host, ea_bit_len,
psid)
:param ipv4_src: IPv4 source address
:type ipv6_pfx: str
:type ipv4_src: str
- :return: IPv6 address, combination of IPv6 prefix and IPv4 address.
+ :returns: IPv6 address, combination of IPv6 prefix and IPv4 address.
:rtype: str
"""
ipv6_net = ipaddress.ip_network(unicode(ipv6_pfx))
.. note:: First add at least two nodes to the topology.
"""
nodes = self._nodes
- nodes_filer = self._nodes_filter
if len(nodes) < 2:
raise RuntimeError('Not enough nodes to compute path')
def next_interface(self):
"""Path interface iterator.
- :return: Interface and node or None if not next interface.
+ :returns: Interface and node or None if not next interface.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
def first_interface(self):
"""Return first interface on the path.
- :return: Interface and node.
+ :returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
def last_interface(self):
"""Return last interface on the path.
- :return: Interface and node.
+ :returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
def first_ingress_interface(self):
"""Return first ingress interface on the path.
- :return: Interface and node.
+ :returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
def last_egress_interface(self):
"""Return last egress interface on the path.
- :return: Interface and node.
+ :returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
chksum = 0xa607
src = 11.11.11.11
dst = 11.11.11.10
- \options \
+ options
###[ ICMP ]###
type = echo-reply
code = 0
__all__ = ['RxQueue', 'TxQueue', 'Interface', 'create_gratuitous_arp_request',
'auto_pad', 'checksum_equal']
-# TODO: http://stackoverflow.com/questions/320232/ensuring-subprocesses-are-dead-on-exiting-python-program
+# TODO: http://stackoverflow.com/questions/320232/
+# ensuring-subprocesses-are-dead-on-exiting-python-program
class PacketVerifier(object):
:param buf: String representation of incoming packet buffer.
:type buf: str
- :return: String representation of first packet in buf.
+ :returns: String representation of first packet in buf.
:rtype: str
"""
pkt_len = 0
if len(buf) < 60:
return None
- # print
- # print buf.__repr__()
- # print Ether(buf).__repr__()
- # print len(Ether(buf))
- # print
try:
ether_type = Ether(buf[0:14]).type
except AttributeError:
:param queue: Queue in which this function will push incoming packets.
:type interface_name: str
:type queue: multiprocessing.Queue
- :return: None
+ :returns: None
"""
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, ETH_P_ALL)
sock.bind((interface_name, ETH_P_ALL))
:type ignore: list
:type verbose: bool
- :return: Ether() initialized object from packet data.
+ :returns: Ether() initialized object from packet data.
:rtype: scapy.Ether
"""
(rlist, _, _) = select.select([self._sock], [], [], timeout)
pkt = self._sock.recv(0x7fff)
pkt_pad = auto_pad(pkt)
- print 'Received packet on {0} of len {1}'.format(self._ifname, len(pkt))
+ print'Received packet on {0} of len {1}'.format(self._ifname, len(pkt))
if verbose:
Ether(pkt).show2()
print
:param timeout: Timeout value in seconds.
:type timeout: int
- :return: Ether() initialized object from packet data.
+ :returns: Ether() initialized object from packet data.
:rtype: scapy.Ether
"""
return self.rxq.recv(timeout, self.sent_packets)
:type chksum1: uint16
:type chksum2: uint16
- :return: True if checksums are equivalent, False otherwise.
+ :returns: True if checksums are equivalent, False otherwise.
:rtype: boolean
"""
if chksum1 == 0xFFFF:
from resources.libraries.python.topology import Topology
-# pylint: disable=too-few-public-methods
class PolicerRateType(Enum):
"""Policer rate types."""
KBPS = 'kbps'
# pylint: disable=too-many-instance-attributes
-# pylint: disable=too-many-public-methods
class Policer(object):
"""Policer utilities."""
exceed_action=exceed_action,
violate_action=violate_action,
color_aware=color_aware)
- # pylint: enable=no-member
VatJsonUtil.verify_vat_retval(
out[0],
# create classify table
direction = 'src' if self._classify_match_is_src else 'dst'
- if 6 == ip_address(unicode(self._classify_match_ip)).version:
+ if ip_address(unicode(self._classify_match_ip)).version == 6:
ip_version = 'ip6'
table_type = PolicerClassifyTableType.IP6_TABLE
else:
class Routing(object):
"""Routing utilities."""
- # pylint: disable=too-many-arguments
@staticmethod
def vpp_route_add(node, network, prefix_len, gateway=None,
interface=None, use_sw_index=True, resolve_attempts=10,
:param node: Dictionary created from topology.
:type tarball: str
:type node: dict
- :return: nothing
+ :returns: nothing
"""
logger.console('Copying tarball to {0}'.format(node['host']))
ssh = SSH()
:param node: Dictionary created from topology.
:type tarball: str
:type node: dict
- :return: nothing
+ :returns: nothing
"""
logger.console('Extracting tarball to {0} on {1}'.format(
con.REMOTE_FW_DIR, node['host']))
:param args: All parameters needed to setup one node.
:type args: tuple
- :return: nothing
- :return: True - success, False - error
+ :returns: True - success, False - error
:rtype: bool
"""
tarball, remote_tarball, node = args
:param tarball: Path to tarball to upload.
:type tarball: str
- :return: nothing
+ :returns: nothing
"""
call(split('sh -c "rm {0} > /dev/null 2>&1"'.format(tarball)))
-class SetupFramework(object): # pylint: disable=too-few-public-methods
+class SetupFramework(object):
"""Setup suite run on topology nodes.
Many VAT/CLI based tests need the scripts at remote hosts before executing
logger.trace('Test framework copied to all topology nodes')
delete_local_tarball(tarball)
logger.console('All nodes are ready')
-
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Packet trace library."""
+
from resources.libraries.python.VatExecutor import VatExecutor
from resources.libraries.python.topology import NodeType
class Trace(object):
+ """This class provides methods to manipulate the VPP packet trace."""
@staticmethod
def show_packet_trace_on_all_duts(nodes):
+ """Show VPP packet trace.
+
+ :param nodes: Nodes from which the packet trace will be displayed.
+ :type nodes: list
+ """
for node in nodes.values():
if node['type'] == NodeType.DUT:
vat = VatExecutor()
@staticmethod
def clear_packet_trace_on_all_duts(nodes):
+ """Clear VPP packet trace.
+
+ :param nodes: Nodes where the packet trace will be cleared.
+ :type nodes: list
+ """
for node in nodes.values():
if node['type'] == NodeType.DUT:
vat = VatExecutor()
vat.execute_script("clear_trace.vat", node, json_out=False)
-
tg_instance = BuiltIn().get_library_instance(
'resources.libraries.python.TrafficGenerator')
- if tg_instance._node['subtype'] is None:
+ if tg_instance.node['subtype'] is None:
raise RuntimeError('TG subtype not defined')
- elif tg_instance._node['subtype'] == NodeSubTypeTG.TREX:
+ elif tg_instance.node['subtype'] == NodeSubTypeTG.TREX:
unit_rate = str(rate) + self.get_rate_type_str()
tg_instance.trex_stl_start_remote_exec(self.get_duration(),
unit_rate, frame_size,
'resources.libraries.python.TrafficGenerator')
return tg_instance.get_latency_int()
+
class TrafficGenerator(object):
"""Traffic Generator."""
# T-REX interface order mapping
self._ifaces_reordered = 0
+ @property
+ def node(self):
+ """Getter.
+
+ :returns: Traffic generator node.
+ :rtype: dict
+ """
+ return self._node
+
def get_loss(self):
"""Return number of lost packets.
:param node: VPP node.
:param additional_cmds: Additional commands that the vpp should print
- settings for.
+ settings for.
:type node: dict
:type additional_cmds: tuple
"""
def_setting_tb_displayed['Custom Setting: {}'.format(cmd)] = cmd
ssh = SSH()
ssh.connect(node)
- print("=" * 40)
- for key, value in def_setting_tb_displayed.iteritems():
- (_, stdout, _) = ssh.exec_command_sudo('vppctl sh {}'.format(value))
- print("{} : {} \n".format(key, value))
- print(stdout)
- print("=" * 40)
+ for _, value in def_setting_tb_displayed.items():
+ ssh.exec_command_sudo('vppctl sh {}'.format(value))
Clean up VAT JSON output from clutter like vat# prompts and such.
:param json_output: Cluttered JSON output.
- :return: Cleaned up output JSON string.
+ :returns: Cleaned up output JSON string.
"""
retval = json_output
"""
logger.debug("Executing command in VAT terminal: {}".format(cmd))
try:
- out = self._ssh.interactive_terminal_exec_command(self._tty,
- cmd,
- self.__VAT_PROMPT)
+ out = self._ssh.interactive_terminal_exec_command(self._tty, cmd,
+ self.__VAT_PROMPT)
except:
self._exec_failure = True
raise
array_start = out.find('[')
array_end = out.rfind(']')
- if -1 == obj_start and -1 == array_start:
+ if obj_start == -1 and array_start == -1:
raise RuntimeError("VAT: no JSON data.")
- if obj_start < array_start or -1 == array_start:
+ if obj_start < array_start or array_start == -1:
start = obj_start
end = obj_end + 1
else:
:param node: Node to dump stats table on.
:type node: dict
- :return: Stats table.
+ :returns: Stats table.
"""
with VatTerminal(node) as vat:
vat.vat_terminal_exec_cmd('want_stats enable')
return None
def vpp_get_ipv4_interface_counter(self, node, interface):
+ """
+
+ :param node: Node to get interface IPv4 counter on.
+ :param interface: Interface name.
+ :type node: dict
+ :type interface: str
+ :returns: Interface IPv4 counter.
+ :rtype: int
+ """
return self.vpp_get_ipv46_interface_counter(node, interface, False)
def vpp_get_ipv6_interface_counter(self, node, interface):
+ """
+
+ :param node: Node to get interface IPv6 counter on.
+ :param interface: Interface name.
+ :type node: dict
+ :type interface: str
+ :returns: Interface IPv6 counter.
+ :rtype: int
+ """
return self.vpp_get_ipv46_interface_counter(node, interface, True)
def vpp_get_ipv46_interface_counter(self, node, interface, is_ipv6=True):
:type node: dict
:type interface: str
:type is_ipv6: bool
- :return: Interface IPv4/IPv6 counter.
+ :returns: Interface IPv4/IPv6 counter.
:rtype: int
"""
version = 'ip6' if is_ipv6 else 'ip4'
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Constants used in CSIT."""
+
class Constants(object):
+ """Constants used in CSIT."""
+
# OpenVPP testing directory location at topology nodes
REMOTE_FW_DIR = '/tmp/openvpp-testing'
suffix_dict = {"l2": "eth",
"l3_ip4": "ipv4",
"l3_ip6": "ipv6",
- "mixed": "mixed"
- }
+ "mixed": "mixed"}
try:
suffix = suffix_dict[layer]
except KeyError:
}
except KeyError:
raise ValueError("Unknown network layer {0}. "
- "Valid options are: {1}".format(
- layer, layers.keys()))
+ "Valid options are: {1}".
+ format(layer, layers.keys()))
status_code, resp = HcUtil.put_honeycomb_data(
node, "config_vpp_interfaces", data, path)
import HoneycombUtil as HcUtil
-# pylint: disable=too-many-public-methods
-# pylint: disable=too-many-lines
class InterfaceKeywords(object):
"""Keywords for Interface manipulation.
node, super_interface, path, address)
@staticmethod
- def remove_all_ipv4_addresses_from_sub_interface(node, super_interface, # pylint: disable=invalid-name
+ def remove_all_ipv4_addresses_from_sub_interface(node, super_interface,
identifier):
"""Remove all ipv4 addresses from the specified sub-interface.
:param url_file: URL file. The argument contains only the name of file
without extension, not the full path.
:type url_file: str
- :return: Requested path.
+ :returns: Requested path.
:rtype: str
"""
:param path: Path to data we want to find.
:type data: dict
:type path: tuple
- :return: Data represented by path.
+ :returns: Data represented by path.
:rtype: str, dict, or list
:raises HoneycombError: If the data has not been found.
"""
:param path: Path to data we want to remove.
:type data: dict
:type path: tuple
- :return: Original data without removed part.
+ :returns: Original data without removed part.
:rtype: dict
"""
:type data: dict
:type path: tuple
:type new_value: str, dict or list
- :return: Original data with the new value.
+ :returns: Original data with the new value.
:rtype: dict
"""
:type node: dict
:type url_file: str
:type path: str
- :return: Status code and content of response.
+ :returns: Status code and content of response.
:rtype tuple
"""
base_path = HoneycombUtil.read_path_from_url_file(url_file)
path = base_path + path
status_code, resp = HTTPRequest.get(node, path)
- (status_node, response) = status_code, loads(resp)
+ response = loads(resp)
if status_code != HTTPCodes.OK:
HoneycombUtil.read_log_tail(node)
return status_code, response
:type data: dict, str
:type path: str
:type data_representation: DataRepresentation
- :return: Status code and content of response.
+ :returns: Status code and content of response.
:rtype: tuple
:raises HoneycombError: If the given data representation is not defined
in HEADERS.
:type data: dict, str
:type data_representation: DataRepresentation
:type timeout: int
- :return: Status code and content of response.
+ :returns: Status code and content of response.
:rtype: tuple
:raises HoneycombError: If the given data representation is not defined
in HEADERS.
:type node: dict
:type url_file: str
:type path: str
- :return: Status code and content of response.
+ :returns: Status code and content of response.
:rtype tuple
"""
:param lines: Number of lines to read.
:type node: dict
:type lines: int
- :return: Last N log lines.
+ :returns: Last N log lines.
:rtype: str
"""
"""Keywords used to connect to Honeycomb through Netconf, send messages
and receive replies."""
+import socket
from time import time
import paramiko
-import socket
from robot.api import logger
from interruptingcow import timeout
password=node['honeycomb']['passwd'],
pkey=None,
port=node['honeycomb']['netconf_port'],
- timeout=time_out,
- )
+ timeout=time_out)
logger.trace('Connect took {0} seconds'.format(time() - start))
logger.debug('New ssh: {0}'.format(client))
:type size:int
:type time_out:int
:type err:str
- :return: Content of response.
+ :returns: Content of response.
:rtype: str
:raises HoneycombError: If the read process times out.
"""
break
except socket.timeout:
raise HoneycombError("Socket timeout.",
- enable_logging=False
- )
+ enable_logging=False)
except RuntimeError:
raise HoneycombError(err + " Content of buffer: {0}".format(reply),
- enable_logging=False
- )
+ enable_logging=False)
logger.trace(reply)
return reply.replace(self.delimiter, "")
:param time_out: Timeout value for getting the complete response.
:type size:int
:type time_out:int
- :return: Content of response.
+ :returns: Content of response.
:rtype: str
"""
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+"""
+__init__ file for resources/libraries/python/parsers
+"""
ssh = SSH()
try:
ssh.connect(node)
- except Exception as err:
+ except SSHException as err:
logger.error("Failed to connect to node" + str(err))
return None, None, None
else:
(ret_code, stdout, stderr) = ssh.exec_command_sudo(cmd,
timeout=timeout)
- except Exception as err:
+ except SSHException as err:
logger.error(err)
return None, None, None
IntField("Sequence Number", 0),
IntField("Observation Domain ID", 0),
ShortField("Set_ID", 0),
- ShortField("Set_Length", 0)
- ]
+ ShortField("Set_Length", 0)]
class IPFIXTemplate(Packet):
fields_desc = [ShortField("Template_ID", 256),
ShortField("nFields", 2),
FieldListField("Template", [], ShortField("type_len", ""),
- count_from=lambda p: p.nFields*2)
- ]
+ count_from=lambda p: p.nFields*2)]
class IPFIXData(Packet):
from resources.libraries.python.VatExecutor import VatTerminal
-# pylint: disable=too-few-public-methods
class SPAN(object):
"""Class contains methods for setting up SPAN mirroring on DUTs."""
with VatTerminal(node, json_param=False) as vat:
vat.vat_terminal_exec_cmd_from_template('span_create.vat',
src_sw_if_index=src_if,
- dst_sw_if_index=dst_if,
- )
+ dst_sw_if_index=dst_if)
@staticmethod
def vpp_get_span_configuration(node):
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+"""
+__init__ file for resources/libraries/python/telemetry
+"""
from yaml import load
from robot.api import logger
-from robot.libraries.BuiltIn import BuiltIn
+from robot.libraries.BuiltIn import BuiltIn, RobotNotRunningError
from robot.api.deco import keyword
__all__ = ["DICT__nodes", 'Topology']
def load_topo_from_yaml():
"""Load topology from file defined in "${TOPOLOGY_PATH}" variable.
- :return: Nodes from loaded topology.
+ :returns: Nodes from loaded topology.
"""
try:
topo_path = BuiltIn().get_variable_value("${TOPOLOGY_PATH}")
- except:
+ except RobotNotRunningError:
return ''
with open(topo_path) as work_file:
try:
if isinstance(iface_key, basestring):
return node['interfaces'][iface_key].get('vpp_sw_index')
- #FIXME: use only iface_key, do not use integer
+ # TODO: use only iface_key, do not use integer
else:
return int(iface_key)
except (KeyError, ValueError):
| | ... | ${settings['weight']}
| | Vpp Lisp Eid Table Mapping | ${dut_node}
| | ... | ${settings['vni']}
-| | ... | bd=${settings['bd']}
+| | ... | bd_id=${settings['bd']}
| | Vpp Add Lisp Local Eid | ${dut_node}
| | ... | ${settings['locator_name']}
| | ... | ${settings['vni']}