"""IPsec utilities library."""
-import os
-
from enum import Enum, IntEnum
from io import open
from ipaddress import ip_network, ip_address
def vpp_ipsec_set_async_mode(node, async_enable=1):
"""Set IPsec async mode on|off.
+ Unconditionally, attempt to switch crypto dispatch into polling mode.
+
:param node: VPP node to set IPsec async mode.
:param async_enable: Async mode on or off.
:type node: dict
:raises RuntimeError: If failed to set IPsec async mode or if no API
reply received.
"""
- cmd = u"ipsec_set_async_mode"
- err_msg = f"Failed to set IPsec async mode on host {node[u'host']}"
- args = dict(
- async_enable=async_enable
- )
with PapiSocketExecutor(node) as papi_exec:
+ cmd = u"ipsec_set_async_mode"
+ err_msg = f"Failed to set IPsec async mode on host {node[u'host']}"
+ args = dict(
+ async_enable=async_enable
+ )
papi_exec.add(cmd, **args).get_reply(err_msg)
+ cmd = "crypto_set_async_dispatch_v2"
+ err_msg = "Failed to set dispatch mode."
+ args = dict(mode=0, adaptive=False)
+ try:
+ papi_exec.add(cmd, **args).get_reply(err_msg)
+ except (AttributeError, RuntimeError):
+ # Expected when VPP build does not have the _v2 yet
+ # (after and before the first CRC check).
+ # TODO: Fail here when testing of pre-23.10 builds is over.
+ pass
@staticmethod
def vpp_ipsec_crypto_sw_scheduler_set_worker(
@staticmethod
def vpp_ipsec_add_sad_entries(
node, n_entries, sad_id, spi, crypto_alg, crypto_key,
- integ_alg=None, integ_key=u"", tunnel_src=None,tunnel_dst=None,
+ integ_alg=None, integ_key=u"", tunnel_src=None, tunnel_dst=None,
tunnel_addr_incr=True):
"""Create multiple Security Association Database entries on VPP node.
for i in range(n_tunnels//(addr_incr**2)+1):
dut1_local_outbound_range = \
ip_network(f"{ip_address(tunnel_ip1) + i*(addr_incr**3)}/8",
- False).with_prefixlen
+ False).with_prefixlen
dut1_remote_outbound_range = \
ip_network(f"{ip_address(tunnel_ip2) + i*(addr_incr**3)}/8",
- False).with_prefixlen
+ False).with_prefixlen
IPsecUtil.vpp_ipsec_add_spd_entry(
nodes[u"DUT1"], spd_id, p_hi, PolicyAction.BYPASS, inbound=False,
for i in range(n_tunnels//(addr_incr**2)+1):
dut2_local_outbound_range = \
ip_network(f"{ip_address(tunnel_ip1) + i*(addr_incr**3)}/8",
- False).with_prefixlen
+ False).with_prefixlen
dut2_remote_outbound_range = \
ip_network(f"{ip_address(tunnel_ip2) + i*(addr_incr**3)}/8",
- False).with_prefixlen
+ False).with_prefixlen
IPsecUtil.vpp_ipsec_add_spd_entry(
nodes[u"DUT2"], spd_id, p_hi, PolicyAction.BYPASS,
- inbound=False, proto=50, laddr_range=dut2_remote_outbound_range,
+ inbound=False, proto=50,
+ laddr_range=dut2_remote_outbound_range,
raddr_range=dut2_local_outbound_range
)
IPsecUtil.vpp_ipsec_add_spd_entry(
nodes[u"DUT2"], spd_id, p_hi, PolicyAction.BYPASS,
- inbound=True, proto=50, laddr_range=dut2_local_outbound_range,
+ inbound=True, proto=50,
+ laddr_range=dut2_local_outbound_range,
raddr_range=dut2_remote_outbound_range
)
for i in range(0, n_flows):
rx_queue = i%rx_queues
-
spi = spi_start + i
flow_index = FlowUtil.vpp_create_ip4_ipsec_flow(
- node, "ESP", spi, "redirect-to-queue", value=rx_queue)
+ node, "ESP", spi, "redirect-to-queue", value=rx_queue)
FlowUtil.vpp_flow_enable(node, interface, flow_index)