if not isinstance(ip, IPv6):
raise RuntimeError(f"Not an IP packet received {ip!r}")
elif not isinstance(ip, ip_format):
- raise RuntimeError(f"Not an IP packet received {ip!r}")
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
lisp = ether.getlayer(lisp_layer)
if not lisp:
if not isinstance(ip, IPv6):
raise RuntimeError(f"Not an IP packet received {ip!r}")
elif not isinstance(ip, ip_format):
- raise RuntimeError(f"Not an IP packet received {ip!r}")
+ raise RuntimeError(f"Not an IP packet received {ip!r}")
lisp = ether.getlayer(LispGPEHeader).underlayer
if not lisp:
# sriov is not supported and we want 0 VFs
# no need to do anything
return
- else:
- raise RuntimeError(
- f"Can't configure {numvfs} VFs on {pf_pci_addr} device "
- f"on {node[u'host']} since it doesn't support SR-IOV."
- )
+
+ raise RuntimeError(
+ f"Can't configure {numvfs} VFs on {pf_pci_addr} device "
+ f"on {node[u'host']} since it doesn't support SR-IOV."
+ )
pci = pf_pci_addr.replace(u":", r"\:")
command = f"sh -c \"echo {numvfs} | " \
# the directory doesn't exist which means the device is not bound
# to any driver
return None
- else:
- cmd = f"basename $(readlink -f {driver_path})"
- ret_val, _ = exec_cmd_no_error(node, cmd)
- return ret_val.strip()
+ cmd = f"basename $(readlink -f {driver_path})"
+ ret_val, _ = exec_cmd_no_error(node, cmd)
+ return ret_val.strip()
@staticmethod
def verify_kernel_module(node, module, force_load=False):
self._search_result = SearchResults.SUCCESS
self._search_result_rate = rate
return
- else:
- raise RuntimeError(u"Unknown search result")
- else:
- raise Exception(u"Unknown search direction")
+ raise RuntimeError(u"Unknown search result")
+ raise Exception(u"Unknown search direction")
def verify_search_result(self):
"""Fail if search was not successful.
from resources.libraries.python.ssh import exec_cmd_no_error
from resources.libraries.python.PapiExecutor import PapiSocketExecutor
+from vpp_papi import VppEnum
+
class FlowUtil:
"""Utilities for flow configuration."""
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip4_n_tuple"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4_N_TUPLE
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip6_n_tuple"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP6_N_TUPLE
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip4"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip6"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP6
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip4_gtpu"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4_GTPU
flow_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
if proto == u"ESP":
flow = u"ip4_ipsec_esp"
flow_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ESP
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip4_l2tpv3oip"
flow_proto = 115 # IP_API_PROTO_L2TP
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4_L2TPV3OIP
:type value: int
:returns: flow_index.
"""
- from vpp_papi import VppEnum
-
flow = u"ip4_vxlan"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4_VXLAN
flow_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
:rtype: int
:raises ValueError: If action type is not supported.
"""
- from vpp_papi import VppEnum
-
cmd = u"flow_add"
if action == u"redirect-to-queue":
return f"{self._value.network_address}/{self._prefix_len}"
elif self._format == u"addr":
return f"{self._value.network_address}"
- else:
- raise RuntimeError(f"Unsupported format {self._format}")
+
+ raise RuntimeError(f"Unsupported format {self._format}")
class IPUtil:
:type action: IPsecUtil.PolicyAction
:type inbound: bool
:type bidirectional: bool
- :raises NotImplemented: When the action is PolicyAction.PROTECT.
+ :raises NotImplementedError: When the action is PolicyAction.PROTECT.
"""
if action == PolicyAction.PROTECT:
- raise NotImplemented('Policy action PROTECT is not supported.')
+ raise NotImplementedError('Policy action PROTECT is not supported.')
spd_id_dir1 = 1
spd_id_dir2 = 2
tmp_filename = f"/tmp/ipsec_spd_{spd_id}_add_del_entry.script"
with open(tmp_filename, 'w') as tmp_file:
- for i in range(n_entries):
+ for _ in range(n_entries):
direction = u'inbound' if inbound else u'outbound'
sa = f' sa {sa_id.inc_fmt()}' if sa_id is not None else ''
protocol = f' protocol {protocol}' if proto else ''
os.remove(tmp_filename)
return
- for i in range(n_entries):
+ for _ in range(n_entries):
IPsecUtil.vpp_ipsec_add_spd_entry(
node, spd_id, next(priority), action, inbound,
next(sa_id) if sa_id is not None else sa_id,
crypto_key, integ_alg, integ_key, tunnel_ip1, tunnel_ip2
)
IPsecUtil.vpp_ipsec_add_spd_entries(
- nodes[u"DUT2"], n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0),
+ nodes[u"DUT2"], n_tunnels, spd_id,
+ priority=ObjIncrement(p_lo, 0),
action=PolicyAction.PROTECT, inbound=True,
sa_id=ObjIncrement(sa_id_1, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip2))
crypto_key, integ_alg, integ_key, tunnel_ip2, tunnel_ip1
)
IPsecUtil.vpp_ipsec_add_spd_entries(
- nodes[u"DUT2"], n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0),
+ nodes[u"DUT2"], n_tunnels, spd_id,
+ priority=ObjIncrement(p_lo, 0),
action=PolicyAction.PROTECT, inbound=False,
sa_id=ObjIncrement(sa_id_2, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip1))
"""Increment utilities library."""
-class ObjIncrement(object):
+class ObjIncrement():
"""
An iterator class used to generate incremented values in each iteration
or when inc_fmt is called.
raise RuntimeError(u"No path for topology")
return self._path[-2]
- def compute_circular_topology(self, nodes, filter_list=None, nic_pfs=1,
+ def compute_circular_topology(
+ self, nodes, filter_list=None, nic_pfs=1,
always_same_link=False, topo_has_tg=True, topo_has_dut=True):
"""Return computed circular path.