"""IPsec utilities library."""
from enum import Enum, IntEnum
-from io import open
-from ipaddress import ip_network, ip_address
+from io import open, TextIOWrapper
+from ipaddress import ip_network, ip_address, IPv4Address, IPv6Address
from random import choice
from string import ascii_letters
+from typing import Iterable, List, Optional, Sequence, Tuple, Union
from robot.libraries.BuiltIn import BuiltIn
from resources.libraries.python.Constants import Constants
+from resources.libraries.python.enum_util import get_enum_instance
from resources.libraries.python.IncrementUtil import ObjIncrement
from resources.libraries.python.InterfaceUtil import (
InterfaceUtil,
IPSEC_REPLAY_WINDOW_DEFAULT = 64
-def gen_key(length):
+def gen_key(length: int) -> bytes:
"""Generate random string as a key.
:param length: Length of generated payload.
)
-class PolicyAction(Enum):
- """Policy actions."""
+# TODO: Introduce a metaclass that adds .find and .InputType automatically?
+class IpsecSpdAction(Enum):
+ """IPsec SPD actions.
- BYPASS = ("bypass", 0)
+ Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_spd_action.
+ """
+
+ BYPASS = NONE = ("bypass", 0)
DISCARD = ("discard", 1)
+ RESOLVE = ("resolve", 2)
PROTECT = ("protect", 3)
- def __init__(self, policy_name, policy_int_repr):
- self.policy_name = policy_name
- self.policy_int_repr = policy_int_repr
+ def __init__(self, action_name: str, action_int_repr: int):
+ self.action_name = action_name
+ self.action_int_repr = action_int_repr
- def __str__(self):
- return self.policy_name
+ def __str__(self) -> str:
+ return self.action_name
- def __int__(self):
- return self.policy_int_repr
+ def __int__(self) -> int:
+ return self.action_int_repr
class CryptoAlg(Enum):
- """Encryption algorithms."""
+ """Encryption algorithms.
+
+ API names and numeric enums from ipsec_types.api (enum ipsec_crypto_alg).
+
+ Lowercase names from ipsec_sa.h (foreach_ipsec_crypto_alg).
+
+ Scapy names are from:
+ https://github.com/secdev/scapy/blob/master/scapy/layers/ipsec.py
+ Key lengths from crypto.h
+ (foreach_crypto_cipher_alg and foreach_crypto_aead_alg).
+ """
+
+ NONE = ("none", 0, "none", 0)
AES_CBC_128 = ("aes-cbc-128", 1, "AES-CBC", 16)
+ AES_CBC_192 = ("aes-cbc-192", 2, "AES-CBC", 24)
AES_CBC_256 = ("aes-cbc-256", 3, "AES-CBC", 32)
+ AES_CTR_128 = ("aes-ctr-128", 4, "AES-CTR", 16)
+ AES_CTR_192 = ("aes-ctr-192", 5, "AES-CTR", 24)
+ AES_CTR_256 = ("aes-ctr-256", 6, "AES-CTR", 32)
AES_GCM_128 = ("aes-gcm-128", 7, "AES-GCM", 16)
+ AES_GCM_192 = ("aes-gcm-192", 8, "AES-GCM", 24)
AES_GCM_256 = ("aes-gcm-256", 9, "AES-GCM", 32)
-
- def __init__(self, alg_name, alg_int_repr, scapy_name, key_len):
+ DES_CBC = ("des-cbc", 10, "DES", 7)
+ _3DES_CBC = ("3des-cbc", 11, "3DES", 24)
+ CHACHA20_POLY1305 = ("chacha20-poly1305", 12, "CHACHA20-POLY1305", 32)
+ AES_NULL_GMAC_128 = ("aes-null-gmac-128", 13, "AES-NULL-GMAC", 16)
+ AES_NULL_GMAC_192 = ("aes-null-gmac-192", 14, "AES-NULL-GMAC", 24)
+ AES_NULL_GMAC_256 = ("aes-null-gmac-256", 15, "AES-NULL-GMAC", 32)
+
+ def __init__(
+ self, alg_name: str, alg_int_repr: int, scapy_name: str, key_len: int
+ ):
self.alg_name = alg_name
self.alg_int_repr = alg_int_repr
self.scapy_name = scapy_name
self.key_len = key_len
+ # TODO: Investigate if __int__ works with PAPI. It was not enough for "if".
+ def __bool__(self):
+ """A shorthand to enable "if crypto_alg:" constructs."""
+ return self.alg_int_repr != 0
+
class IntegAlg(Enum):
- """Integrity algorithm."""
+ """Integrity algorithms.
+
+ API names and numeric enums from ipsec_types.api (enum ipsec_integ_alg).
+
+ Lowercase names from ipsec_sa.h (foreach_ipsec_integ_alg).
+ Scapy names are from:
+ https://github.com/secdev/scapy/blob/master/scapy/layers/ipsec.py
+ Among those, "AES-CMAC-96" may be a mismatch,
+ but there is no sha2-related item with "96" in it.
+
+ Key lengths seem to be given double of digest length
+ from crypto.h (foreach_crypto_link_async_alg),
+ but data there is not complete
+ (e.g. it does not distinguish sha-256-96 from sha-256-128).
+ The missing values are chosen based on last number (e.g. 192 / 4 = 48).
+ """
+
+ NONE = ("none", 0, "none", 0)
+ MD5_96 = ("md5-96", 1, "HMAC-MD5-96", 24)
+ SHA1_96 = ("sha1-96", 2, "HMAC-SHA1-96", 24)
+ SHA_256_96 = ("sha-256-96", 3, "AES-CMAC-96", 24)
SHA_256_128 = ("sha-256-128", 4, "SHA2-256-128", 32)
+ SHA_384_192 = ("sha-384-192", 5, "SHA2-384-192", 48)
SHA_512_256 = ("sha-512-256", 6, "SHA2-512-256", 64)
- def __init__(self, alg_name, alg_int_repr, scapy_name, key_len):
+ def __init__(
+ self, alg_name: str, alg_int_repr: int, scapy_name: str, key_len: int
+ ):
self.alg_name = alg_name
self.alg_int_repr = alg_int_repr
self.scapy_name = scapy_name
self.key_len = key_len
+ def __bool__(self):
+ """A shorthand to enable "if integ_alg:" constructs."""
+ return self.alg_int_repr != 0
+
+# TODO: Base on Enum, so str values can be defined as in alg enums?
class IPsecProto(IntEnum):
- """IPsec protocol."""
+ """IPsec protocol.
- IPSEC_API_PROTO_ESP = 50
- IPSEC_API_PROTO_AH = 51
+ Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_proto.
+ """
+ ESP = 50
+ AH = 51
+ NONE = 255
+ def __str__(self) -> str:
+ """Return string suitable for CLI commands.
+
+ None is not supported.
+
+ :returns: Lowercase name of the proto.
+ :rtype: str
+ :raises: ValueError if the numeric value is not recognized.
+ """
+ num = int(self)
+ if num == 50:
+ return "esp"
+ if num == 51:
+ return "ah"
+ raise ValueError(f"String form not defined for IPsecProto {num}")
+
+
+# The rest of enums do not appear outside this file, so no no change needed yet.
class IPsecSadFlags(IntEnum):
"""IPsec Security Association Database flags."""
- IPSEC_API_SAD_FLAG_NONE = 0
+ IPSEC_API_SAD_FLAG_NONE = NONE = 0
# Enable extended sequence numbers
IPSEC_API_SAD_FLAG_USE_ESN = 0x01
# Enable Anti - replay
class TunnelEncpaDecapFlags(IntEnum):
"""Flags controlling tunnel behaviour."""
- TUNNEL_API_ENCAP_DECAP_FLAG_NONE = 0
+ TUNNEL_API_ENCAP_DECAP_FLAG_NONE = NONE = 0
# at encap, copy the DF bit of the payload into the tunnel header
TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF = 1
# at encap, set the DF bit in the tunnel header
"""Tunnel modes."""
# point-to-point
- TUNNEL_API_MODE_P2P = 0
+ TUNNEL_API_MODE_P2P = NONE = 0
# multi-point
TUNNEL_API_MODE_MP = 1
-class IPsecUtil:
- """IPsec utilities."""
+# Derived types for type hints, based on capabilities of get_enum_instance.
+IpsecSpdAction.InputType = Union[IpsecSpdAction, str, None]
+CryptoAlg.InputType = Union[CryptoAlg, str, None]
+IntegAlg.InputType = Union[IntegAlg, str, None]
+IPsecProto.InputType = Union[IPsecProto, str, int, None]
+# TODO: Introduce a metaclass that adds .find and .InputType automatically?
- @staticmethod
- def policy_action_bypass():
- """Return policy action bypass.
-
- :returns: PolicyAction enum BYPASS object.
- :rtype: PolicyAction
- """
- return PolicyAction.BYPASS
-
- @staticmethod
- def policy_action_discard():
- """Return policy action discard.
-
- :returns: PolicyAction enum DISCARD object.
- :rtype: PolicyAction
- """
- return PolicyAction.DISCARD
-
- @staticmethod
- def policy_action_protect():
- """Return policy action protect.
- :returns: PolicyAction enum PROTECT object.
- :rtype: PolicyAction
- """
- return PolicyAction.PROTECT
-
- @staticmethod
- def crypto_alg_aes_cbc_128():
- """Return encryption algorithm aes-cbc-128.
-
- :returns: CryptoAlg enum AES_CBC_128 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_CBC_128
-
- @staticmethod
- def crypto_alg_aes_cbc_256():
- """Return encryption algorithm aes-cbc-256.
-
- :returns: CryptoAlg enum AES_CBC_256 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_CBC_256
-
- @staticmethod
- def crypto_alg_aes_gcm_128():
- """Return encryption algorithm aes-gcm-128.
-
- :returns: CryptoAlg enum AES_GCM_128 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_GCM_128
-
- @staticmethod
- def crypto_alg_aes_gcm_256():
- """Return encryption algorithm aes-gcm-256.
+class IPsecUtil:
+ """IPsec utilities."""
- :returns: CryptoAlg enum AES_GCM_128 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_GCM_256
+ # The following 4 methods are Python one-liners,
+ # but they are useful when called as a Robot keyword.
@staticmethod
- def get_crypto_alg_key_len(crypto_alg):
+ def get_crypto_alg_key_len(crypto_alg: CryptoAlg.InputType) -> int:
"""Return encryption algorithm key length.
+ This is a Python one-liner, but useful when called as a Robot keyword.
+
:param crypto_alg: Encryption algorithm.
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:returns: Key length.
:rtype: int
"""
- return crypto_alg.key_len
+ return get_enum_instance(CryptoAlg, crypto_alg).key_len
@staticmethod
- def get_crypto_alg_scapy_name(crypto_alg):
+ def get_crypto_alg_scapy_name(crypto_alg: CryptoAlg.InputType) -> str:
"""Return encryption algorithm scapy name.
+ This is a Python one-liner, but useful when called as a Robot keyword.
+
:param crypto_alg: Encryption algorithm.
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:returns: Algorithm scapy name.
:rtype: str
"""
- return crypto_alg.scapy_name
+ return get_enum_instance(CryptoAlg, crypto_alg).scapy_name
+ # The below to keywords differ only by enum type conversion from str.
@staticmethod
- def integ_alg_sha_256_128():
- """Return integrity algorithm SHA-256-128.
-
- :returns: IntegAlg enum SHA_256_128 object.
- :rtype: IntegAlg
- """
- return IntegAlg.SHA_256_128
-
- @staticmethod
- def integ_alg_sha_512_256():
- """Return integrity algorithm SHA-512-256.
-
- :returns: IntegAlg enum SHA_512_256 object.
- :rtype: IntegAlg
- """
- return IntegAlg.SHA_512_256
-
- @staticmethod
- def get_integ_alg_key_len(integ_alg):
+ def get_integ_alg_key_len(integ_alg: IntegAlg.InputType) -> int:
"""Return integrity algorithm key length.
- None argument is accepted, returning zero.
-
:param integ_alg: Integrity algorithm.
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:returns: Key length.
:rtype: int
"""
- return 0 if integ_alg is None else integ_alg.key_len
+ return get_enum_instance(IntegAlg, integ_alg).key_len
@staticmethod
- def get_integ_alg_scapy_name(integ_alg):
+ def get_integ_alg_scapy_name(integ_alg: IntegAlg.InputType) -> str:
"""Return integrity algorithm scapy name.
:param integ_alg: Integrity algorithm.
- :type integ_alg: IntegAlg
+ :type integ_alg: IntegAlg.InputType
:returns: Algorithm scapy name.
:rtype: str
"""
- return integ_alg.scapy_name
-
- @staticmethod
- def ipsec_proto_esp():
- """Return IPSec protocol ESP.
-
- :returns: IPsecProto enum ESP object.
- :rtype: IPsecProto
- """
- return int(IPsecProto.IPSEC_API_PROTO_ESP)
+ return get_enum_instance(IntegAlg, integ_alg).scapy_name
@staticmethod
- def ipsec_proto_ah():
- """Return IPSec protocol AH.
-
- :returns: IPsecProto enum AH object.
- :rtype: IPsecProto
- """
- return int(IPsecProto.IPSEC_API_PROTO_AH)
-
- @staticmethod
- def vpp_ipsec_select_backend(node, protocol, index=1):
+ def vpp_ipsec_select_backend(
+ node: dict, proto: IPsecProto.InputType, index: int = 1
+ ) -> None:
"""Select IPsec backend.
:param node: VPP node to select IPsec backend on.
- :param protocol: IPsec protocol.
+ :param proto: IPsec protocol.
:param index: Backend index.
:type node: dict
- :type protocol: IPsecProto
+ :type proto: IPsecProto.InputType
:type index: int
:raises RuntimeError: If failed to select IPsec backend or if no API
reply received.
"""
+ proto = get_enum_instance(IPsecProto, proto)
cmd = "ipsec_select_backend"
err_msg = f"Failed to select IPsec backend on host {node['host']}"
- args = dict(protocol=protocol, index=index)
+ args = dict(protocol=proto, index=index)
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
- def vpp_ipsec_set_async_mode(node, async_enable=1):
+ def vpp_ipsec_set_async_mode(node: dict, async_enable: int = 1) -> None:
"""Set IPsec async mode on|off.
Unconditionally, attempt to switch crypto dispatch into polling mode.
@staticmethod
def vpp_ipsec_crypto_sw_scheduler_set_worker(
- node, workers, crypto_enable=False
- ):
+ node: dict, workers: Iterable[int], crypto_enable: bool = False
+ ) -> None:
"""Enable or disable crypto on specific vpp worker threads.
:param node: VPP node to enable or disable crypto for worker threads.
@staticmethod
def vpp_ipsec_crypto_sw_scheduler_set_worker_on_all_duts(
- nodes, crypto_enable=False
- ):
+ nodes: dict, crypto_enable: bool = False
+ ) -> None:
"""Enable or disable crypto on specific vpp worker threads.
:param node: VPP node to enable or disable crypto for worker threads.
@staticmethod
def vpp_ipsec_add_sad_entry(
- node,
- sad_id,
- spi,
- crypto_alg,
- crypto_key,
- integ_alg=None,
- integ_key="",
- tunnel_src=None,
- tunnel_dst=None,
- ):
+ node: dict,
+ sad_id: int,
+ spi: int,
+ crypto_alg: CryptoAlg.InputType = None,
+ crypto_key: str = "",
+ integ_alg: IntegAlg.InputType = None,
+ integ_key: str = "",
+ tunnel_src: Optional[str] = None,
+ tunnel_dst: Optional[str] = None,
+ ) -> None:
"""Create Security Association Database entry on the VPP node.
:param node: VPP node to add SAD entry on.
:type node: dict
:type sad_id: int
:type spi: int
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:type crypto_key: str
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:type integ_key: str
- :type tunnel_src: str
- :type tunnel_dst: str
+ :type tunnel_src: Optional[str]
+ :type tunnel_dst: Optional[str]
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
if isinstance(crypto_key, str):
crypto_key = crypto_key.encode(encoding="utf-8")
if isinstance(integ_key, str):
spi=int(spi),
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=ckey,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=ikey,
flags=flags,
tunnel=dict(
),
dscp=int(IpDscp.IP_API_DSCP_CS0),
),
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
udp_src_port=IPSEC_UDP_PORT_DEFAULT,
udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
@staticmethod
def vpp_ipsec_add_sad_entries(
- node,
- n_entries,
- sad_id,
- spi,
- crypto_alg,
- crypto_key,
- integ_alg=None,
- integ_key="",
- tunnel_src=None,
- tunnel_dst=None,
- tunnel_addr_incr=True,
- ):
+ node: dict,
+ n_entries: int,
+ sad_id: int,
+ spi: int,
+ crypto_alg: CryptoAlg.InputType = None,
+ crypto_key: str = "",
+ integ_alg: IntegAlg.InputType = None,
+ integ_key: str = "",
+ tunnel_src: Optional[str] = None,
+ tunnel_dst: Optional[str] = None,
+ tunnel_addr_incr: bool = True,
+ ) -> None:
"""Create multiple Security Association Database entries on VPP node.
:param node: VPP node to add SAD entry on.
:type n_entries: int
:type sad_id: int
:type spi: int
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:type crypto_key: str
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:type integ_key: str
- :type tunnel_src: str
- :type tunnel_dst: str
+ :type tunnel_src: Optional[str]
+ :type tunnel_dst: Optional[str]
:type tunnel_addr_incr: bool
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
if isinstance(crypto_key, str):
crypto_key = crypto_key.encode(encoding="utf-8")
if isinstance(integ_key, str):
spi=int(spi),
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=ckey,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=ikey,
flags=flags,
tunnel=dict(
),
dscp=int(IpDscp.IP_API_DSCP_CS0),
),
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
udp_src_port=IPSEC_UDP_PORT_DEFAULT,
udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
@staticmethod
def vpp_ipsec_set_ip_route(
- node,
- n_tunnels,
- tunnel_src,
- traffic_addr,
- tunnel_dst,
- interface,
- raddr_range,
- dst_mac=None,
- ):
+ node: dict,
+ n_tunnels: int,
+ tunnel_src: str,
+ traffic_addr: str,
+ tunnel_dst: str,
+ interface: str,
+ raddr_range: int,
+ dst_mac: Optional[str] = None,
+ ) -> None:
"""Set IP address and route on interface.
:param node: VPP node to add config on.
:type tunnel_dst: str
:type interface: str
:type raddr_range: int
- :type dst_mac: str
+ :type dst_mac: Optional[str]
"""
tunnel_src = ip_address(tunnel_src)
tunnel_dst = ip_address(tunnel_dst)
papi_exec.get_replies(err_msg)
@staticmethod
- def vpp_ipsec_add_spd(node, spd_id):
+ def vpp_ipsec_add_spd(node: dict, spd_id: int) -> None:
"""Create Security Policy Database on the VPP node.
:param node: VPP node to add SPD on.
papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
- def vpp_ipsec_spd_add_if(node, spd_id, interface):
+ def vpp_ipsec_spd_add_if(
+ node: dict, spd_id: int, interface: Union[str, int]
+ ) -> None:
"""Add interface to the Security Policy Database.
:param node: VPP node.
@staticmethod
def vpp_ipsec_create_spds_match_nth_entry(
- node,
- dir1_interface,
- dir2_interface,
- entry_amount,
- local_addr_range,
- remote_addr_range,
- action=PolicyAction.BYPASS,
- inbound=False,
- bidirectional=True,
- ):
+ node: dict,
+ dir1_interface: Union[str, int],
+ dir2_interface: Union[str, int],
+ entry_amount: int,
+ local_addr_range: Union[str, IPv4Address, IPv6Address],
+ remote_addr_range: Union[str, IPv4Address, IPv6Address],
+ action: IpsecSpdAction.InputType = IpsecSpdAction.BYPASS,
+ inbound: bool = False,
+ bidirectional: bool = True,
+ ) -> None:
"""Create one matching SPD entry for inbound or outbound traffic on
a DUT for each traffic direction and also create entry_amount - 1
non-matching SPD entries. Create a Security Policy Database on each
:param remote_addr_range: Matching remote address range in
direction 1 in format IP/prefix or IP/mask. If no mask is
provided, it's considered to be /32.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
:param bidirectional: When True, will create SPDs in both directions
of traffic. When False, only in one direction.
:type node: dict
- :type dir1_interface: Union[string, int]
- :type dir2_interface: Union[string, int]
+ :type dir1_interface: Union[str, int]
+ :type dir2_interface: Union[str, int]
:type entry_amount: int
:type local_addr_range:
- Union[string, ipaddress.IPv4Address, ipaddress.IPv6Address]
+ Union[str, IPv4Address, IPv6Address]
:type remote_addr_range:
- Union[string, ipaddress.IPv4Address, ipaddress.IPv6Address]
- :type action: IPsecUtil.PolicyAction
+ Union[str, IPv4Address, IPv6Address]
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
:type bidirectional: bool
- :raises NotImplementedError: When the action is PolicyAction.PROTECT.
+ :raises NotImplementedError: When the action is IpsecSpdAction.PROTECT.
"""
-
- if action == PolicyAction.PROTECT:
- raise NotImplementedError("Policy action PROTECT is not supported.")
+ action = get_enum_instance(IpsecSpdAction, action)
+ if action == IpsecSpdAction.PROTECT:
+ raise NotImplementedError(
+ "IPsec SPD action PROTECT is not supported."
+ )
spd_id_dir1 = 1
spd_id_dir2 = 2
@staticmethod
def _vpp_ipsec_add_spd_entry_internal(
- executor,
- spd_id,
- priority,
- action,
- inbound=True,
- sa_id=None,
- proto=None,
- laddr_range=None,
- raddr_range=None,
- lport_range=None,
- rport_range=None,
- is_ipv6=False,
- ):
+ executor: PapiSocketExecutor,
+ spd_id: int,
+ priority: int,
+ action: IpsecSpdAction.InputType,
+ inbound: bool = True,
+ sa_id: Optional[int] = None,
+ proto: IPsecProto.InputType = None,
+ laddr_range: Optional[str] = None,
+ raddr_range: Optional[str] = None,
+ lport_range: Optional[str] = None,
+ rport_range: Optional[str] = None,
+ is_ipv6: bool = False,
+ ) -> None:
"""Prepare to create Security Policy Database entry on the VPP node.
This just adds one more command to the executor.
:param executor: Open PAPI executor (async handling) to add commands to.
:param spd_id: SPD ID to add entry on.
:param priority: SPD entry priority, higher number = higher priority.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
- :param sa_id: SAD entry ID for action PolicyAction.PROTECT.
+ :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT.
:param proto: Policy selector next layer protocol number.
:param laddr_range: Policy selector local IPv4 or IPv6 address range
in format IP/prefix or IP/mask. If no mask is provided,
:type executor: PapiSocketExecutor
:type spd_id: int
:type priority: int
- :type action: IPsecUtil.PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
- :type sa_id: int
- :type proto: int
- :type laddr_range: string
- :type raddr_range: string
- :type lport_range: string
- :type rport_range: string
+ :type sa_id: Optional[int]
+ :type proto: IPsecProto.InputType
+ :type laddr_range: Optional[str]
+ :type raddr_range: Optional[str]
+ :type lport_range: Optional[str]
+ :type rport_range: Optional[str]
:type is_ipv6: bool
"""
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
if laddr_range is None:
laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0"
is_outbound=not inbound,
sa_id=int(sa_id) if sa_id else 0,
policy=int(action),
- protocol=255 if proto is None else int(proto),
+ protocol=proto,
remote_address_start=IPAddress.create_ip_address_object(
remote_net.network_address
),
@staticmethod
def vpp_ipsec_add_spd_entry(
- node,
- spd_id,
- priority,
- action,
- inbound=True,
- sa_id=None,
- proto=None,
- laddr_range=None,
- raddr_range=None,
- lport_range=None,
- rport_range=None,
- is_ipv6=False,
- ):
+ node: dict,
+ spd_id: int,
+ priority: int,
+ action: IpsecSpdAction.InputType,
+ inbound: bool = True,
+ sa_id: Optional[int] = None,
+ proto: IPsecProto.InputType = None,
+ laddr_range: Optional[str] = None,
+ raddr_range: Optional[str] = None,
+ lport_range: Optional[str] = None,
+ rport_range: Optional[str] = None,
+ is_ipv6: bool = False,
+ ) -> None:
"""Create Security Policy Database entry on the VPP node.
:param node: VPP node to add SPD entry on.
:param spd_id: SPD ID to add entry on.
:param priority: SPD entry priority, higher number = higher priority.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
- :param sa_id: SAD entry ID for action PolicyAction.PROTECT.
+ :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT.
:param proto: Policy selector next layer protocol number.
:param laddr_range: Policy selector local IPv4 or IPv6 address range
in format IP/prefix or IP/mask. If no mask is provided,
:type node: dict
:type spd_id: int
:type priority: int
- :type action: IPsecUtil.PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
- :type sa_id: int
- :type proto: int
- :type laddr_range: string
- :type raddr_range: string
- :type lport_range: string
- :type rport_range: string
+ :type sa_id: Optional[int]
+ :type proto: IPsecProto.InputType
+ :type laddr_range: Optional[str]
+ :type raddr_range: Optional[str]
+ :type lport_range: Optional[str]
+ :type rport_range: Optional[str]
:type is_ipv6: bool
"""
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
err_msg = (
"Failed to add entry to Security Policy Database"
f" {spd_id} on host {node['host']}"
@staticmethod
def vpp_ipsec_add_spd_entries(
- node,
- n_entries,
- spd_id,
- priority,
- action,
- inbound,
- sa_id=None,
- proto=None,
- laddr_range=None,
- raddr_range=None,
- lport_range=None,
- rport_range=None,
- is_ipv6=False,
- ):
+ node: dict,
+ n_entries: int,
+ spd_id: int,
+ priority: Optional[ObjIncrement],
+ action: IpsecSpdAction.InputType,
+ inbound: bool,
+ sa_id: Optional[ObjIncrement] = None,
+ proto: IPsecProto.InputType = None,
+ laddr_range: Optional[NetworkIncrement] = None,
+ raddr_range: Optional[NetworkIncrement] = None,
+ lport_range: Optional[str] = None,
+ rport_range: Optional[str] = None,
+ is_ipv6: bool = False,
+ ) -> None:
"""Create multiple Security Policy Database entries on the VPP node.
:param node: VPP node to add SPD entries on.
:param n_entries: Number of SPD entries to be added.
:param spd_id: SPD ID to add entries on.
:param priority: SPD entries priority, higher number = higher priority.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
- :param sa_id: SAD entry ID for action PolicyAction.PROTECT.
+ :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT.
:param proto: Policy selector next layer protocol number.
:param laddr_range: Policy selector local IPv4 or IPv6 address range
in format IP/prefix or IP/mask. If no mask is provided,
:type node: dict
:type n_entries: int
:type spd_id: int
- :type priority: IPsecUtil.ObjIncrement
- :type action: IPsecUtil.PolicyAction
+ :type priority: Optional[ObjIncrement]
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
- :type sa_id: IPsecUtil.ObjIncrement
- :type proto: int
- :type laddr_range: IPsecUtil.NetworkIncrement
- :type raddr_range: IPsecUtil.NetworkIncrement
- :type lport_range: string
- :type rport_range: string
+ :type sa_id: Optional[ObjIncrement]
+ :type proto: IPsecProto.InputType
+ :type laddr_range: Optional[NetworkIncrement]
+ :type raddr_range: Optional[NetworkIncrement]
+ :type lport_range: Optional[str]
+ :type rport_range: Optional[str]
:type is_ipv6: bool
"""
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
if laddr_range is None:
laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0"
laddr_range = NetworkIncrement(ip_network(laddr_range), 0)
papi_exec.get_replies(err_msg)
@staticmethod
- def _ipsec_create_loopback_dut1_papi(nodes, tun_ips, if1_key, if2_key):
+ def _ipsec_create_loopback_dut1_papi(
+ nodes: dict, tun_ips: dict, if1_key: str, if2_key: str
+ ) -> int:
"""Create loopback interface and set IP address on VPP node 1 interface
using PAPI.
:type tun_ips: dict
:type if1_key: str
:type if2_key: str
+ :returns: sw_if_idx Of the created loopback interface.
+ :rtype: int
"""
with PapiSocketExecutor(nodes["DUT1"]) as papi_exec:
# Create loopback interface on DUT1, set it to up state
@staticmethod
def _ipsec_create_tunnel_interfaces_dut1_papi(
- nodes,
- tun_ips,
- if1_key,
- if2_key,
- n_tunnels,
- crypto_alg,
- integ_alg,
- raddr_ip2,
- addr_incr,
- spi_d,
- existing_tunnels=0,
- ):
+ nodes: dict,
+ tun_ips: dict,
+ if1_key: str,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ raddr_ip2: Union[IPv4Address, IPv6Address],
+ addr_incr: int,
+ spi_d: dict,
+ existing_tunnels: int = 0,
+ udp_encap: bool = False,
+ anti_replay: bool = False,
+ ) -> Tuple[List[bytes], List[bytes]]:
"""Create multiple IPsec tunnel interfaces on DUT1 node using PAPI.
Generate random keys and return them (so DUT2 or TG can decrypt).
:param addr_incr: IP / IPv6 address incremental step.
:param existing_tunnels: Number of tunnel interfaces before creation.
Useful mainly for reconf tests. Default 0.
+ :param udp_encap: Whether to apply UDP_ENCAP flag.
+ :param anti_replay: Whether to apply USE_ANTI_REPLAY flag.
:type nodes: dict
:type tun_ips: dict
:type if1_key: str
:type if2_key: str
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optional[IntegAlg]
- :type raddr_ip2: IPv4Address or IPv6Address
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type raddr_ip2: Union[IPv4Address, IPv6Address]
:type addr_incr: int
:type spi_d: dict
:type existing_tunnels: int
+ :type udp_encap: bool
+ :type anti_replay: bool
:returns: Generated ckeys and ikeys.
:rtype: List[bytes], List[bytes]
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
if not existing_tunnels:
loop_sw_if_idx = IPsecUtil._ipsec_create_loopback_dut1_papi(
nodes, tun_ips, if1_key, if2_key
c_key = dict(length=0, data=None)
i_key = dict(length=0, data=None)
common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ if udp_encap:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_UDP_ENCAP
+ if anti_replay:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
sad_entry = dict(
sad_id=None,
spi=None,
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=c_key,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=i_key,
flags=common_flags,
tunnel=dict(
)
args = dict(entry=sad_entry)
for i in range(existing_tunnels, n_tunnels):
- ckeys.append(
- gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg))
- )
- ikeys.append(
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg))
- )
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
# SAD entry for outband / tx path
sad_entry["sad_id"] = i
sad_entry["spi"] = spi_d["spi_1"] + i
@staticmethod
def _ipsec_create_tunnel_interfaces_dut2_papi(
- nodes,
- tun_ips,
- if2_key,
- n_tunnels,
- crypto_alg,
- ckeys,
- integ_alg,
- ikeys,
- raddr_ip1,
- addr_incr,
- spi_d,
- existing_tunnels=0,
- ):
+ nodes: dict,
+ tun_ips: dict,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ ckeys: Sequence[bytes],
+ integ_alg: IntegAlg.InputType,
+ ikeys: Sequence[bytes],
+ raddr_ip1: Union[IPv4Address, IPv6Address],
+ addr_incr: int,
+ spi_d: dict,
+ existing_tunnels: int = 0,
+ udp_encap: bool = False,
+ anti_replay: bool = False,
+ ) -> None:
"""Create multiple IPsec tunnel interfaces on DUT2 node using PAPI.
This method accesses keys generated by DUT1 method
:param ckeys: List of encryption keys.
:param integ_alg: The integrity algorithm name.
:param ikeys: List of integrity keys.
+ :param raddr_ip1: Policy selector remote IPv4/IPv6 start address for the
+ first tunnel in direction node1->node2.
:param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2.
:param addr_incr: IP / IPv6 address incremental step.
:param existing_tunnels: Number of tunnel interfaces before creation.
Useful mainly for reconf tests. Default 0.
+ :param udp_encap: Whether to apply UDP_ENCAP flag.
+ :param anti_replay: Whether to apply USE_ANTI_REPLAY flag.
:type nodes: dict
:type tun_ips: dict
:type if2_key: str
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:type ckeys: Sequence[bytes]
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:type ikeys: Sequence[bytes]
+ :type raddr_ip1: Union[IPv4Address, IPv6Address]
:type addr_incr: int
:type spi_d: dict
:type existing_tunnels: int
+ :type udp_encap: bool
+ :type anti_replay: bool
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
with PapiSocketExecutor(nodes["DUT2"], is_async=True) as papi_exec:
if not existing_tunnels:
# Set IP address on VPP node 2 interface
c_key = dict(length=0, data=None)
i_key = dict(length=0, data=None)
common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ if udp_encap:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_UDP_ENCAP
+ if anti_replay:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
sad_entry = dict(
sad_id=None,
spi=None,
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=c_key,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=i_key,
flags=common_flags,
tunnel=dict(
)
args = dict(entry=sad_entry)
for i in range(existing_tunnels, n_tunnels):
- ckeys.append(
- gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg))
- )
- ikeys.append(
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg))
- )
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
# SAD entry for outband / tx path
sad_entry["sad_id"] = 100000 + i
sad_entry["spi"] = spi_d["spi_2"] + i
@staticmethod
def vpp_ipsec_create_tunnel_interfaces(
- nodes,
- tun_if1_ip_addr,
- tun_if2_ip_addr,
- if1_key,
- if2_key,
- n_tunnels,
- crypto_alg,
- integ_alg,
- raddr_ip1,
- raddr_ip2,
- raddr_range,
- existing_tunnels=0,
- return_keys=False,
- ):
+ nodes: dict,
+ tun_if1_ip_addr: str,
+ tun_if2_ip_addr: str,
+ if1_key: str,
+ if2_key: str,
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ raddr_ip1: str,
+ raddr_ip2: str,
+ raddr_range: int,
+ existing_tunnels: int = 0,
+ udp_encap: bool = False,
+ anti_replay: bool = False,
+ return_keys: bool = False,
+ ) -> Optional[Tuple[List[bytes], List[bytes], int, int]]:
"""Create multiple IPsec tunnel interfaces between two VPP nodes.
Some deployments (e.g. devicetest) need to know the generated keys.
:param existing_tunnels: Number of tunnel interfaces before creation.
Useful mainly for reconf tests. Default 0.
:param return_keys: Whether generated keys should be returned.
+ :param udp_encap: Whether to apply UDP_ENCAP flag.
+ :param anti_replay: Whether to apply USE_ANTI_REPLAY flag.
:type nodes: dict
:type tun_if1_ip_addr: str
:type tun_if2_ip_addr: str
:type if1_key: str
:type if2_key: str
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optonal[IntegAlg]
- :type raddr_ip1: string
- :type raddr_ip2: string
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
+ :type raddr_ip1: str
+ :type raddr_ip2: str
:type raddr_range: int
:type existing_tunnels: int
:type return_keys: bool
+ :type udp_encap: bool
+ :type anti_replay: bool
:returns: Ckeys, ikeys, spi_1, spi_2.
- :rtype: Optional[List[bytes], List[bytes], int, int]
+ :rtype: Optional[Tuple[List[bytes], List[bytes], int, int]]
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
n_tunnels = int(n_tunnels)
existing_tunnels = int(existing_tunnels)
spi_d = dict(spi_1=100000, spi_2=200000)
addr_incr,
spi_d,
existing_tunnels,
+ udp_encap,
+ anti_replay,
)
if "DUT2" in nodes.keys():
IPsecUtil._ipsec_create_tunnel_interfaces_dut2_papi(
addr_incr,
spi_d,
existing_tunnels,
+ udp_encap,
+ anti_replay,
)
if return_keys:
return None
@staticmethod
- def _create_ipsec_script_files(dut, instances):
+ def _create_ipsec_script_files(
+ dut: str, instances: int
+ ) -> List[TextIOWrapper]:
"""Create script files for configuring IPsec in containers
:param dut: DUT node on which to create the script files
:param instances: number of containers on DUT node
- :type dut: string
+ :type dut: str
:type instances: int
+ :returns: Created opened file handles.
+ :rtype: List[TextIOWrapper]
"""
scripts = []
for cnf in range(0, instances):
return scripts
@staticmethod
- def _close_and_copy_ipsec_script_files(dut, nodes, instances, scripts):
+ def _close_and_copy_ipsec_script_files(
+ dut: str, nodes: dict, instances: int, scripts: Sequence[TextIOWrapper]
+ ) -> None:
"""Close created scripts and copy them to containers
:param dut: DUT node on which to create the script files
:param nodes: VPP nodes
:param instances: number of containers on DUT node
:param scripts: dictionary holding the script files
- :type dut: string
+ :type dut: str
:type nodes: dict
:type instances: int
:type scripts: dict
)
scp_node(nodes[dut], script_filename, script_filename)
- @staticmethod
- def vpp_ipsec_create_tunnel_interfaces_in_containers(
- nodes,
- if1_ip_addr,
- if2_ip_addr,
- n_tunnels,
- crypto_alg,
- integ_alg,
- raddr_ip1,
- raddr_ip2,
- raddr_range,
- n_instances,
- ):
- """Create multiple IPsec tunnel interfaces between two VPP nodes.
-
- :param nodes: VPP nodes to create tunnel interfaces.
- :param if1_ip_addr: VPP node 1 interface IP4 address.
- :param if2_ip_addr: VPP node 2 interface IP4 address.
- :param n_tunnels: Number of tunnell interfaces to create.
- :param crypto_alg: The encryption algorithm name.
- :param integ_alg: The integrity algorithm name.
- :param raddr_ip1: Policy selector remote IPv4 start address for the
- first tunnel in direction node1->node2.
- :param raddr_ip2: Policy selector remote IPv4 start address for the
- first tunnel in direction node2->node1.
- :param raddr_range: Mask specifying range of Policy selector Remote
- IPv4 addresses. Valid values are from 1 to 32.
- :param n_instances: Number of containers.
- :type nodes: dict
- :type if1_ip_addr: str
- :type if2_ip_addr: str
- :type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optional[IntegAlg]
- :type raddr_ip1: string
- :type raddr_ip2: string
- :type raddr_range: int
- :type n_instances: int
- """
- spi_1 = 100000
- spi_2 = 200000
- addr_incr = 1 << (32 - raddr_range)
-
- dut1_scripts = IPsecUtil._create_ipsec_script_files("DUT1", n_instances)
- dut2_scripts = IPsecUtil._create_ipsec_script_files("DUT2", n_instances)
-
- for cnf in range(0, n_instances):
- dut1_scripts[cnf].write(
- "create loopback interface\nset interface state loop0 up\n\n"
- )
- dut2_scripts[cnf].write(
- f"ip route add {if1_ip_addr}/8 via"
- f" {ip_address(if2_ip_addr) + cnf + 100} memif1/{cnf + 1}\n\n"
- )
-
- for tnl in range(0, n_tunnels):
- cnf = tnl % n_instances
- ckey = getattr(
- gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg)), "hex"
- )
- integ = ""
- ikey = getattr(
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)), "hex"
- )
- if integ_alg:
- integ = (
- f"integ-alg {integ_alg.alg_name}"
- f" local-integ-key {ikey}"
- f" remote-integ-key {ikey}"
- )
- # Configure tunnel end point(s) on left side
- dut1_scripts[cnf].write(
- "set interface ip address loop0"
- f" {ip_address(if1_ip_addr) + tnl * addr_incr}/32\n"
- "create ipsec tunnel"
- f" local-ip {ip_address(if1_ip_addr) + tnl * addr_incr}"
- f" local-spi {spi_1 + tnl}"
- f" remote-ip {ip_address(if2_ip_addr) + cnf}"
- f" remote-spi {spi_2 + tnl}"
- f" crypto-alg {crypto_alg.alg_name}"
- f" local-crypto-key {ckey}"
- f" remote-crypto-key {ckey}"
- f" instance {tnl // n_instances}"
- f" salt 0x0 {integ}\n"
- f"set interface unnumbered ipip{tnl // n_instances} use loop0\n"
- f"set interface state ipip{tnl // n_instances} up\n"
- f"ip route add {ip_address(raddr_ip2)+tnl}/32"
- f" via ipip{tnl // n_instances}\n\n"
- )
- # Configure tunnel end point(s) on right side
- dut2_scripts[cnf].write(
- f"set ip neighbor memif1/{cnf + 1}"
- f" {ip_address(if1_ip_addr) + tnl * addr_incr}"
- f" 02:02:00:00:{17:02X}:{cnf:02X} static\n"
- f"create ipsec tunnel local-ip {ip_address(if2_ip_addr) + cnf}"
- f" local-spi {spi_2 + tnl}"
- f" remote-ip {ip_address(if1_ip_addr) + tnl * addr_incr}"
- f" remote-spi {spi_1 + tnl}"
- f" crypto-alg {crypto_alg.alg_name}"
- f" local-crypto-key {ckey}"
- f" remote-crypto-key {ckey}"
- f" instance {tnl // n_instances}"
- f" salt 0x0 {integ}\n"
- f"set interface unnumbered ipip{tnl // n_instances}"
- f" use memif1/{cnf + 1}\n"
- f"set interface state ipip{tnl // n_instances} up\n"
- f"ip route add {ip_address(raddr_ip1) + tnl}/32"
- f" via ipip{tnl // n_instances}\n\n"
- )
-
- IPsecUtil._close_and_copy_ipsec_script_files(
- "DUT1", nodes, n_instances, dut1_scripts
- )
- IPsecUtil._close_and_copy_ipsec_script_files(
- "DUT2", nodes, n_instances, dut2_scripts
- )
-
@staticmethod
def vpp_ipsec_add_multiple_tunnels(
- nodes,
- interface1,
- interface2,
- n_tunnels,
- crypto_alg,
- integ_alg,
- tunnel_ip1,
- tunnel_ip2,
- raddr_ip1,
- raddr_ip2,
- raddr_range,
- tunnel_addr_incr=True,
- ):
+ nodes: dict,
+ interface1: Union[str, int],
+ interface2: Union[str, int],
+ n_tunnels: int,
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
+ tunnel_ip1: str,
+ tunnel_ip2: str,
+ raddr_ip1: str,
+ raddr_ip2: str,
+ raddr_range: int,
+ tunnel_addr_incr: bool = True,
+ ) -> None:
"""Create multiple IPsec tunnels between two VPP nodes.
:param nodes: VPP nodes to create tunnels.
:param tunnel_addr_incr: Enable or disable tunnel IP address
incremental step.
:type nodes: dict
- :type interface1: str or int
- :type interface2: str or int
+ :type interface1: Union[str, int]
+ :type interface2: Union[str, int]
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optional[IntegAlg]
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
:type tunnel_ip1: str
:type tunnel_ip2: str
- :type raddr_ip1: string
- :type raddr_ip2: string
+ :type raddr_ip1: str
+ :type raddr_ip2: str
:type raddr_range: int
:type tunnel_addr_incr: bool
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+
spd_id = 1
p_hi = 100
p_lo = 10
spi_1 = 300000
spi_2 = 400000
- crypto_key = gen_key(
- IPsecUtil.get_crypto_alg_key_len(crypto_alg)
- ).decode()
- integ_key = (
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)).decode()
- if integ_alg
- else ""
- )
-
+ crypto_key = gen_key(crypto_alg.key_len).decode()
+ integ_key = gen_key(integ_alg.key_len).decode()
rmac = (
Topology.get_interface_mac(nodes["DUT2"], interface2)
if "DUT2" in nodes.keys()
nodes["DUT1"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=False,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut1_local_outbound_range,
raddr_range=dut1_remote_outbound_range,
)
nodes["DUT1"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=True,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut1_remote_outbound_range,
raddr_range=dut1_local_outbound_range,
)
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=False,
sa_id=ObjIncrement(sa_id_1, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip2)),
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=True,
sa_id=ObjIncrement(sa_id_2, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip1)),
nodes["DUT2"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=False,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut2_remote_outbound_range,
raddr_range=dut2_local_outbound_range,
)
nodes["DUT2"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=True,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut2_local_outbound_range,
raddr_range=dut2_remote_outbound_range,
)
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=True,
sa_id=ObjIncrement(sa_id_1, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip2)),
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=False,
sa_id=ObjIncrement(sa_id_2, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip1)),
)
@staticmethod
- def vpp_ipsec_show_all(node):
+ def vpp_ipsec_show_all(node: dict) -> None:
"""Run "show ipsec all" debug CLI command.
:param node: Node to run command on.
PapiSocketExecutor.run_cli_cmd(node, "show ipsec all")
@staticmethod
- def show_ipsec_security_association(node):
+ def show_ipsec_security_association(node: dict) -> None:
"""Show IPSec security association.
:param node: DUT node.
PapiSocketExecutor.dump_and_log(node, [cmd])
@staticmethod
- def vpp_ipsec_flow_enable_rss(node, proto, rss_type, function="default"):
+ def vpp_ipsec_flow_enable_rss(
+ node: dict,
+ proto: str = "IPSEC_ESP",
+ rss_type: str = "esp",
+ function: str = "default",
+ ) -> int:
"""Ipsec flow enable rss action.
:param node: DUT node.
:param proto: The flow protocol.
:param rss_type: RSS type.
:param function: RSS function.
-
:type node: dict
- :type proto: str
+ :type proto: IPsecProto.InputType
:type rss_type: str
:type function: str
:returns: flow_index.
+ :rtype: int
"""
+ # The proto argument does not correspond to IPsecProto.
+ # The allowed values come from src/vnet/ip/protocols.def
+ # and we do not have a good enum for that yet.
+ # FlowUtil.FlowType and FlowUtil.FlowProto are close,
+ # but not exactly the same.
+
# TODO: to be fixed to use full PAPI when it is ready in VPP
cmd = (
f"test flow add src-ip any proto {proto} rss function"
@staticmethod
def vpp_create_ipsec_flows_on_dut(
- node, n_flows, rx_queues, spi_start, interface
- ):
+ node: dict, n_flows: int, rx_queues: int, spi_start: int, interface: str
+ ) -> None:
"""Create mutiple ipsec flows and enable flows onto interface.
:param node: DUT node.
:type rx_queues: int
:type spi_start: int
:type interface: str
- :returns: flow_index.
"""
for i in range(0, n_flows):