from resources.libraries.python.VatJsonUtil import VatJsonUtil
-# pylint: disable=too-few-public-methods
class PolicyAction(Enum):
"""Policy actions."""
BYPASS = 'bypass'
class CryptoAlg(Enum):
"""Encryption algorithms."""
AES_CBC_128 = ('aes-cbc-128', 'AES-CBC', 16)
- AES_CBC_192 = ('aes-cbc-128', 'AES-CBC', 24)
+ AES_CBC_192 = ('aes-cbc-192', 'AES-CBC', 24)
AES_CBC_256 = ('aes-cbc-256', 'AES-CBC', 32)
def __init__(self, alg_name, scapy_name, key_len):
class IPsecUtil(object):
"""IPsec utilities."""
- # pylint: disable=too-many-arguments
- # pylint: disable=too-many-locals
-
@staticmethod
def policy_action_bypass():
"""Return policy action bypass.
- :return: PolicyAction enum BYPASS object.
+ :returns: PolicyAction enum BYPASS object.
:rtype: PolicyAction
"""
return PolicyAction.BYPASS
def policy_action_discard():
"""Return policy action discard.
- :return: PolicyAction enum DISCARD object.
+ :returns: PolicyAction enum DISCARD object.
:rtype: PolicyAction
"""
return PolicyAction.DISCARD
def policy_action_protect():
"""Return policy action protect.
- :return: PolicyAction enum PROTECT object.
+ :returns: PolicyAction enum PROTECT object.
:rtype: PolicyAction
"""
return PolicyAction.PROTECT
def crypto_alg_aes_cbc_128():
"""Return encryption algorithm aes-cbc-128.
- :return: CryptoAlg enum AES_CBC_128 object.
+ :returns: CryptoAlg enum AES_CBC_128 object.
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_128
def crypto_alg_aes_cbc_192():
"""Return encryption algorithm aes-cbc-192.
- :return: CryptoAlg enum AES_CBC_192 objec.
+ :returns: CryptoAlg enum AES_CBC_192 objec.
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_192
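Review bot: The rewritten docstring line still carries the typo `objec.` — `:returns: CryptoAlg enum AES_CBC_192 objec.` — while every sibling accessor in this change ends in `object.`. Since the line is already being touched for the `:return:` → `:returns:` rename, it should read `:returns: CryptoAlg enum AES_CBC_192 object.`.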
def crypto_alg_aes_cbc_256():
"""Return encryption algorithm aes-cbc-256.
- :return: CryptoAlg enum AES_CBC_256 object.
+ :returns: CryptoAlg enum AES_CBC_256 object.
:rtype: CryptoAlg
"""
return CryptoAlg.AES_CBC_256
:param crypto_alg: Encryption algorithm.
:type crypto_alg: CryptoAlg
- :return: Key length.
+ :returns: Key length.
:rtype: int
"""
return crypto_alg.key_len
:param crypto_alg: Encryption algorithm.
:type crypto_alg: CryptoAlg
- :return: Algorithm scapy name.
+ :returns: Algorithm scapy name.
:rtype: str
"""
return crypto_alg.scapy_name
def integ_alg_sha1_96():
"""Return integrity algorithm SHA1-96.
- :return: IntegAlg enum SHA1_96 object.
+ :returns: IntegAlg enum SHA1_96 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA1_96
def integ_alg_sha_256_128():
"""Return integrity algorithm SHA-256-128.
- :return: IntegAlg enum SHA_256_128 object.
+ :returns: IntegAlg enum SHA_256_128 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA_256_128
def integ_alg_sha_384_192():
"""Return integrity algorithm SHA-384-192.
- :return: IntegAlg enum SHA_384_192 object.
+ :returns: IntegAlg enum SHA_384_192 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA_384_192
def integ_alg_sha_512_256():
"""Return integrity algorithm SHA-512-256.
- :return: IntegAlg enum SHA_512_256 object.
+ :returns: IntegAlg enum SHA_512_256 object.
:rtype: IntegAlg
"""
return IntegAlg.SHA_512_256
:param integ_alg: Integrity algorithm.
:type integ_alg: IntegAlg
- :return: Key length.
+ :returns: Key length.
:rtype: int
"""
return integ_alg.key_len
:param integ_alg: Integrity algorithm.
:type integ_alg: IntegAlg
- :return: Algorithm scapy name.
+ :returns: Algorithm scapy name.
:rtype: str
"""
return integ_alg.scapy_name
"""
ckey = crypto_key.encode('hex')
ikey = integ_key.encode('hex')
- tunnel = ''
- if tunnel_src is not None and tunnel_dst is not None:
- tunnel = 'tunnel_src {0} tunnel_dst {1}'.format(tunnel_src,
- tunnel_dst)
+ tunnel = 'tunnel_src {0} tunnel_dst {1}'.format(tunnel_src, tunnel_dst)\
+ if tunnel_src is not None and tunnel_dst is not None else ''
+
out = VatExecutor.cmd_from_template(node,
"ipsec/ipsec_sad_add_entry.vat",
sad_id=sad_id, spi=spi,
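Review bot: The new `tunnel = ...` assignment folds a `.format(...)` call and a two-clause conditional onto a backslash-continued line, which is harder to scan than the `if` block it replaces; the same backslash pattern is used for `sw_if_index` in `vpp_ipsec_spd_add_if` further down. Wrapping the expression in parentheses drops the backslash and lets the condition sit on its own line: `tunnel = ('tunnel_src {0} tunnel_dst {1}'.format(tunnel_src, tunnel_dst)` / `          if tunnel_src is not None and tunnel_dst is not None else '')`. The enclosing `vpp_ipsec_add_sad_entry` starts and ends outside this hunk.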
out[0],
err_msg='Add SAD entry failed on {0}'.format(node['host']))
+ @staticmethod
+ def vpp_ipsec_sa_set_key(node, sa_id, crypto_key, integ_key):
+ """Update Security Association (SA) keys.
+
+ :param node: VPP node to update SA keys.
+ :param sa_id: SAD entry ID.
+ :param crypto_key: The encryption key string.
+ :param integ_key: The integrity key string.
+ :type node: dict
+ :type sa_id: int
+ :type crypto_key: str
+ :type integ_key: str
+ """
+ ckey = crypto_key.encode('hex')
+ ikey = integ_key.encode('hex')
+
+ out = VatExecutor.cmd_from_template(node,
+ "ipsec/ipsec_sa_set_key.vat",
+ sa_id=sa_id,
+ ckey=ckey, ikey=ikey)
+ VatJsonUtil.verify_vat_retval(
+ out[0],
+ err_msg='Update SA key failed on {0}'.format(node['host']))
+
@staticmethod
def vpp_ipsec_add_spd(node, spd_id):
"""Create Security Policy Database on the VPP node.
@staticmethod
def vpp_ipsec_spd_add_if(node, spd_id, interface):
- """Add interface to the SPD.
+ """Add interface to the Security Policy Database.
:param node: VPP node.
:param spd_id: SPD ID to add interface on.
:type spd_id: int
:type interface: str or int
"""
- if isinstance(interface, basestring):
- sw_if_index = Topology.get_interface_sw_index(node, interface)
- else:
- sw_if_index = interface
+ sw_if_index = Topology.get_interface_sw_index(node, interface)\
+ if isinstance(interface, basestring) else interface
+
out = VatExecutor.cmd_from_template(node,
"ipsec/ipsec_interface_add_spd.vat",
spd_id=spd_id, sw_if_id=sw_if_index)
format IP/prefix or IP/mask. If no mask is provided, it's considered
to be /32.
:param proto: Policy selector next layer protocol number.
- :param lport_range: Policy selector local TCP/UDP port range in foramt
+ :param lport_range: Policy selector local TCP/UDP port range in format
<port_start>-<port_end>.
- :param rport_range: Policy selector remote TCP/UDP port range in foramt
+ :param rport_range: Policy selector remote TCP/UDP port range in format
<port_start>-<port_end>.
:type node: dict
:type spd_id: int
:type lport_range: string
:type rport_range: string
"""
- direction = 'outbound'
- if inbound:
- direction = 'inbound'
+ direction = 'inbound' if inbound else 'outbound'
act_str = action.value
if PolicyAction.PROTECT == action and sa_id is not None: