from ipaddress import ip_network, ip_address
from enum import Enum, IntEnum
-from robot.api import logger
from resources.libraries.python.PapiExecutor import PapiExecutor
-from resources.libraries.python.PapiErrors import PapiError
from resources.libraries.python.topology import Topology
from resources.libraries.python.VatExecutor import VatExecutor
from resources.libraries.python.VatJsonUtil import VatJsonUtil
AES_CBC_192 = ('aes-cbc-192', 'AES-CBC', 24)
AES_CBC_256 = ('aes-cbc-256', 'AES-CBC', 32)
AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20)
+ AES_GCM_256 = ('aes-gcm-256', 'AES-GCM', 40)
def __init__(self, alg_name, scapy_name, key_len):
self.alg_name = alg_name
SHA_384_192 = ('sha-384-192', 'SHA2-384-192', 48)
SHA_512_256 = ('sha-512-256', 'SHA2-512-256', 64)
AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20)
+ AES_GCM_256 = ('aes-gcm-256', 'AES-GCM', 40)
def __init__(self, alg_name, scapy_name, key_len):
self.alg_name = alg_name
"""
return CryptoAlg.AES_GCM_128
+ @staticmethod
+ def crypto_alg_aes_gcm_256():
+ """Return encryption algorithm aes-gcm-256.
+
+ :returns: CryptoAlg enum AES_GCM_128 object.
+ :rtype: CryptoAlg
+ """
+ return CryptoAlg.AES_GCM_256
+
@staticmethod
def get_crypto_alg_key_len(crypto_alg):
"""Return encryption algorithm key length.
"""
return IntegAlg.AES_GCM_128
+ @staticmethod
+ def integ_alg_aes_gcm_256():
+ """Return integrity algorithm AES-GCM-256.
+
+ :returns: IntegAlg enum AES_GCM_256 object.
+ :rtype: IntegAlg
+ """
+ return IntegAlg.AES_GCM_256
+
@staticmethod
def get_integ_alg_key_len(integ_alg):
"""Return integrity algorithm key length.
:type node: dict
:type protocol: IPsecProto
:type index: int
+ :raises RuntimeError: If failed to select IPsec backend or if no API
+ reply received.
"""
- # TODO: move composition of api data to separate method
- api_data = list()
- api = dict(api_name='ipsec_select_backend')
- api_args = dict(protocol=protocol)
- api_args['index'] = index
- api['api_args'] = api_args
- api_data.append(api)
-
- api_reply = None
- with PapiExecutor(node) as papi_executor:
- papi_executor.execute_papi(api_data)
- try:
- papi_executor.papi_should_have_passed()
- except AssertionError:
- raise PapiError('Failed to select IPsec backend on host {host}'.
- format(host=node['host']))
- api_reply = papi_executor.get_papi_reply()
-
- if api_reply is not None:
- api_r = api_reply[0]['api_reply']['ipsec_select_backend_reply']
- if api_r['retval'] == 0:
- logger.trace('IPsec backend successfully selected on host '
- '{host}'.format(host=node['host']))
- else:
- raise PapiError('Failed to select IPsec backend on host {host}'.
- format(host=node['host']))
- else:
- raise PapiError('No reply received for ipsec_select_backend API '
- 'command on host {host}'.format(host=node['host']))
+
+ cmd = 'ipsec_select_backend'
+ cmd_reply = 'ipsec_select_backend_reply'
+ err_msg = 'Failed to select IPsec backend on host {host}'.format(
+ host=node['host'])
+ args = dict(protocol=protocol, index=index)
+ with PapiExecutor(node) as papi_exec:
+ papi_resp = papi_exec.add(cmd, **args).execute_should_pass(err_msg)
+ data = papi_resp.reply[0]['api_reply'][cmd_reply]
+ if data['retval'] != 0:
+ raise RuntimeError('Failed to select IPsec backend on host {host}'.
+ format(host=node['host']))
@staticmethod
def vpp_ipsec_backend_dump(node):
:param node: VPP node to dump IPsec backend on.
:type node: dict
"""
- # TODO: move composition of api data to separate method
- api_data = list()
- api = dict(api_name='ipsec_backend_dump')
- api_args = dict()
- api['api_args'] = api_args
- api_data.append(api)
-
- api_reply = None
- with PapiExecutor(node) as papi_executor:
- papi_executor.execute_papi(api_data)
- try:
- papi_executor.papi_should_have_passed()
- except AssertionError:
- raise PapiError('Failed to dump IPsec backends on host {host}'.
- format(host=node['host']))
- # After API change there is returned VPP internal enum object
- # representing VPP IPSEC protocol instead of integer representation
- # so JSON fails to decode it - we need to check if it is Python API
- # bug or we need to adapt vpp_papi_provider to correctly encode
- # such object into JSON
- # api_reply = papi_executor.get_papi_reply()
- api_reply = papi_executor.get_papi_stdout()
-
- if api_reply is not None:
- logger.trace('IPsec backend dump\n{dump}'.format(dump=api_reply))
- else:
- raise PapiError('No reply received for ipsec_select_backend API '
- 'command on host {host}'.format(host=node['host']))
+
+ err_msg = 'Failed to dump IPsec backends on host {host}'.format(
+ host=node['host'])
+ with PapiExecutor(node) as papi_exec:
+ papi_exec.add('ipsec_backend_dump').execute_should_pass(
+ err_msg, process_reply=False)
@staticmethod
def vpp_ipsec_add_sad_entry(node, sad_id, spi, crypto_alg, crypto_key,
if tunnel_src is not None and tunnel_dst is not None else ''
integ = 'integ-alg {0} integ-key {1}'.format(integ_alg.alg_name, ikey)\
- if crypto_alg.alg_name != 'aes-gcm-128' else ''
+ if crypto_alg.alg_name != 'aes-gcm-128' and \
+ crypto_alg.alg_name != 'aes-gcm-256' else ''
with open(tmp_filename, 'w') as tmp_file:
for i in range(0, n_entries):