X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FIPsecUtil.py;h=e066bc94247d97ab1e51af2b4eaf44a1dd0612ce;hp=19995e547de2465d7a8e1575d45df25920c14460;hb=HEAD;hpb=82315071628b1ee70d699699cafb5afa425e135f diff --git a/resources/libraries/python/IPsecUtil.py b/resources/libraries/python/IPsecUtil.py index 19995e547d..59374ab73f 100644 --- a/resources/libraries/python/IPsecUtil.py +++ b/resources/libraries/python/IPsecUtil.py @@ -24,6 +24,7 @@ from typing import Iterable, List, Optional, Sequence, Tuple, Union from robot.libraries.BuiltIn import BuiltIn from resources.libraries.python.Constants import Constants +from resources.libraries.python.enum_util import get_enum_instance from resources.libraries.python.IncrementUtil import ObjIncrement from resources.libraries.python.InterfaceUtil import ( InterfaceUtil, @@ -60,27 +61,33 @@ def gen_key(length: int) -> bytes: ) -class PolicyAction(Enum): - """Policy actions.""" +# TODO: Introduce a metaclass that adds .find and .InputType automatically? +class IpsecSpdAction(Enum): + """IPsec SPD actions. - BYPASS = ("bypass", 0) + Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_spd_action. + """ + + BYPASS = NONE = ("bypass", 0) DISCARD = ("discard", 1) + RESOLVE = ("resolve", 2) PROTECT = ("protect", 3) - def __init__(self, policy_name: str, policy_int_repr: int): - self.policy_name = policy_name - self.policy_int_repr = policy_int_repr + def __init__(self, action_name: str, action_int_repr: int): + self.action_name = action_name + self.action_int_repr = action_int_repr def __str__(self) -> str: - return self.policy_name + return self.action_name def __int__(self) -> int: - return self.policy_int_repr + return self.action_int_repr class CryptoAlg(Enum): """Encryption algorithms.""" + NONE = ("none", 0, "none", 0) AES_CBC_128 = ("aes-cbc-128", 1, "AES-CBC", 16) AES_CBC_256 = ("aes-cbc-256", 3, "AES-CBC", 32) AES_GCM_128 = ("aes-gcm-128", 7, "AES-GCM", 16) @@ -94,10 +101,16 @@ class CryptoAlg(Enum): self.scapy_name = scapy_name self.key_len = key_len + # TODO: Investigate if __int__ works with PAPI. It was not enough for "if". + def __bool__(self): + """A shorthand to enable "if crypto_alg:" constructs.""" + return self.alg_int_repr != 0 + class IntegAlg(Enum): """Integrity algorithm.""" + NONE = ("none", 0, "none", 0) SHA_256_128 = ("sha-256-128", 4, "SHA2-256-128", 32) SHA_512_256 = ("sha-512-256", 6, "SHA2-512-256", 64) @@ -109,18 +122,44 @@ class IntegAlg(Enum): self.scapy_name = scapy_name self.key_len = key_len + def __bool__(self): + """A shorthand to enable "if integ_alg:" constructs.""" + return self.alg_int_repr != 0 + +# TODO: Base on Enum, so str values can be defined as in alg enums? class IPsecProto(IntEnum): - """IPsec protocol.""" + """IPsec protocol. + + Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_proto. + """ + + ESP = 50 + AH = 51 + NONE = 255 + + def __str__(self) -> str: + """Return string suitable for CLI commands. - IPSEC_API_PROTO_ESP = 50 - IPSEC_API_PROTO_AH = 51 + None is not supported. + + :returns: Lowercase name of the proto. + :rtype: str + :raises: ValueError if the numeric value is not recognized. + """ + num = int(self) + if num == 50: + return "esp" + if num == 51: + return "ah" + raise ValueError(f"String form not defined for IPsecProto {num}") +# The rest of enums do not appear outside this file, so no no change needed yet. class IPsecSadFlags(IntEnum): """IPsec Security Association Database flags.""" - IPSEC_API_SAD_FLAG_NONE = 0 + IPSEC_API_SAD_FLAG_NONE = NONE = 0 # Enable extended sequence numbers IPSEC_API_SAD_FLAG_USE_ESN = 0x01 # Enable Anti - replay @@ -139,7 +178,7 @@ class IPsecSadFlags(IntEnum): class TunnelEncpaDecapFlags(IntEnum): """Flags controlling tunnel behaviour.""" - TUNNEL_API_ENCAP_DECAP_FLAG_NONE = 0 + TUNNEL_API_ENCAP_DECAP_FLAG_NONE = NONE = 0 # at encap, copy the DF bit of the payload into the tunnel header TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF = 1 # at encap, set the DF bit in the tunnel header @@ -156,177 +195,93 @@ class TunnelMode(IntEnum): """Tunnel modes.""" # point-to-point - TUNNEL_API_MODE_P2P = 0 + TUNNEL_API_MODE_P2P = NONE = 0 # multi-point TUNNEL_API_MODE_MP = 1 -class IPsecUtil: - """IPsec utilities.""" - - @staticmethod - def policy_action_bypass() -> PolicyAction: - """Return policy action bypass. - - :returns: PolicyAction enum BYPASS object. - :rtype: PolicyAction - """ - return PolicyAction.BYPASS - - @staticmethod - def policy_action_discard() -> PolicyAction: - """Return policy action discard. - - :returns: PolicyAction enum DISCARD object. - :rtype: PolicyAction - """ - return PolicyAction.DISCARD - - @staticmethod - def policy_action_protect() -> PolicyAction: - """Return policy action protect. - - :returns: PolicyAction enum PROTECT object. - :rtype: PolicyAction - """ - return PolicyAction.PROTECT - - @staticmethod - def crypto_alg_aes_cbc_128() -> CryptoAlg: - """Return encryption algorithm aes-cbc-128. - - :returns: CryptoAlg enum AES_CBC_128 object. - :rtype: CryptoAlg - """ - return CryptoAlg.AES_CBC_128 - - @staticmethod - def crypto_alg_aes_cbc_256() -> CryptoAlg: - """Return encryption algorithm aes-cbc-256. - - :returns: CryptoAlg enum AES_CBC_256 object. - :rtype: CryptoAlg - """ - return CryptoAlg.AES_CBC_256 - - @staticmethod - def crypto_alg_aes_gcm_128() -> CryptoAlg: - """Return encryption algorithm aes-gcm-128. +# Derived types for type hints, based on capabilities of get_enum_instance. +IpsecSpdAction.InputType = Union[IpsecSpdAction, str, None] +CryptoAlg.InputType = Union[CryptoAlg, str, None] +IntegAlg.InputType = Union[IntegAlg, str, None] +IPsecProto.InputType = Union[IPsecProto, str, int, None] +# TODO: Introduce a metaclass that adds .find and .InputType automatically? - :returns: CryptoAlg enum AES_GCM_128 object. - :rtype: CryptoAlg - """ - return CryptoAlg.AES_GCM_128 - @staticmethod - def crypto_alg_aes_gcm_256() -> CryptoAlg: - """Return encryption algorithm aes-gcm-256. +class IPsecUtil: + """IPsec utilities.""" - :returns: CryptoAlg enum AES_GCM_128 object. - :rtype: CryptoAlg - """ - return CryptoAlg.AES_GCM_256 + # The following 4 methods are Python one-liners, + # but they are useful when called as a Robot keyword. @staticmethod - def get_crypto_alg_key_len(crypto_alg: CryptoAlg) -> int: + def get_crypto_alg_key_len(crypto_alg: CryptoAlg.InputType) -> int: """Return encryption algorithm key length. + This is a Python one-liner, but useful when called as a Robot keyword. + :param crypto_alg: Encryption algorithm. - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :returns: Key length. :rtype: int """ - return crypto_alg.key_len + return get_enum_instance(CryptoAlg, crypto_alg).key_len @staticmethod - def get_crypto_alg_scapy_name(crypto_alg: CryptoAlg) -> str: + def get_crypto_alg_scapy_name(crypto_alg: CryptoAlg.InputType) -> str: """Return encryption algorithm scapy name. + This is a Python one-liner, but useful when called as a Robot keyword. + :param crypto_alg: Encryption algorithm. - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :returns: Algorithm scapy name. :rtype: str """ - return crypto_alg.scapy_name - - @staticmethod - def integ_alg_sha_256_128() -> IntegAlg: - """Return integrity algorithm SHA-256-128. - - :returns: IntegAlg enum SHA_256_128 object. - :rtype: IntegAlg - """ - return IntegAlg.SHA_256_128 + return get_enum_instance(CryptoAlg, crypto_alg).scapy_name + # The below to keywords differ only by enum type conversion from str. @staticmethod - def integ_alg_sha_512_256() -> IntegAlg: - """Return integrity algorithm SHA-512-256. - - :returns: IntegAlg enum SHA_512_256 object. - :rtype: IntegAlg - """ - return IntegAlg.SHA_512_256 - - @staticmethod - def get_integ_alg_key_len(integ_alg: Optional[IntegAlg]) -> int: + def get_integ_alg_key_len(integ_alg: IntegAlg.InputType) -> int: """Return integrity algorithm key length. - None argument is accepted, returning zero. - :param integ_alg: Integrity algorithm. - :type integ_alg: Optional[IntegAlg] + :type integ_alg: IntegAlg.InputType :returns: Key length. :rtype: int """ - return 0 if integ_alg is None else integ_alg.key_len + return get_enum_instance(IntegAlg, integ_alg).key_len @staticmethod - def get_integ_alg_scapy_name(integ_alg: Optional[IntegAlg]) -> str: + def get_integ_alg_scapy_name(integ_alg: IntegAlg.InputType) -> str: """Return integrity algorithm scapy name. :param integ_alg: Integrity algorithm. - :type integ_alg: IntegAlg + :type integ_alg: IntegAlg.InputType :returns: Algorithm scapy name. :rtype: str """ - return integ_alg.scapy_name - - @staticmethod - def ipsec_proto_esp() -> int: - """Return IPSec protocol ESP. - - :returns: IPsecProto enum ESP object. - :rtype: IPsecProto - """ - return int(IPsecProto.IPSEC_API_PROTO_ESP) - - @staticmethod - def ipsec_proto_ah() -> int: - """Return IPSec protocol AH. - - :returns: IPsecProto enum AH object. - :rtype: IPsecProto - """ - return int(IPsecProto.IPSEC_API_PROTO_AH) + return get_enum_instance(IntegAlg, integ_alg).scapy_name @staticmethod def vpp_ipsec_select_backend( - node: dict, protocol: int, index: int = 1 + node: dict, proto: IPsecProto.InputType, index: int = 1 ) -> None: """Select IPsec backend. :param node: VPP node to select IPsec backend on. - :param protocol: IPsec protocol. + :param proto: IPsec protocol. :param index: Backend index. :type node: dict - :type protocol: IPsecProto + :type proto: IPsecProto.InputType :type index: int :raises RuntimeError: If failed to select IPsec backend or if no API reply received. """ + proto = get_enum_instance(IPsecProto, proto) cmd = "ipsec_select_backend" err_msg = f"Failed to select IPsec backend on host {node['host']}" - args = dict(protocol=protocol, index=index) + args = dict(protocol=proto, index=index) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @@ -420,9 +375,9 @@ class IPsecUtil: node: dict, sad_id: int, spi: int, - crypto_alg: CryptoAlg, - crypto_key: str, - integ_alg: Optional[IntegAlg] = None, + crypto_alg: CryptoAlg.InputType = None, + crypto_key: str = "", + integ_alg: IntegAlg.InputType = None, integ_key: str = "", tunnel_src: Optional[str] = None, tunnel_dst: Optional[str] = None, @@ -443,13 +398,15 @@ class IPsecUtil: :type node: dict :type sad_id: int :type spi: int - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :type crypto_key: str - :type integ_alg: Optional[IntegAlg] + :type integ_alg: IntegAlg.InputType :type integ_key: str :type tunnel_src: Optional[str] :type tunnel_dst: Optional[str] """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) if isinstance(crypto_key, str): crypto_key = crypto_key.encode(encoding="utf-8") if isinstance(integ_key, str): @@ -480,7 +437,7 @@ class IPsecUtil: spi=int(spi), crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=ckey, - integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, + integrity_algorithm=integ_alg.alg_int_repr, integrity_key=ikey, flags=flags, tunnel=dict( @@ -492,7 +449,7 @@ class IPsecUtil: ), dscp=int(IpDscp.IP_API_DSCP_CS0), ), - protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), + protocol=IPsecProto.ESP, udp_src_port=IPSEC_UDP_PORT_DEFAULT, udp_dst_port=IPSEC_UDP_PORT_DEFAULT, anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT, @@ -507,9 +464,9 @@ class IPsecUtil: n_entries: int, sad_id: int, spi: int, - crypto_alg: CryptoAlg, - crypto_key: str, - integ_alg: Optional[IntegAlg] = None, + crypto_alg: CryptoAlg.InputType = None, + crypto_key: str = "", + integ_alg: IntegAlg.InputType = None, integ_key: str = "", tunnel_src: Optional[str] = None, tunnel_dst: Optional[str] = None, @@ -537,14 +494,16 @@ class IPsecUtil: :type n_entries: int :type sad_id: int :type spi: int - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :type crypto_key: str - :type integ_alg: Optional[IntegAlg] + :type integ_alg: IntegAlg.InputType :type integ_key: str :type tunnel_src: Optional[str] :type tunnel_dst: Optional[str] :type tunnel_addr_incr: bool """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) if isinstance(crypto_key, str): crypto_key = crypto_key.encode(encoding="utf-8") if isinstance(integ_key, str): @@ -585,7 +544,7 @@ class IPsecUtil: spi=int(spi), crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=ckey, - integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, + integrity_algorithm=integ_alg.alg_int_repr, integrity_key=ikey, flags=flags, tunnel=dict( @@ -597,7 +556,7 @@ class IPsecUtil: ), dscp=int(IpDscp.IP_API_DSCP_CS0), ), - protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), + protocol=IPsecProto.ESP, udp_src_port=IPSEC_UDP_PORT_DEFAULT, udp_dst_port=IPSEC_UDP_PORT_DEFAULT, anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT, @@ -774,7 +733,7 @@ class IPsecUtil: entry_amount: int, local_addr_range: Union[str, IPv4Address, IPv6Address], remote_addr_range: Union[str, IPv4Address, IPv6Address], - action: PolicyAction = PolicyAction.BYPASS, + action: IpsecSpdAction.InputType = IpsecSpdAction.BYPASS, inbound: bool = False, bidirectional: bool = True, ) -> None: @@ -801,7 +760,7 @@ class IPsecUtil: :param remote_addr_range: Matching remote address range in direction 1 in format IP/prefix or IP/mask. If no mask is provided, it's considered to be /32. - :param action: Policy action. + :param action: IPsec SPD action. :param inbound: If True policy is for inbound traffic, otherwise outbound. :param bidirectional: When True, will create SPDs in both directions @@ -814,14 +773,16 @@ class IPsecUtil: Union[str, IPv4Address, IPv6Address] :type remote_addr_range: Union[str, IPv4Address, IPv6Address] - :type action: PolicyAction + :type action: IpsecSpdAction.InputType :type inbound: bool :type bidirectional: bool - :raises NotImplementedError: When the action is PolicyAction.PROTECT. + :raises NotImplementedError: When the action is IpsecSpdAction.PROTECT. """ - - if action == PolicyAction.PROTECT: - raise NotImplementedError("Policy action PROTECT is not supported.") + action = get_enum_instance(IpsecSpdAction, action) + if action == IpsecSpdAction.PROTECT: + raise NotImplementedError( + "IPsec SPD action PROTECT is not supported." + ) spd_id_dir1 = 1 spd_id_dir2 = 2 @@ -913,10 +874,10 @@ class IPsecUtil: executor: PapiSocketExecutor, spd_id: int, priority: int, - action: PolicyAction, + action: IpsecSpdAction.InputType, inbound: bool = True, sa_id: Optional[int] = None, - proto: Optional[int] = None, + proto: IPsecProto.InputType = None, laddr_range: Optional[str] = None, raddr_range: Optional[str] = None, lport_range: Optional[str] = None, @@ -932,10 +893,10 @@ class IPsecUtil: :param executor: Open PAPI executor (async handling) to add commands to. :param spd_id: SPD ID to add entry on. :param priority: SPD entry priority, higher number = higher priority. - :param action: Policy action. + :param action: IPsec SPD action. :param inbound: If True policy is for inbound traffic, otherwise outbound. - :param sa_id: SAD entry ID for action PolicyAction.PROTECT. + :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT. :param proto: Policy selector next layer protocol number. :param laddr_range: Policy selector local IPv4 or IPv6 address range in format IP/prefix or IP/mask. If no mask is provided, @@ -952,16 +913,18 @@ class IPsecUtil: :type executor: PapiSocketExecutor :type spd_id: int :type priority: int - :type action: PolicyAction + :type action: IpsecSpdAction.InputType :type inbound: bool :type sa_id: Optional[int] - :type proto: Optional[int] + :type proto: IPsecProto.InputType :type laddr_range: Optional[str] :type raddr_range: Optional[str] :type lport_range: Optional[str] :type rport_range: Optional[str] :type is_ipv6: bool """ + action = get_enum_instance(IpsecSpdAction, action) + proto = get_enum_instance(IPsecProto, proto) if laddr_range is None: laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0" @@ -979,7 +942,7 @@ class IPsecUtil: is_outbound=not inbound, sa_id=int(sa_id) if sa_id else 0, policy=int(action), - protocol=255 if proto is None else int(proto), + protocol=proto, remote_address_start=IPAddress.create_ip_address_object( remote_net.network_address ), @@ -1013,10 +976,10 @@ class IPsecUtil: node: dict, spd_id: int, priority: int, - action: PolicyAction, + action: IpsecSpdAction.InputType, inbound: bool = True, sa_id: Optional[int] = None, - proto: Optional[int] = None, + proto: IPsecProto.InputType = None, laddr_range: Optional[str] = None, raddr_range: Optional[str] = None, lport_range: Optional[str] = None, @@ -1028,10 +991,10 @@ class IPsecUtil: :param node: VPP node to add SPD entry on. :param spd_id: SPD ID to add entry on. :param priority: SPD entry priority, higher number = higher priority. - :param action: Policy action. + :param action: IPsec SPD action. :param inbound: If True policy is for inbound traffic, otherwise outbound. - :param sa_id: SAD entry ID for action PolicyAction.PROTECT. + :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT. :param proto: Policy selector next layer protocol number. :param laddr_range: Policy selector local IPv4 or IPv6 address range in format IP/prefix or IP/mask. If no mask is provided, @@ -1048,16 +1011,18 @@ class IPsecUtil: :type node: dict :type spd_id: int :type priority: int - :type action: PolicyAction + :type action: IpsecSpdAction.InputType :type inbound: bool :type sa_id: Optional[int] - :type proto: Optional[int] + :type proto: IPsecProto.InputType :type laddr_range: Optional[str] :type raddr_range: Optional[str] :type lport_range: Optional[str] :type rport_range: Optional[str] :type is_ipv6: bool """ + action = get_enum_instance(IpsecSpdAction, action) + proto = get_enum_instance(IPsecProto, proto) err_msg = ( "Failed to add entry to Security Policy Database" f" {spd_id} on host {node['host']}" @@ -1085,10 +1050,10 @@ class IPsecUtil: n_entries: int, spd_id: int, priority: Optional[ObjIncrement], - action: PolicyAction, + action: IpsecSpdAction.InputType, inbound: bool, sa_id: Optional[ObjIncrement] = None, - proto: Optional[int] = None, + proto: IPsecProto.InputType = None, laddr_range: Optional[NetworkIncrement] = None, raddr_range: Optional[NetworkIncrement] = None, lport_range: Optional[str] = None, @@ -1101,10 +1066,10 @@ class IPsecUtil: :param n_entries: Number of SPD entries to be added. :param spd_id: SPD ID to add entries on. :param priority: SPD entries priority, higher number = higher priority. - :param action: Policy action. + :param action: IPsec SPD action. :param inbound: If True policy is for inbound traffic, otherwise outbound. - :param sa_id: SAD entry ID for action PolicyAction.PROTECT. + :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT. :param proto: Policy selector next layer protocol number. :param laddr_range: Policy selector local IPv4 or IPv6 address range in format IP/prefix or IP/mask. If no mask is provided, @@ -1122,16 +1087,18 @@ class IPsecUtil: :type n_entries: int :type spd_id: int :type priority: Optional[ObjIncrement] - :type action: PolicyAction + :type action: IpsecSpdAction.InputType :type inbound: bool :type sa_id: Optional[ObjIncrement] - :type proto: Optional[int] + :type proto: IPsecProto.InputType :type laddr_range: Optional[NetworkIncrement] :type raddr_range: Optional[NetworkIncrement] :type lport_range: Optional[str] :type rport_range: Optional[str] :type is_ipv6: bool """ + action = get_enum_instance(IpsecSpdAction, action) + proto = get_enum_instance(IPsecProto, proto) if laddr_range is None: laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0" laddr_range = NetworkIncrement(ip_network(laddr_range), 0) @@ -1253,8 +1220,8 @@ class IPsecUtil: if1_key: str, if2_key: str, n_tunnels: int, - crypto_alg: CryptoAlg, - integ_alg: Optional[IntegAlg], + crypto_alg: CryptoAlg.InputType, + integ_alg: IntegAlg.InputType, raddr_ip2: Union[IPv4Address, IPv6Address], addr_incr: int, spi_d: dict, @@ -1285,8 +1252,8 @@ class IPsecUtil: :type if1_key: str :type if2_key: str :type n_tunnels: int - :type crypto_alg: CryptoAlg - :type integ_alg: Optional[IntegAlg] + :type crypto_alg: CryptoAlg.InputType + :type integ_alg: IntegAlg.InputType :type raddr_ip2: Union[IPv4Address, IPv6Address] :type addr_incr: int :type spi_d: dict @@ -1294,6 +1261,8 @@ class IPsecUtil: :returns: Generated ckeys and ikeys. :rtype: List[bytes], List[bytes] """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) if not existing_tunnels: loop_sw_if_idx = IPsecUtil._ipsec_create_loopback_dut1_papi( nodes, tun_ips, if1_key, if2_key @@ -1365,10 +1334,10 @@ class IPsecUtil: sad_entry = dict( sad_id=None, spi=None, - protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), + protocol=IPsecProto.ESP, crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=c_key, - integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, + integrity_algorithm=integ_alg.alg_int_repr, integrity_key=i_key, flags=common_flags, tunnel=dict( @@ -1387,12 +1356,8 @@ class IPsecUtil: ) args = dict(entry=sad_entry) for i in range(existing_tunnels, n_tunnels): - ckeys.append( - gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg)) - ) - ikeys.append( - gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)) - ) + ckeys.append(gen_key(crypto_alg.key_len)) + ikeys.append(gen_key(integ_alg.key_len)) # SAD entry for outband / tx path sad_entry["sad_id"] = i sad_entry["spi"] = spi_d["spi_1"] + i @@ -1497,9 +1462,9 @@ class IPsecUtil: tun_ips: dict, if2_key: str, n_tunnels: int, - crypto_alg: CryptoAlg, + crypto_alg: CryptoAlg.InputType, ckeys: Sequence[bytes], - integ_alg: Optional[IntegAlg], + integ_alg: IntegAlg.InputType, ikeys: Sequence[bytes], raddr_ip1: Union[IPv4Address, IPv6Address], addr_incr: int, @@ -1532,15 +1497,17 @@ class IPsecUtil: :type tun_ips: dict :type if2_key: str :type n_tunnels: int - :type crypto_alg: CryptoAlg + :type crypto_alg: CryptoAlg.InputType :type ckeys: Sequence[bytes] - :type integ_alg: Optional[IntegAlg] + :type integ_alg: IntegAlg.InputType :type ikeys: Sequence[bytes] :type raddr_ip1: Union[IPv4Address, IPv6Address] :type addr_incr: int :type spi_d: dict :type existing_tunnels: int """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) with PapiSocketExecutor(nodes["DUT2"], is_async=True) as papi_exec: if not existing_tunnels: # Set IP address on VPP node 2 interface @@ -1605,10 +1572,10 @@ class IPsecUtil: sad_entry = dict( sad_id=None, spi=None, - protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), + protocol=IPsecProto.ESP, crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=c_key, - integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, + integrity_algorithm=integ_alg.alg_int_repr, integrity_key=i_key, flags=common_flags, tunnel=dict( @@ -1627,12 +1594,8 @@ class IPsecUtil: ) args = dict(entry=sad_entry) for i in range(existing_tunnels, n_tunnels): - ckeys.append( - gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg)) - ) - ikeys.append( - gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)) - ) + ckeys.append(gen_key(crypto_alg.key_len)) + ikeys.append(gen_key(integ_alg.key_len)) # SAD entry for outband / tx path sad_entry["sad_id"] = 100000 + i sad_entry["spi"] = spi_d["spi_2"] + i @@ -1749,8 +1712,8 @@ class IPsecUtil: if1_key: str, if2_key: str, n_tunnels: int, - crypto_alg: CryptoAlg, - integ_alg: Optional[IntegAlg], + crypto_alg: CryptoAlg.InputType, + integ_alg: IntegAlg.InputType, raddr_ip1: str, raddr_ip2: str, raddr_range: int, @@ -1790,8 +1753,8 @@ class IPsecUtil: :type if1_key: str :type if2_key: str :type n_tunnels: int - :type crypto_alg: CryptoAlg - :type integ_alg: Optional[IntegAlg] + :type crypto_alg: CryptoAlg.InputType + :type integ_alg: IntegAlg.InputType :type raddr_ip1: str :type raddr_ip2: str :type raddr_range: int @@ -1800,6 +1763,8 @@ class IPsecUtil: :returns: Ckeys, ikeys, spi_1, spi_2. :rtype: Optional[Tuple[List[bytes], List[bytes], int, int]] """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) n_tunnels = int(n_tunnels) existing_tunnels = int(existing_tunnels) spi_d = dict(spi_1=100000, spi_2=200000) @@ -1896,8 +1861,8 @@ class IPsecUtil: interface1: Union[str, int], interface2: Union[str, int], n_tunnels: int, - crypto_alg: CryptoAlg, - integ_alg: Optional[IntegAlg], + crypto_alg: CryptoAlg.InputType, + integ_alg: IntegAlg.InputType, tunnel_ip1: str, tunnel_ip2: str, raddr_ip1: str, @@ -1927,8 +1892,8 @@ class IPsecUtil: :type interface1: Union[str, int] :type interface2: Union[str, int] :type n_tunnels: int - :type crypto_alg: CryptoAlg - :type integ_alg: Optional[IntegAlg] + :type crypto_alg: CryptoAlg.InputType + :type integ_alg: IntegAlg.InputType :type tunnel_ip1: str :type tunnel_ip2: str :type raddr_ip1: str @@ -1936,6 +1901,9 @@ class IPsecUtil: :type raddr_range: int :type tunnel_addr_incr: bool """ + crypto_alg = get_enum_instance(CryptoAlg, crypto_alg) + integ_alg = get_enum_instance(IntegAlg, integ_alg) + spd_id = 1 p_hi = 100 p_lo = 10 @@ -1944,15 +1912,8 @@ class IPsecUtil: spi_1 = 300000 spi_2 = 400000 - crypto_key = gen_key( - IPsecUtil.get_crypto_alg_key_len(crypto_alg) - ).decode() - integ_key = ( - gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)).decode() - if integ_alg - else "" - ) - + crypto_key = gen_key(crypto_alg.key_len).decode() + integ_key = gen_key(integ_alg.key_len).decode() rmac = ( Topology.get_interface_mac(nodes["DUT2"], interface2) if "DUT2" in nodes.keys() @@ -1989,9 +1950,9 @@ class IPsecUtil: nodes["DUT1"], spd_id, p_hi, - PolicyAction.BYPASS, + IpsecSpdAction.BYPASS, inbound=False, - proto=50, + proto=IPsecProto.ESP, laddr_range=dut1_local_outbound_range, raddr_range=dut1_remote_outbound_range, ) @@ -1999,9 +1960,9 @@ class IPsecUtil: nodes["DUT1"], spd_id, p_hi, - PolicyAction.BYPASS, + IpsecSpdAction.BYPASS, inbound=True, - proto=50, + proto=IPsecProto.ESP, laddr_range=dut1_remote_outbound_range, raddr_range=dut1_local_outbound_range, ) @@ -2025,7 +1986,7 @@ class IPsecUtil: n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, + action=IpsecSpdAction.PROTECT, inbound=False, sa_id=ObjIncrement(sa_id_1, 1), raddr_range=NetworkIncrement(ip_network(raddr_ip2)), @@ -2049,7 +2010,7 @@ class IPsecUtil: n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, + action=IpsecSpdAction.PROTECT, inbound=True, sa_id=ObjIncrement(sa_id_2, 1), raddr_range=NetworkIncrement(ip_network(raddr_ip1)), @@ -2082,9 +2043,9 @@ class IPsecUtil: nodes["DUT2"], spd_id, p_hi, - PolicyAction.BYPASS, + IpsecSpdAction.BYPASS, inbound=False, - proto=50, + proto=IPsecProto.ESP, laddr_range=dut2_remote_outbound_range, raddr_range=dut2_local_outbound_range, ) @@ -2092,9 +2053,9 @@ class IPsecUtil: nodes["DUT2"], spd_id, p_hi, - PolicyAction.BYPASS, + IpsecSpdAction.BYPASS, inbound=True, - proto=50, + proto=IPsecProto.ESP, laddr_range=dut2_local_outbound_range, raddr_range=dut2_remote_outbound_range, ) @@ -2117,7 +2078,7 @@ class IPsecUtil: n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, + action=IpsecSpdAction.PROTECT, inbound=True, sa_id=ObjIncrement(sa_id_1, 1), raddr_range=NetworkIncrement(ip_network(raddr_ip2)), @@ -2141,7 +2102,7 @@ class IPsecUtil: n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, + action=IpsecSpdAction.PROTECT, inbound=False, sa_id=ObjIncrement(sa_id_2, 1), raddr_range=NetworkIncrement(ip_network(raddr_ip1)), @@ -2168,7 +2129,10 @@ class IPsecUtil: @staticmethod def vpp_ipsec_flow_enable_rss( - node: dict, proto: str, rss_type: str, function: str = "default" + node: dict, + proto: str = "IPSEC_ESP", + rss_type: str = "esp", + function: str = "default", ) -> int: """Ipsec flow enable rss action. @@ -2176,14 +2140,18 @@ class IPsecUtil: :param proto: The flow protocol. :param rss_type: RSS type. :param function: RSS function. - :type node: dict - :type proto: str + :type proto: IPsecProto.InputType :type rss_type: str :type function: str :returns: flow_index. :rtype: int """ + # The proto argument does not correspond to IPsecProto. + # The allowed values come from src/vnet/ip/protocols.def + # and we do not have a good enum for that yet. + # FlowUti. and FlowUtil. are close but not exactly the same. + # TODO: to be fixed to use full PAPI when it is ready in VPP cmd = ( f"test flow add src-ip any proto {proto} rss function"