X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FIPsecUtil.py;h=e455dd7a886490572647b2d4f163b55e541118fb;hb=HEAD;hp=dd7bd068facecd1363fd963e7e92a6f658b7db09;hpb=5ba25d499683ba5fa8e0e14ede051fcd4643e008;p=csit.git diff --git a/resources/libraries/python/IPsecUtil.py b/resources/libraries/python/IPsecUtil.py index dd7bd068fa..59374ab73f 100644 --- a/resources/libraries/python/IPsecUtil.py +++ b/resources/libraries/python/IPsecUtil.py @@ -15,14 +15,16 @@ """IPsec utilities library.""" from enum import Enum, IntEnum -from io import open -from ipaddress import ip_network, ip_address +from io import open, TextIOWrapper +from ipaddress import ip_network, ip_address, IPv4Address, IPv6Address from random import choice from string import ascii_letters +from typing import Iterable, List, Optional, Sequence, Tuple, Union from robot.libraries.BuiltIn import BuiltIn from resources.libraries.python.Constants import Constants +from resources.libraries.python.enum_util import get_enum_instance from resources.libraries.python.IncrementUtil import ObjIncrement from resources.libraries.python.InterfaceUtil import ( InterfaceUtil, @@ -46,7 +48,7 @@ IPSEC_UDP_PORT_DEFAULT = 4500 IPSEC_REPLAY_WINDOW_DEFAULT = 64 -def gen_key(length): +def gen_key(length: int) -> bytes: """Generate random string as a key. :param length: Length of generated payload. @@ -59,63 +61,105 @@ def gen_key(length): ) -class PolicyAction(Enum): - """Policy actions.""" +# TODO: Introduce a metaclass that adds .find and .InputType automatically? +class IpsecSpdAction(Enum): + """IPsec SPD actions. - BYPASS = ("bypass", 0) + Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_spd_action. + """ + + BYPASS = NONE = ("bypass", 0) DISCARD = ("discard", 1) + RESOLVE = ("resolve", 2) PROTECT = ("protect", 3) - def __init__(self, policy_name, policy_int_repr): - self.policy_name = policy_name - self.policy_int_repr = policy_int_repr + def __init__(self, action_name: str, action_int_repr: int): + self.action_name = action_name + self.action_int_repr = action_int_repr - def __str__(self): - return self.policy_name + def __str__(self) -> str: + return self.action_name - def __int__(self): - return self.policy_int_repr + def __int__(self) -> int: + return self.action_int_repr class CryptoAlg(Enum): """Encryption algorithms.""" + NONE = ("none", 0, "none", 0) AES_CBC_128 = ("aes-cbc-128", 1, "AES-CBC", 16) AES_CBC_256 = ("aes-cbc-256", 3, "AES-CBC", 32) AES_GCM_128 = ("aes-gcm-128", 7, "AES-GCM", 16) AES_GCM_256 = ("aes-gcm-256", 9, "AES-GCM", 32) - def __init__(self, alg_name, alg_int_repr, scapy_name, key_len): + def __init__( + self, alg_name: str, alg_int_repr: int, scapy_name: str, key_len: int + ): self.alg_name = alg_name self.alg_int_repr = alg_int_repr self.scapy_name = scapy_name self.key_len = key_len + # TODO: Investigate if __int__ works with PAPI. It was not enough for "if". + def __bool__(self): + """A shorthand to enable "if crypto_alg:" constructs.""" + return self.alg_int_repr != 0 + class IntegAlg(Enum): """Integrity algorithm.""" + NONE = ("none", 0, "none", 0) SHA_256_128 = ("sha-256-128", 4, "SHA2-256-128", 32) SHA_512_256 = ("sha-512-256", 6, "SHA2-512-256", 64) - def __init__(self, alg_name, alg_int_repr, scapy_name, key_len): + def __init__( + self, alg_name: str, alg_int_repr: int, scapy_name: str, key_len: int + ): self.alg_name = alg_name self.alg_int_repr = alg_int_repr self.scapy_name = scapy_name self.key_len = key_len + def __bool__(self): + """A shorthand to enable "if integ_alg:" constructs.""" + return self.alg_int_repr != 0 + +# TODO: Base on Enum, so str values can be defined as in alg enums? class IPsecProto(IntEnum): - """IPsec protocol.""" + """IPsec protocol. + + Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_proto. + """ - IPSEC_API_PROTO_ESP = 50 - IPSEC_API_PROTO_AH = 51 + ESP = 50 + AH = 51 + NONE = 255 + def __str__(self) -> str: + """Return string suitable for CLI commands. + None is not supported. + + :returns: Lowercase name of the proto. + :rtype: str + :raises: ValueError if the numeric value is not recognized. + """ + num = int(self) + if num == 50: + return "esp" + if num == 51: + return "ah" + raise ValueError(f"String form not defined for IPsecProto {num}") + + +# The rest of enums do not appear outside this file, so no no change needed yet. class IPsecSadFlags(IntEnum): """IPsec Security Association Database flags.""" - IPSEC_API_SAD_FLAG_NONE = 0 + IPSEC_API_SAD_FLAG_NONE = NONE = 0 # Enable extended sequence numbers IPSEC_API_SAD_FLAG_USE_ESN = 0x01 # Enable Anti - replay @@ -134,7 +178,7 @@ class IPsecSadFlags(IntEnum): class TunnelEncpaDecapFlags(IntEnum): """Flags controlling tunnel behaviour.""" - TUNNEL_API_ENCAP_DECAP_FLAG_NONE = 0 + TUNNEL_API_ENCAP_DECAP_FLAG_NONE = NONE = 0 # at encap, copy the DF bit of the payload into the tunnel header TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF = 1 # at encap, set the DF bit in the tunnel header @@ -151,180 +195,98 @@ class TunnelMode(IntEnum): """Tunnel modes.""" # point-to-point - TUNNEL_API_MODE_P2P = 0 + TUNNEL_API_MODE_P2P = NONE = 0 # multi-point TUNNEL_API_MODE_MP = 1 -class IPsecUtil: - """IPsec utilities.""" - - @staticmethod - def policy_action_bypass(): - """Return policy action bypass. - - :returns: PolicyAction enum BYPASS object. - :rtype: PolicyAction - """ - return PolicyAction.BYPASS - - @staticmethod - def policy_action_discard(): - """Return policy action discard. - - :returns: PolicyAction enum DISCARD object. - :rtype: PolicyAction - """ - return PolicyAction.DISCARD - - @staticmethod - def policy_action_protect(): - """Return policy action protect. - - :returns: PolicyAction enum PROTECT object. - :rtype: PolicyAction - """ - return PolicyAction.PROTECT - - @staticmethod - def crypto_alg_aes_cbc_128(): - """Return encryption algorithm aes-cbc-128. - - :returns: CryptoAlg enum AES_CBC_128 object. - :rtype: CryptoAlg - """ - return CryptoAlg.AES_CBC_128 +# Derived types for type hints, based on capabilities of get_enum_instance. +IpsecSpdAction.InputType = Union[IpsecSpdAction, str, None] +CryptoAlg.InputType = Union[CryptoAlg, str, None] +IntegAlg.InputType = Union[IntegAlg, str, None] +IPsecProto.InputType = Union[IPsecProto, str, int, None] +# TODO: Introduce a metaclass that adds .find and .InputType automatically? - @staticmethod - def crypto_alg_aes_cbc_256(): - """Return encryption algorithm aes-cbc-256. - :returns: CryptoAlg enum AES_CBC_256 object. - :rtype: CryptoAlg - """ - return CryptoAlg.AES_CBC_256 - - @staticmethod - def crypto_alg_aes_gcm_128(): - """Return encryption algorithm aes-gcm-128. - - :returns: CryptoAlg enum AES_GCM_128 object. - :rtype: CryptoAlg - """ - return CryptoAlg.AES_GCM_128 - - @staticmethod - def crypto_alg_aes_gcm_256(): - """Return encryption algorithm aes-gcm-256. +class IPsecUtil: + """IPsec utilities.""" - :returns: CryptoAlg enum AES_GCM_128 object. - :rtype: CryptoAlg - """ - return CryptoAlg.AES_GCM_256 + # The following 4 methods are Python one-liners, + # but they are useful when called as a Robot keyword. @staticmethod - def get_crypto_alg_key_len(crypto_alg): + def get_crypto_alg_key_len(crypto_alg: CryptoAlg.InputType) -> int: """Return encryption algorithm key length. + This is a Python one-liner, but useful when called as a Robot keyword. + :param crypto_alg: Encryption algorithm. - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :returns: Key length. :rtype: int """ - return crypto_alg.key_len + return get_enum_instance(CryptoAlg, crypto_alg).key_len @staticmethod - def get_crypto_alg_scapy_name(crypto_alg): + def get_crypto_alg_scapy_name(crypto_alg: CryptoAlg.InputType) -> str: """Return encryption algorithm scapy name. + This is a Python one-liner, but useful when called as a Robot keyword. + :param crypto_alg: Encryption algorithm. - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :returns: Algorithm scapy name. :rtype: str """ - return crypto_alg.scapy_name - - @staticmethod - def integ_alg_sha_256_128(): - """Return integrity algorithm SHA-256-128. - - :returns: IntegAlg enum SHA_256_128 object. - :rtype: IntegAlg - """ - return IntegAlg.SHA_256_128 + return get_enum_instance(CryptoAlg, crypto_alg).scapy_name + # The below to keywords differ only by enum type conversion from str. @staticmethod - def integ_alg_sha_512_256(): - """Return integrity algorithm SHA-512-256. - - :returns: IntegAlg enum SHA_512_256 object. - :rtype: IntegAlg - """ - return IntegAlg.SHA_512_256 - - @staticmethod - def get_integ_alg_key_len(integ_alg): + def get_integ_alg_key_len(integ_alg: IntegAlg.InputType) -> int: """Return integrity algorithm key length. - None argument is accepted, returning zero. - :param integ_alg: Integrity algorithm. - :type integ_alg: Optional[IntegAlg] + :type integ_alg: IntegAlg.InputType :returns: Key length. :rtype: int """ - return 0 if integ_alg is None else integ_alg.key_len + return get_enum_instance(IntegAlg, integ_alg).key_len @staticmethod - def get_integ_alg_scapy_name(integ_alg): + def get_integ_alg_scapy_name(integ_alg: IntegAlg.InputType) -> str: """Return integrity algorithm scapy name. :param integ_alg: Integrity algorithm. - :type integ_alg: IntegAlg + :type integ_alg: IntegAlg.InputType :returns: Algorithm scapy name. :rtype: str """ - return integ_alg.scapy_name + return get_enum_instance(IntegAlg, integ_alg).scapy_name @staticmethod - def ipsec_proto_esp(): - """Return IPSec protocol ESP. - - :returns: IPsecProto enum ESP object. - :rtype: IPsecProto - """ - return int(IPsecProto.IPSEC_API_PROTO_ESP) - - @staticmethod - def ipsec_proto_ah(): - """Return IPSec protocol AH. - - :returns: IPsecProto enum AH object. - :rtype: IPsecProto - """ - return int(IPsecProto.IPSEC_API_PROTO_AH) - - @staticmethod - def vpp_ipsec_select_backend(node, protocol, index=1): + def vpp_ipsec_select_backend( + node: dict, proto: IPsecProto.InputType, index: int = 1 + ) -> None: """Select IPsec backend. :param node: VPP node to select IPsec backend on. - :param protocol: IPsec protocol. + :param proto: IPsec protocol. :param index: Backend index. :type node: dict - :type protocol: IPsecProto + :type proto: IPsecProto.InputType :type index: int :raises RuntimeError: If failed to select IPsec backend or if no API reply received. """ + proto = get_enum_instance(IPsecProto, proto) cmd = "ipsec_select_backend" err_msg = f"Failed to select IPsec backend on host {node['host']}" - args = dict(protocol=protocol, index=index) + args = dict(protocol=proto, index=index) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod - def vpp_ipsec_set_async_mode(node, async_enable=1): + def vpp_ipsec_set_async_mode(node: dict, async_enable: int = 1) -> None: """Set IPsec async mode on|off. Unconditionally, attempt to switch crypto dispatch into polling mode. @@ -354,8 +316,8 @@ class IPsecUtil: @staticmethod def vpp_ipsec_crypto_sw_scheduler_set_worker( - node, workers, crypto_enable=False - ): + node: dict, workers: Iterable[int], crypto_enable: bool = False + ) -> None: """Enable or disable crypto on specific vpp worker threads. :param node: VPP node to enable or disable crypto for worker threads. @@ -379,8 +341,8 @@ class IPsecUtil: @staticmethod def vpp_ipsec_crypto_sw_scheduler_set_worker_on_all_duts( - nodes, crypto_enable=False - ): + nodes: dict, crypto_enable: bool = False + ) -> None: """Enable or disable crypto on specific vpp worker threads. :param node: VPP node to enable or disable crypto for worker threads. @@ -410,16 +372,16 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_sad_entry( - node, - sad_id, - spi, - crypto_alg, - crypto_key, - integ_alg=None, - integ_key="", - tunnel_src=None, - tunnel_dst=None, - ): + node: dict, + sad_id: int, + spi: int, + crypto_alg: CryptoAlg.InputType = None, + crypto_key: str = "", + integ_alg: IntegAlg.InputType = None, + integ_key: str = "", + tunnel_src: Optional[str] = None, + tunnel_dst: Optional[str] = None, + ) -> None: """Create Security Association Database entry on the VPP node. :param node: VPP node to add SAD entry on. @@ -436,13 +398,15 @@ class IPsecUtil: :type node: dict :type sad_id: int :type spi: int - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :type crypto_key: str - :type integ_alg: Optional[IntegAlg] + :type integ_alg: IntegAlg.InputType :type integ_key: str - :type tunnel_src: str - :type tunnel_dst: str + :type tunnel_src: Optional[str] + :type tunnel_dst: Optional[str] """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) if isinstance(crypto_key, str): crypto_key = crypto_key.encode(encoding="utf-8") if isinstance(integ_key, str): @@ -473,7 +437,7 @@ class IPsecUtil: spi=int(spi), crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=ckey, - integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, + integrity_algorithm=integ_alg.alg_int_repr, integrity_key=ikey, flags=flags, tunnel=dict( @@ -485,7 +449,7 @@ class IPsecUtil: ), dscp=int(IpDscp.IP_API_DSCP_CS0), ), - protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), + protocol=IPsecProto.ESP, udp_src_port=IPSEC_UDP_PORT_DEFAULT, udp_dst_port=IPSEC_UDP_PORT_DEFAULT, anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT, @@ -496,18 +460,18 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_sad_entries( - node, - n_entries, - sad_id, - spi, - crypto_alg, - crypto_key, - integ_alg=None, - integ_key="", - tunnel_src=None, - tunnel_dst=None, - tunnel_addr_incr=True, - ): + node: dict, + n_entries: int, + sad_id: int, + spi: int, + crypto_alg: CryptoAlg.InputType = None, + crypto_key: str = "", + integ_alg: IntegAlg.InputType = None, + integ_key: str = "", + tunnel_src: Optional[str] = None, + tunnel_dst: Optional[str] = None, + tunnel_addr_incr: bool = True, + ) -> None: """Create multiple Security Association Database entries on VPP node. :param node: VPP node to add SAD entry on. @@ -530,14 +494,16 @@ class IPsecUtil: :type n_entries: int :type sad_id: int :type spi: int - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :type crypto_key: str - :type integ_alg: Optional[IntegAlg] + :type integ_alg: IntegAlg.InputType :type integ_key: str - :type tunnel_src: str - :type tunnel_dst: str + :type tunnel_src: Optional[str] + :type tunnel_dst: Optional[str] :type tunnel_addr_incr: bool """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) if isinstance(crypto_key, str): crypto_key = crypto_key.encode(encoding="utf-8") if isinstance(integ_key, str): @@ -578,7 +544,7 @@ class IPsecUtil: spi=int(spi), crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=ckey, - integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, + integrity_algorithm=integ_alg.alg_int_repr, integrity_key=ikey, flags=flags, tunnel=dict( @@ -590,7 +556,7 @@ class IPsecUtil: ), dscp=int(IpDscp.IP_API_DSCP_CS0), ), - protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), + protocol=IPsecProto.ESP, udp_src_port=IPSEC_UDP_PORT_DEFAULT, udp_dst_port=IPSEC_UDP_PORT_DEFAULT, anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT, @@ -616,15 +582,15 @@ class IPsecUtil: @staticmethod def vpp_ipsec_set_ip_route( - node, - n_tunnels, - tunnel_src, - traffic_addr, - tunnel_dst, - interface, - raddr_range, - dst_mac=None, - ): + node: dict, + n_tunnels: int, + tunnel_src: str, + traffic_addr: str, + tunnel_dst: str, + interface: str, + raddr_range: int, + dst_mac: Optional[str] = None, + ) -> None: """Set IP address and route on interface. :param node: VPP node to add config on. @@ -644,7 +610,7 @@ class IPsecUtil: :type tunnel_dst: str :type interface: str :type raddr_range: int - :type dst_mac: str + :type dst_mac: Optional[str] """ tunnel_src = ip_address(tunnel_src) tunnel_dst = ip_address(tunnel_dst) @@ -717,7 +683,7 @@ class IPsecUtil: papi_exec.get_replies(err_msg) @staticmethod - def vpp_ipsec_add_spd(node, spd_id): + def vpp_ipsec_add_spd(node: dict, spd_id: int) -> None: """Create Security Policy Database on the VPP node. :param node: VPP node to add SPD on. @@ -734,7 +700,9 @@ class IPsecUtil: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod - def vpp_ipsec_spd_add_if(node, spd_id, interface): + def vpp_ipsec_spd_add_if( + node: dict, spd_id: int, interface: Union[str, int] + ) -> None: """Add interface to the Security Policy Database. :param node: VPP node. @@ -759,16 +727,16 @@ class IPsecUtil: @staticmethod def vpp_ipsec_create_spds_match_nth_entry( - node, - dir1_interface, - dir2_interface, - entry_amount, - local_addr_range, - remote_addr_range, - action=PolicyAction.BYPASS, - inbound=False, - bidirectional=True, - ): + node: dict, + dir1_interface: Union[str, int], + dir2_interface: Union[str, int], + entry_amount: int, + local_addr_range: Union[str, IPv4Address, IPv6Address], + remote_addr_range: Union[str, IPv4Address, IPv6Address], + action: IpsecSpdAction.InputType = IpsecSpdAction.BYPASS, + inbound: bool = False, + bidirectional: bool = True, + ) -> None: """Create one matching SPD entry for inbound or outbound traffic on a DUT for each traffic direction and also create entry_amount - 1 non-matching SPD entries. Create a Security Policy Database on each @@ -792,27 +760,29 @@ class IPsecUtil: :param remote_addr_range: Matching remote address range in direction 1 in format IP/prefix or IP/mask. If no mask is provided, it's considered to be /32. - :param action: Policy action. + :param action: IPsec SPD action. :param inbound: If True policy is for inbound traffic, otherwise outbound. :param bidirectional: When True, will create SPDs in both directions of traffic. When False, only in one direction. :type node: dict - :type dir1_interface: Union[string, int] - :type dir2_interface: Union[string, int] + :type dir1_interface: Union[str, int] + :type dir2_interface: Union[str, int] :type entry_amount: int :type local_addr_range: - Union[string, ipaddress.IPv4Address, ipaddress.IPv6Address] + Union[str, IPv4Address, IPv6Address] :type remote_addr_range: - Union[string, ipaddress.IPv4Address, ipaddress.IPv6Address] - :type action: IPsecUtil.PolicyAction + Union[str, IPv4Address, IPv6Address] + :type action: IpsecSpdAction.InputType :type inbound: bool :type bidirectional: bool - :raises NotImplementedError: When the action is PolicyAction.PROTECT. + :raises NotImplementedError: When the action is IpsecSpdAction.PROTECT. """ - - if action == PolicyAction.PROTECT: - raise NotImplementedError("Policy action PROTECT is not supported.") + action = get_enum_instance(IpsecSpdAction, action) + if action == IpsecSpdAction.PROTECT: + raise NotImplementedError( + "IPsec SPD action PROTECT is not supported." + ) spd_id_dir1 = 1 spd_id_dir2 = 2 @@ -901,19 +871,19 @@ class IPsecUtil: @staticmethod def _vpp_ipsec_add_spd_entry_internal( - executor, - spd_id, - priority, - action, - inbound=True, - sa_id=None, - proto=None, - laddr_range=None, - raddr_range=None, - lport_range=None, - rport_range=None, - is_ipv6=False, - ): + executor: PapiSocketExecutor, + spd_id: int, + priority: int, + action: IpsecSpdAction.InputType, + inbound: bool = True, + sa_id: Optional[int] = None, + proto: IPsecProto.InputType = None, + laddr_range: Optional[str] = None, + raddr_range: Optional[str] = None, + lport_range: Optional[str] = None, + rport_range: Optional[str] = None, + is_ipv6: bool = False, + ) -> None: """Prepare to create Security Policy Database entry on the VPP node. This just adds one more command to the executor. @@ -923,10 +893,10 @@ class IPsecUtil: :param executor: Open PAPI executor (async handling) to add commands to. :param spd_id: SPD ID to add entry on. :param priority: SPD entry priority, higher number = higher priority. - :param action: Policy action. + :param action: IPsec SPD action. :param inbound: If True policy is for inbound traffic, otherwise outbound. - :param sa_id: SAD entry ID for action PolicyAction.PROTECT. + :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT. :param proto: Policy selector next layer protocol number. :param laddr_range: Policy selector local IPv4 or IPv6 address range in format IP/prefix or IP/mask. If no mask is provided, @@ -943,16 +913,18 @@ class IPsecUtil: :type executor: PapiSocketExecutor :type spd_id: int :type priority: int - :type action: IPsecUtil.PolicyAction + :type action: IpsecSpdAction.InputType :type inbound: bool - :type sa_id: int - :type proto: int - :type laddr_range: string - :type raddr_range: string - :type lport_range: string - :type rport_range: string + :type sa_id: Optional[int] + :type proto: IPsecProto.InputType + :type laddr_range: Optional[str] + :type raddr_range: Optional[str] + :type lport_range: Optional[str] + :type rport_range: Optional[str] :type is_ipv6: bool """ + action = get_enum_instance(IpsecSpdAction, action) + proto = get_enum_instance(IPsecProto, proto) if laddr_range is None: laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0" @@ -970,7 +942,7 @@ class IPsecUtil: is_outbound=not inbound, sa_id=int(sa_id) if sa_id else 0, policy=int(action), - protocol=255 if proto is None else int(proto), + protocol=proto, remote_address_start=IPAddress.create_ip_address_object( remote_net.network_address ), @@ -1001,28 +973,28 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_spd_entry( - node, - spd_id, - priority, - action, - inbound=True, - sa_id=None, - proto=None, - laddr_range=None, - raddr_range=None, - lport_range=None, - rport_range=None, - is_ipv6=False, - ): + node: dict, + spd_id: int, + priority: int, + action: IpsecSpdAction.InputType, + inbound: bool = True, + sa_id: Optional[int] = None, + proto: IPsecProto.InputType = None, + laddr_range: Optional[str] = None, + raddr_range: Optional[str] = None, + lport_range: Optional[str] = None, + rport_range: Optional[str] = None, + is_ipv6: bool = False, + ) -> None: """Create Security Policy Database entry on the VPP node. :param node: VPP node to add SPD entry on. :param spd_id: SPD ID to add entry on. :param priority: SPD entry priority, higher number = higher priority. - :param action: Policy action. + :param action: IPsec SPD action. :param inbound: If True policy is for inbound traffic, otherwise outbound. - :param sa_id: SAD entry ID for action PolicyAction.PROTECT. + :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT. :param proto: Policy selector next layer protocol number. :param laddr_range: Policy selector local IPv4 or IPv6 address range in format IP/prefix or IP/mask. If no mask is provided, @@ -1039,16 +1011,18 @@ class IPsecUtil: :type node: dict :type spd_id: int :type priority: int - :type action: IPsecUtil.PolicyAction + :type action: IpsecSpdAction.InputType :type inbound: bool - :type sa_id: int - :type proto: int - :type laddr_range: string - :type raddr_range: string - :type lport_range: string - :type rport_range: string + :type sa_id: Optional[int] + :type proto: IPsecProto.InputType + :type laddr_range: Optional[str] + :type raddr_range: Optional[str] + :type lport_range: Optional[str] + :type rport_range: Optional[str] :type is_ipv6: bool """ + action = get_enum_instance(IpsecSpdAction, action) + proto = get_enum_instance(IPsecProto, proto) err_msg = ( "Failed to add entry to Security Policy Database" f" {spd_id} on host {node['host']}" @@ -1072,30 +1046,30 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_spd_entries( - node, - n_entries, - spd_id, - priority, - action, - inbound, - sa_id=None, - proto=None, - laddr_range=None, - raddr_range=None, - lport_range=None, - rport_range=None, - is_ipv6=False, - ): + node: dict, + n_entries: int, + spd_id: int, + priority: Optional[ObjIncrement], + action: IpsecSpdAction.InputType, + inbound: bool, + sa_id: Optional[ObjIncrement] = None, + proto: IPsecProto.InputType = None, + laddr_range: Optional[NetworkIncrement] = None, + raddr_range: Optional[NetworkIncrement] = None, + lport_range: Optional[str] = None, + rport_range: Optional[str] = None, + is_ipv6: bool = False, + ) -> None: """Create multiple Security Policy Database entries on the VPP node. :param node: VPP node to add SPD entries on. :param n_entries: Number of SPD entries to be added. :param spd_id: SPD ID to add entries on. :param priority: SPD entries priority, higher number = higher priority. - :param action: Policy action. + :param action: IPsec SPD action. :param inbound: If True policy is for inbound traffic, otherwise outbound. - :param sa_id: SAD entry ID for action PolicyAction.PROTECT. + :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT. :param proto: Policy selector next layer protocol number. :param laddr_range: Policy selector local IPv4 or IPv6 address range in format IP/prefix or IP/mask. If no mask is provided, @@ -1112,17 +1086,19 @@ class IPsecUtil: :type node: dict :type n_entries: int :type spd_id: int - :type priority: IPsecUtil.ObjIncrement - :type action: IPsecUtil.PolicyAction + :type priority: Optional[ObjIncrement] + :type action: IpsecSpdAction.InputType :type inbound: bool - :type sa_id: IPsecUtil.ObjIncrement - :type proto: int - :type laddr_range: IPsecUtil.NetworkIncrement - :type raddr_range: IPsecUtil.NetworkIncrement - :type lport_range: string - :type rport_range: string + :type sa_id: Optional[ObjIncrement] + :type proto: IPsecProto.InputType + :type laddr_range: Optional[NetworkIncrement] + :type raddr_range: Optional[NetworkIncrement] + :type lport_range: Optional[str] + :type rport_range: Optional[str] :type is_ipv6: bool """ + action = get_enum_instance(IpsecSpdAction, action) + proto = get_enum_instance(IPsecProto, proto) if laddr_range is None: laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0" laddr_range = NetworkIncrement(ip_network(laddr_range), 0) @@ -1154,7 +1130,9 @@ class IPsecUtil: papi_exec.get_replies(err_msg) @staticmethod - def _ipsec_create_loopback_dut1_papi(nodes, tun_ips, if1_key, if2_key): + def _ipsec_create_loopback_dut1_papi( + nodes: dict, tun_ips: dict, if1_key: str, if2_key: str + ) -> int: """Create loopback interface and set IP address on VPP node 1 interface using PAPI. @@ -1169,6 +1147,8 @@ class IPsecUtil: :type tun_ips: dict :type if1_key: str :type if2_key: str + :returns: sw_if_idx Of the created loopback interface. + :rtype: int """ with PapiSocketExecutor(nodes["DUT1"]) as papi_exec: # Create loopback interface on DUT1, set it to up state @@ -1235,18 +1215,18 @@ class IPsecUtil: @staticmethod def _ipsec_create_tunnel_interfaces_dut1_papi( - nodes, - tun_ips, - if1_key, - if2_key, - n_tunnels, - crypto_alg, - integ_alg, - raddr_ip2, - addr_incr, - spi_d, - existing_tunnels=0, - ): + nodes: dict, + tun_ips: dict, + if1_key: str, + if2_key: str, + n_tunnels: int, + crypto_alg: CryptoAlg.InputType, + integ_alg: IntegAlg.InputType, + raddr_ip2: Union[IPv4Address, IPv6Address], + addr_incr: int, + spi_d: dict, + existing_tunnels: int = 0, + ) -> Tuple[List[bytes], List[bytes]]: """Create multiple IPsec tunnel interfaces on DUT1 node using PAPI. Generate random keys and return them (so DUT2 or TG can decrypt). @@ -1272,15 +1252,17 @@ class IPsecUtil: :type if1_key: str :type if2_key: str :type n_tunnels: int - :type crypto_alg: CryptoAlg - :type integ_alg: Optional[IntegAlg] - :type raddr_ip2: IPv4Address or IPv6Address + :type crypto_alg: CryptoAlg.InputType + :type integ_alg: IntegAlg.InputType + :type raddr_ip2: Union[IPv4Address, IPv6Address] :type addr_incr: int :type spi_d: dict :type existing_tunnels: int :returns: Generated ckeys and ikeys. :rtype: List[bytes], List[bytes] """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) if not existing_tunnels: loop_sw_if_idx = IPsecUtil._ipsec_create_loopback_dut1_papi( nodes, tun_ips, if1_key, if2_key @@ -1352,10 +1334,10 @@ class IPsecUtil: sad_entry = dict( sad_id=None, spi=None, - protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), + protocol=IPsecProto.ESP, crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=c_key, - integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, + integrity_algorithm=integ_alg.alg_int_repr, integrity_key=i_key, flags=common_flags, tunnel=dict( @@ -1374,12 +1356,8 @@ class IPsecUtil: ) args = dict(entry=sad_entry) for i in range(existing_tunnels, n_tunnels): - ckeys.append( - gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg)) - ) - ikeys.append( - gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)) - ) + ckeys.append(gen_key(crypto_alg.key_len)) + ikeys.append(gen_key(integ_alg.key_len)) # SAD entry for outband / tx path sad_entry["sad_id"] = i sad_entry["spi"] = spi_d["spi_1"] + i @@ -1480,19 +1458,19 @@ class IPsecUtil: @staticmethod def _ipsec_create_tunnel_interfaces_dut2_papi( - nodes, - tun_ips, - if2_key, - n_tunnels, - crypto_alg, - ckeys, - integ_alg, - ikeys, - raddr_ip1, - addr_incr, - spi_d, - existing_tunnels=0, - ): + nodes: dict, + tun_ips: dict, + if2_key: str, + n_tunnels: int, + crypto_alg: CryptoAlg.InputType, + ckeys: Sequence[bytes], + integ_alg: IntegAlg.InputType, + ikeys: Sequence[bytes], + raddr_ip1: Union[IPv4Address, IPv6Address], + addr_incr: int, + spi_d: dict, + existing_tunnels: int = 0, + ) -> None: """Create multiple IPsec tunnel interfaces on DUT2 node using PAPI. This method accesses keys generated by DUT1 method @@ -1509,6 +1487,8 @@ class IPsecUtil: :param ckeys: List of encryption keys. :param integ_alg: The integrity algorithm name. :param ikeys: List of integrity keys. + :param raddr_ip1: Policy selector remote IPv4/IPv6 start address for the + first tunnel in direction node1->node2. :param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2. :param addr_incr: IP / IPv6 address incremental step. :param existing_tunnels: Number of tunnel interfaces before creation. @@ -1517,14 +1497,17 @@ class IPsecUtil: :type tun_ips: dict :type if2_key: str :type n_tunnels: int - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :type ckeys: Sequence[bytes] - :type integ_alg: Optional[IntegAlg] + :type integ_alg: IntegAlg.InputType :type ikeys: Sequence[bytes] + :type raddr_ip1: Union[IPv4Address, IPv6Address] :type addr_incr: int :type spi_d: dict :type existing_tunnels: int """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) with PapiSocketExecutor(nodes["DUT2"], is_async=True) as papi_exec: if not existing_tunnels: # Set IP address on VPP node 2 interface @@ -1589,10 +1572,10 @@ class IPsecUtil: sad_entry = dict( sad_id=None, spi=None, - protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), + protocol=IPsecProto.ESP, crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=c_key, - integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, + integrity_algorithm=integ_alg.alg_int_repr, integrity_key=i_key, flags=common_flags, tunnel=dict( @@ -1611,12 +1594,8 @@ class IPsecUtil: ) args = dict(entry=sad_entry) for i in range(existing_tunnels, n_tunnels): - ckeys.append( - gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg)) - ) - ikeys.append( - gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)) - ) + ckeys.append(gen_key(crypto_alg.key_len)) + ikeys.append(gen_key(integ_alg.key_len)) # SAD entry for outband / tx path sad_entry["sad_id"] = 100000 + i sad_entry["spi"] = spi_d["spi_2"] + i @@ -1727,20 +1706,20 @@ class IPsecUtil: @staticmethod def vpp_ipsec_create_tunnel_interfaces( - nodes, - tun_if1_ip_addr, - tun_if2_ip_addr, - if1_key, - if2_key, - n_tunnels, - crypto_alg, - integ_alg, - raddr_ip1, - raddr_ip2, - raddr_range, - existing_tunnels=0, - return_keys=False, - ): + nodes: dict, + tun_if1_ip_addr: str, + tun_if2_ip_addr: str, + if1_key: str, + if2_key: str, + n_tunnels: int, + crypto_alg: CryptoAlg.InputType, + integ_alg: IntegAlg.InputType, + raddr_ip1: str, + raddr_ip2: str, + raddr_range: int, + existing_tunnels: int = 0, + return_keys: bool = False, + ) -> Optional[Tuple[List[bytes], List[bytes], int, int]]: """Create multiple IPsec tunnel interfaces between two VPP nodes. Some deployments (e.g. devicetest) need to know the generated keys. @@ -1774,16 +1753,18 @@ class IPsecUtil: :type if1_key: str :type if2_key: str :type n_tunnels: int - :type crypto_alg: CryptoAlg - :type integ_alg: Optonal[IntegAlg] - :type raddr_ip1: string - :type raddr_ip2: string + :type crypto_alg: CryptoAlg.InputType + :type integ_alg: IntegAlg.InputType + :type raddr_ip1: str + :type raddr_ip2: str :type raddr_range: int :type existing_tunnels: int :type return_keys: bool :returns: Ckeys, ikeys, spi_1, spi_2. - :rtype: Optional[List[bytes], List[bytes], int, int] + :rtype: Optional[Tuple[List[bytes], List[bytes], int, int]] """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) n_tunnels = int(n_tunnels) existing_tunnels = int(existing_tunnels) spi_d = dict(spi_1=100000, spi_2=200000) @@ -1832,13 +1813,17 @@ class IPsecUtil: return None @staticmethod - def _create_ipsec_script_files(dut, instances): + def _create_ipsec_script_files( + dut: str, instances: int + ) -> List[TextIOWrapper]: """Create script files for configuring IPsec in containers :param dut: DUT node on which to create the script files :param instances: number of containers on DUT node - :type dut: string + :type dut: str :type instances: int + :returns: Created opened file handles. + :rtype: List[TextIOWrapper] """ scripts = [] for cnf in range(0, instances): @@ -1849,14 +1834,16 @@ class IPsecUtil: return scripts @staticmethod - def _close_and_copy_ipsec_script_files(dut, nodes, instances, scripts): + def _close_and_copy_ipsec_script_files( + dut: str, nodes: dict, instances: int, scripts: Sequence[TextIOWrapper] + ) -> None: """Close created scripts and copy them to containers :param dut: DUT node on which to create the script files :param nodes: VPP nodes :param instances: number of containers on DUT node :param scripts: dictionary holding the script files - :type dut: string + :type dut: str :type nodes: dict :type instances: int :type scripts: dict @@ -1868,138 +1855,21 @@ class IPsecUtil: ) scp_node(nodes[dut], script_filename, script_filename) - @staticmethod - def vpp_ipsec_create_tunnel_interfaces_in_containers( - nodes, - if1_ip_addr, - if2_ip_addr, - n_tunnels, - crypto_alg, - integ_alg, - raddr_ip1, - raddr_ip2, - raddr_range, - n_instances, - ): - """Create multiple IPsec tunnel interfaces between two VPP nodes. - - :param nodes: VPP nodes to create tunnel interfaces. - :param if1_ip_addr: VPP node 1 interface IP4 address. - :param if2_ip_addr: VPP node 2 interface IP4 address. - :param n_tunnels: Number of tunnell interfaces to create. - :param crypto_alg: The encryption algorithm name. - :param integ_alg: The integrity algorithm name. - :param raddr_ip1: Policy selector remote IPv4 start address for the - first tunnel in direction node1->node2. - :param raddr_ip2: Policy selector remote IPv4 start address for the - first tunnel in direction node2->node1. - :param raddr_range: Mask specifying range of Policy selector Remote - IPv4 addresses. Valid values are from 1 to 32. - :param n_instances: Number of containers. - :type nodes: dict - :type if1_ip_addr: str - :type if2_ip_addr: str - :type n_tunnels: int - :type crypto_alg: CryptoAlg - :type integ_alg: Optional[IntegAlg] - :type raddr_ip1: string - :type raddr_ip2: string - :type raddr_range: int - :type n_instances: int - """ - spi_1 = 100000 - spi_2 = 200000 - addr_incr = 1 << (32 - raddr_range) - - dut1_scripts = IPsecUtil._create_ipsec_script_files("DUT1", n_instances) - dut2_scripts = IPsecUtil._create_ipsec_script_files("DUT2", n_instances) - - for cnf in range(0, n_instances): - dut1_scripts[cnf].write( - "create loopback interface\nset interface state loop0 up\n\n" - ) - dut2_scripts[cnf].write( - f"ip route add {if1_ip_addr}/8 via" - f" {ip_address(if2_ip_addr) + cnf + 100} memif1/{cnf + 1}\n\n" - ) - - for tnl in range(0, n_tunnels): - cnf = tnl % n_instances - ckey = getattr( - gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg)), "hex" - ) - integ = "" - ikey = getattr( - gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)), "hex" - ) - if integ_alg: - integ = ( - f"integ-alg {integ_alg.alg_name}" - f" local-integ-key {ikey}" - f" remote-integ-key {ikey}" - ) - # Configure tunnel end point(s) on left side - dut1_scripts[cnf].write( - "set interface ip address loop0" - f" {ip_address(if1_ip_addr) + tnl * addr_incr}/32\n" - "create ipsec tunnel" - f" local-ip {ip_address(if1_ip_addr) + tnl * addr_incr}" - f" local-spi {spi_1 + tnl}" - f" remote-ip {ip_address(if2_ip_addr) + cnf}" - f" remote-spi {spi_2 + tnl}" - f" crypto-alg {crypto_alg.alg_name}" - f" local-crypto-key {ckey}" - f" remote-crypto-key {ckey}" - f" instance {tnl // n_instances}" - f" salt 0x0 {integ}\n" - f"set interface unnumbered ipip{tnl // n_instances} use loop0\n" - f"set interface state ipip{tnl // n_instances} up\n" - f"ip route add {ip_address(raddr_ip2)+tnl}/32" - f" via ipip{tnl // n_instances}\n\n" - ) - # Configure tunnel end point(s) on right side - dut2_scripts[cnf].write( - f"set ip neighbor memif1/{cnf + 1}" - f" {ip_address(if1_ip_addr) + tnl * addr_incr}" - f" 02:02:00:00:{17:02X}:{cnf:02X} static\n" - f"create ipsec tunnel local-ip {ip_address(if2_ip_addr) + cnf}" - f" local-spi {spi_2 + tnl}" - f" remote-ip {ip_address(if1_ip_addr) + tnl * addr_incr}" - f" remote-spi {spi_1 + tnl}" - f" crypto-alg {crypto_alg.alg_name}" - f" local-crypto-key {ckey}" - f" remote-crypto-key {ckey}" - f" instance {tnl // n_instances}" - f" salt 0x0 {integ}\n" - f"set interface unnumbered ipip{tnl // n_instances}" - f" use memif1/{cnf + 1}\n" - f"set interface state ipip{tnl // n_instances} up\n" - f"ip route add {ip_address(raddr_ip1) + tnl}/32" - f" via ipip{tnl // n_instances}\n\n" - ) - - IPsecUtil._close_and_copy_ipsec_script_files( - "DUT1", nodes, n_instances, dut1_scripts - ) - IPsecUtil._close_and_copy_ipsec_script_files( - "DUT2", nodes, n_instances, dut2_scripts - ) - @staticmethod def vpp_ipsec_add_multiple_tunnels( - nodes, - interface1, - interface2, - n_tunnels, - crypto_alg, - integ_alg, - tunnel_ip1, - tunnel_ip2, - raddr_ip1, - raddr_ip2, - raddr_range, - tunnel_addr_incr=True, - ): + nodes: dict, + interface1: Union[str, int], + interface2: Union[str, int], + n_tunnels: int, + crypto_alg: CryptoAlg.InputType, + integ_alg: IntegAlg.InputType, + tunnel_ip1: str, + tunnel_ip2: str, + raddr_ip1: str, + raddr_ip2: str, + raddr_range: int, + tunnel_addr_incr: bool = True, + ) -> None: """Create multiple IPsec tunnels between two VPP nodes. :param nodes: VPP nodes to create tunnels. @@ -2019,18 +1889,21 @@ class IPsecUtil: :param tunnel_addr_incr: Enable or disable tunnel IP address incremental step. :type nodes: dict - :type interface1: str or int - :type interface2: str or int + :type interface1: Union[str, int] + :type interface2: Union[str, int] :type n_tunnels: int - :type crypto_alg: CryptoAlg - :type integ_alg: Optional[IntegAlg] + :type crypto_alg: CryptoAlg.InputType + :type integ_alg: IntegAlg.InputType :type tunnel_ip1: str :type tunnel_ip2: str - :type raddr_ip1: string - :type raddr_ip2: string + :type raddr_ip1: str + :type raddr_ip2: str :type raddr_range: int :type tunnel_addr_incr: bool """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) + spd_id = 1 p_hi = 100 p_lo = 10 @@ -2039,15 +1912,8 @@ class IPsecUtil: spi_1 = 300000 spi_2 = 400000 - crypto_key = gen_key( - IPsecUtil.get_crypto_alg_key_len(crypto_alg) - ).decode() - integ_key = ( - gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)).decode() - if integ_alg - else "" - ) - + crypto_key = gen_key(crypto_alg.key_len).decode() + integ_key = gen_key(integ_alg.key_len).decode() rmac = ( Topology.get_interface_mac(nodes["DUT2"], interface2) if "DUT2" in nodes.keys() @@ -2084,9 +1950,9 @@ class IPsecUtil: nodes["DUT1"], spd_id, p_hi, - PolicyAction.BYPASS, + IpsecSpdAction.BYPASS, inbound=False, - proto=50, + proto=IPsecProto.ESP, laddr_range=dut1_local_outbound_range, raddr_range=dut1_remote_outbound_range, ) @@ -2094,9 +1960,9 @@ class IPsecUtil: nodes["DUT1"], spd_id, p_hi, - PolicyAction.BYPASS, + IpsecSpdAction.BYPASS, inbound=True, - proto=50, + proto=IPsecProto.ESP, laddr_range=dut1_remote_outbound_range, raddr_range=dut1_local_outbound_range, ) @@ -2120,7 +1986,7 @@ class IPsecUtil: n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, + action=IpsecSpdAction.PROTECT, inbound=False, sa_id=ObjIncrement(sa_id_1, 1), raddr_range=NetworkIncrement(ip_network(raddr_ip2)), @@ -2144,7 +2010,7 @@ class IPsecUtil: n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, + action=IpsecSpdAction.PROTECT, inbound=True, sa_id=ObjIncrement(sa_id_2, 1), raddr_range=NetworkIncrement(ip_network(raddr_ip1)), @@ -2177,9 +2043,9 @@ class IPsecUtil: nodes["DUT2"], spd_id, p_hi, - PolicyAction.BYPASS, + IpsecSpdAction.BYPASS, inbound=False, - proto=50, + proto=IPsecProto.ESP, laddr_range=dut2_remote_outbound_range, raddr_range=dut2_local_outbound_range, ) @@ -2187,9 +2053,9 @@ class IPsecUtil: nodes["DUT2"], spd_id, p_hi, - PolicyAction.BYPASS, + IpsecSpdAction.BYPASS, inbound=True, - proto=50, + proto=IPsecProto.ESP, laddr_range=dut2_local_outbound_range, raddr_range=dut2_remote_outbound_range, ) @@ -2212,7 +2078,7 @@ class IPsecUtil: n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, + action=IpsecSpdAction.PROTECT, inbound=True, sa_id=ObjIncrement(sa_id_1, 1), raddr_range=NetworkIncrement(ip_network(raddr_ip2)), @@ -2236,14 +2102,14 @@ class IPsecUtil: n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, + action=IpsecSpdAction.PROTECT, inbound=False, sa_id=ObjIncrement(sa_id_2, 1), raddr_range=NetworkIncrement(ip_network(raddr_ip1)), ) @staticmethod - def vpp_ipsec_show_all(node): + def vpp_ipsec_show_all(node: dict) -> None: """Run "show ipsec all" debug CLI command. :param node: Node to run command on. @@ -2252,7 +2118,7 @@ class IPsecUtil: PapiSocketExecutor.run_cli_cmd(node, "show ipsec all") @staticmethod - def show_ipsec_security_association(node): + def show_ipsec_security_association(node: dict) -> None: """Show IPSec security association. :param node: DUT node. @@ -2262,20 +2128,30 @@ class IPsecUtil: PapiSocketExecutor.dump_and_log(node, [cmd]) @staticmethod - def vpp_ipsec_flow_enable_rss(node, proto, rss_type, function="default"): + def vpp_ipsec_flow_enable_rss( + node: dict, + proto: str = "IPSEC_ESP", + rss_type: str = "esp", + function: str = "default", + ) -> int: """Ipsec flow enable rss action. :param node: DUT node. :param proto: The flow protocol. :param rss_type: RSS type. :param function: RSS function. - :type node: dict - :type proto: str + :type proto: IPsecProto.InputType :type rss_type: str :type function: str :returns: flow_index. + :rtype: int """ + # The proto argument does not correspond to IPsecProto. + # The allowed values come from src/vnet/ip/protocols.def + # and we do not have a good enum for that yet. + # FlowUti. and FlowUtil. are close but not exactly the same. + # TODO: to be fixed to use full PAPI when it is ready in VPP cmd = ( f"test flow add src-ip any proto {proto} rss function" @@ -2288,8 +2164,8 @@ class IPsecUtil: @staticmethod def vpp_create_ipsec_flows_on_dut( - node, n_flows, rx_queues, spi_start, interface - ): + node: dict, n_flows: int, rx_queues: int, spi_start: int, interface: str + ) -> None: """Create mutiple ipsec flows and enable flows onto interface. :param node: DUT node. @@ -2303,7 +2179,6 @@ class IPsecUtil: :type rx_queues: int :type spi_start: int :type interface: str - :returns: flow_index. """ for i in range(0, n_flows):