from ipaddress import ip_network, ip_address
-from resources.libraries.python.IPUtil import IPUtil
from resources.libraries.python.InterfaceUtil import InterfaceUtil, \
InterfaceStatusFlags
+from resources.libraries.python.IPAddress import IPAddress
+from resources.libraries.python.IPUtil import IPUtil
from resources.libraries.python.PapiExecutor import PapiSocketExecutor
from resources.libraries.python.ssh import scp_node
from resources.libraries.python.topology import Topology
class IPsecProto(IntEnum):
"""IPsec protocol."""
- ESP = 1
- SEC_AH = 0
+ IPSEC_API_PROTO_ESP = 50
+ IPSEC_API_PROTO_AH = 51
class IPsecSadFlags(IntEnum):
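Review bot: The `IPsecProto` docstring still reads only `"""IPsec protocol."""`, so nothing explains why the members moved from 1/0 to 50/51. These are the IANA IP protocol numbers for ESP and AH, and the new member names appear to mirror the VPP API enum; a docstring such as `"""IPsec protocol; values are the IP protocol numbers used by the API (ESP=50, AH=51)."""` would record that. Renaming `ESP`/`SEC_AH` also breaks any reference outside the hunks shown here, so a repository-wide search for the old names and for literal 0/1 protocol arguments is worth doing before merge.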
:returns: IPsecProto enum ESP object.
:rtype: IPsecProto
"""
- return int(IPsecProto.ESP)
+ return int(IPsecProto.IPSEC_API_PROTO_ESP)
@staticmethod
def ipsec_proto_ah():
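Review bot: The docstring above this return still declares `:rtype: IPsecProto`, while the method returns `int(IPsecProto.IPSEC_API_PROTO_ESP)`, which is a plain int. I would change it to `:returns: IPsecProto enum ESP value.` with `:rtype: int`; the same mismatch is present in the `ipsec_proto_ah` hunk that follows. The method's signature and the opening of its docstring lie outside the hunk.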
:returns: IPsecProto enum AH object.
:rtype: IPsecProto
"""
- return int(IPsecProto.SEC_AH)
+ return int(IPsecProto.IPSEC_API_PROTO_AH)
@staticmethod
def vpp_ipsec_select_backend(node, protocol, index=1):
flags=flags,
tunnel_src=str(src_addr),
tunnel_dst=str(dst_addr),
- protocol=int(IPsecProto.ESP)
+ protocol=int(IPsecProto.IPSEC_API_PROTO_ESP)
)
args = dict(
- is_add=1,
+ is_add=True,
entry=sad_entry
)
with PapiSocketExecutor(node) as papi_exec:
flags=flags,
tunnel_src=str(src_addr),
tunnel_dst=str(dst_addr),
- protocol=int(IPsecProto.ESP)
+ protocol=int(IPsecProto.IPSEC_API_PROTO_ESP)
)
args = dict(
- is_add=1,
+ is_add=True,
entry=sad_entry
)
with PapiSocketExecutor(node) as papi_exec:
err_msg = f"Failed to add Security Policy Database " \
f"on host {node[u'host']}"
args = dict(
- is_add=1,
+ is_add=True,
spd_id=int(spd_id)
)
with PapiSocketExecutor(node) as papi_exec:
err_msg = f"Failed to add interface {interface} to Security Policy " \
f"Database {spd_id} on host {node[u'host']}"
args = dict(
- is_add=1,
+ is_add=True,
sw_if_index=InterfaceUtil.get_interface_index(node, interface),
spd_id=int(spd_id)
)
spd_entry = dict(
spd_id=int(spd_id),
priority=int(priority),
- is_outbound=0 if inbound else 1,
+ is_outbound=not inbound,
sa_id=int(sa_id) if sa_id else 0,
policy=action.policy_int_repr,
protocol=int(proto) if proto else 0,
- remote_address_start=IPUtil.create_ip_address_object(
+ remote_address_start=IPAddress.create_ip_address_object(
ip_network(raddr_range, strict=False).network_address
),
- remote_address_stop=IPUtil.create_ip_address_object(
+ remote_address_stop=IPAddress.create_ip_address_object(
ip_network(raddr_range, strict=False).broadcast_address
),
- local_address_start=IPUtil.create_ip_address_object(
+ local_address_start=IPAddress.create_ip_address_object(
ip_network(laddr_range, strict=False).network_address
),
- local_address_stop=IPUtil.create_ip_address_object(
+ local_address_stop=IPAddress.create_ip_address_object(
ip_network(laddr_range, strict=False).broadcast_address
),
remote_port_start=int(rport_range.split(u"-")[0]) if rport_range
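Review bot: `is_outbound=not inbound` decides the policy direction purely from the truthiness of `inbound`, so any non-empty string (for example a keyword argument that arrives as text rather than a real boolean) silently produces an inbound entry. The old `0 if inbound else 1` behaved the same way, but now that the field is a bool it is a good moment to document that `inbound` must be a bool in the `:type inbound:` line of the docstring, or to reject other types explicitly. The same expression appears in the later hunk that builds the multi-entry `spd_entry`. The function signature is outside this hunk, so the actual argument type should be confirmed against the callers.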
else 65535
)
args = dict(
- is_add=1,
+ is_add=True,
entry=spd_entry
)
with PapiSocketExecutor(node) as papi_exec:
spd_entry = dict(
spd_id=int(spd_id),
priority=int(priority),
- is_outbound=0 if inbound else 1,
+ is_outbound=not inbound,
sa_id=int(sa_id) if sa_id else 0,
policy=IPsecUtil.policy_action_protect().policy_int_repr,
protocol=0,
- remote_address_start=IPUtil.create_ip_address_object(raddr_ip),
- remote_address_stop=IPUtil.create_ip_address_object(raddr_ip),
- local_address_start=IPUtil.create_ip_address_object(
+ remote_address_start=IPAddress.create_ip_address_object(raddr_ip),
+ remote_address_stop=IPAddress.create_ip_address_object(raddr_ip),
+ local_address_start=IPAddress.create_ip_address_object(
ip_network(laddr_range, strict=False).network_address
),
- local_address_stop=IPUtil.create_ip_address_object(
+ local_address_stop=IPAddress.create_ip_address_object(
ip_network(laddr_range, strict=False).broadcast_address
),
remote_port_start=0,
local_port_stop=65535
)
args = dict(
- is_add=1,
+ is_add=True,
entry=spd_entry
)
with PapiSocketExecutor(node) as papi_exec:
for i in range(n_entries):
args[u"entry"][u"remote_address_start"][u"un"] = \
- IPUtil.union_addr(raddr_ip + i)
+ IPAddress.union_addr(raddr_ip + i)
args[u"entry"][u"remote_address_stop"][u"un"] = \
- IPUtil.union_addr(raddr_ip + i)
+ IPAddress.union_addr(raddr_ip + i)
history = bool(not 1 < i < n_entries - 2)
papi_exec.add(cmd, history=history, **args)
papi_exec.get_replies(err_msg)
)
cmd2 = u"ipsec_tunnel_if_add_del"
args2 = dict(
- is_add=1,
+ is_add=True,
local_ip=None,
remote_ip=None,
local_spi=0,
)
args2[u"local_spi"] = spi_1 + i
args2[u"remote_spi"] = spi_2 + i
- args2[u"local_ip"] = IPUtil.create_ip_address_object(
+ args2[u"local_ip"] = IPAddress.create_ip_address_object(
if1_ip + i * addr_incr
)
- args2[u"remote_ip"] = IPUtil.create_ip_address_object(if2_ip)
+ args2[u"remote_ip"] = IPAddress.create_ip_address_object(if2_ip)
args2[u"local_crypto_key_len"] = len(ckeys[i])
args2[u"local_crypto_key"] = ckeys[i]
args2[u"remote_crypto_key_len"] = len(ckeys[i])
# Configure IPsec tunnel interfaces
cmd2 = u"ipsec_tunnel_if_add_del"
args2 = dict(
- is_add=1,
- local_ip=IPUtil.create_ip_address_object(if2_ip),
+ is_add=True,
+ local_ip=IPAddress.create_ip_address_object(if2_ip),
remote_ip=None,
local_spi=0,
remote_spi=0,
for i in range(existing_tunnels, n_tunnels):
args2[u"local_spi"] = spi_2 + i
args2[u"remote_spi"] = spi_1 + i
- args2[u"local_ip"] = IPUtil.create_ip_address_object(if2_ip)
- args2[u"remote_ip"] = IPUtil.create_ip_address_object(
+ args2[u"local_ip"] = IPAddress.create_ip_address_object(if2_ip)
+ args2[u"remote_ip"] = IPAddress.create_ip_address_object(
if1_ip + i * addr_incr)
args2[u"local_crypto_key_len"] = len(ckeys[i])
args2[u"local_crypto_key"] = ckeys[i]
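Review bot: `args2[u"local_ip"] = IPAddress.create_ip_address_object(if2_ip)` is recomputed on every pass of `for i in range(existing_tunnels, n_tunnels)`, although it does not depend on `i` and the preceding hunk already places the same value in the `args2` template. Dropping the per-iteration assignment, or building the object once before the loop, leaves only the fields that really vary (`local_spi`, `remote_spi`, `remote_ip`, keys) in the loop body. The `remote_ip` assignment from `if2_ip` in the earlier tunnel hunk looks loop-invariant in the same way, though its loop header is not visible there.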