X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FIPsecUtil.py;h=b08c2a6e48748752efa5235c7c00c06c09725ac3;hb=ee7c9ed2829ff45151934c2558a93de40b4cf608;hp=2479ec182276e9df391232f900429f29fa7c3767;hpb=83e9b227be8316ac06b9104ce9096cfe4b06605e;p=csit.git diff --git a/resources/libraries/python/IPsecUtil.py b/resources/libraries/python/IPsecUtil.py index 2479ec1822..b08c2a6e48 100644 --- a/resources/libraries/python/IPsecUtil.py +++ b/resources/libraries/python/IPsecUtil.py @@ -17,10 +17,8 @@ import os from ipaddress import ip_network, ip_address from enum import Enum, IntEnum -from robot.api import logger from resources.libraries.python.PapiExecutor import PapiExecutor -from resources.libraries.python.PapiErrors import PapiError from resources.libraries.python.topology import Topology from resources.libraries.python.VatExecutor import VatExecutor from resources.libraries.python.VatJsonUtil import VatJsonUtil @@ -42,6 +40,7 @@ class CryptoAlg(Enum): AES_CBC_192 = ('aes-cbc-192', 'AES-CBC', 24) AES_CBC_256 = ('aes-cbc-256', 'AES-CBC', 32) AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20) + AES_GCM_256 = ('aes-gcm-256', 'AES-GCM', 40) def __init__(self, alg_name, scapy_name, key_len): self.alg_name = alg_name @@ -56,6 +55,7 @@ class IntegAlg(Enum): SHA_384_192 = ('sha-384-192', 'SHA2-384-192', 48) SHA_512_256 = ('sha-512-256', 'SHA2-512-256', 64) AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20) + AES_GCM_256 = ('aes-gcm-256', 'AES-GCM', 40) def __init__(self, alg_name, scapy_name, key_len): self.alg_name = alg_name @@ -135,6 +135,15 @@ class IPsecUtil(object): """ return CryptoAlg.AES_GCM_128 + @staticmethod + def crypto_alg_aes_gcm_256(): + """Return encryption algorithm aes-gcm-256. + + :returns: CryptoAlg enum AES_GCM_128 object. + :rtype: CryptoAlg + """ + return CryptoAlg.AES_GCM_256 + @staticmethod def get_crypto_alg_key_len(crypto_alg): """Return encryption algorithm key length. @@ -202,6 +211,15 @@ class IPsecUtil(object): """ return IntegAlg.AES_GCM_128 + @staticmethod + def integ_alg_aes_gcm_256(): + """Return integrity algorithm AES-GCM-256. + + :returns: IntegAlg enum AES_GCM_256 object. + :rtype: IntegAlg + """ + return IntegAlg.AES_GCM_256 + @staticmethod def get_integ_alg_key_len(integ_alg): """Return integrity algorithm key length. @@ -252,36 +270,21 @@ class IPsecUtil(object): :type node: dict :type protocol: IPsecProto :type index: int + :raises RuntimeError: If failed to select IPsec backend or if no API + reply received. """ - # TODO: move composition of api data to separate method - api_data = list() - api = dict(api_name='ipsec_select_backend') - api_args = dict(protocol=protocol) - api_args['index'] = index - api['api_args'] = api_args - api_data.append(api) - - api_reply = None - with PapiExecutor(node) as papi_executor: - papi_executor.execute_papi(api_data) - try: - papi_executor.papi_should_have_passed() - except AssertionError: - raise PapiError('Failed to select IPsec backend on host {host}'. - format(host=node['host'])) - api_reply = papi_executor.get_papi_reply() - - if api_reply is not None: - api_r = api_reply[0]['api_reply']['ipsec_select_backend_reply'] - if api_r['retval'] == 0: - logger.trace('IPsec backend successfully selected on host ' - '{host}'.format(host=node['host'])) - else: - raise PapiError('Failed to select IPsec backend on host {host}'. - format(host=node['host'])) - else: - raise PapiError('No reply received for ipsec_select_backend API ' - 'command on host {host}'.format(host=node['host'])) + + cmd = 'ipsec_select_backend' + cmd_reply = 'ipsec_select_backend_reply' + err_msg = 'Failed to select IPsec backend on host {host}'.format( + host=node['host']) + args = dict(protocol=protocol, index=index) + with PapiExecutor(node) as papi_exec: + papi_resp = papi_exec.add(cmd, **args).execute_should_pass(err_msg) + data = papi_resp.reply[0]['api_reply'][cmd_reply] + if data['retval'] != 0: + raise RuntimeError('Failed to select IPsec backend on host {host}'. + format(host=node['host'])) @staticmethod def vpp_ipsec_backend_dump(node): @@ -290,34 +293,12 @@ class IPsecUtil(object): :param node: VPP node to dump IPsec backend on. :type node: dict """ - # TODO: move composition of api data to separate method - api_data = list() - api = dict(api_name='ipsec_backend_dump') - api_args = dict() - api['api_args'] = api_args - api_data.append(api) - - api_reply = None - with PapiExecutor(node) as papi_executor: - papi_executor.execute_papi(api_data) - try: - papi_executor.papi_should_have_passed() - except AssertionError: - raise PapiError('Failed to dump IPsec backends on host {host}'. - format(host=node['host'])) - # After API change there is returned VPP internal enum object - # representing VPP IPSEC protocol instead of integer representation - # so JSON fails to decode it - we need to check if it is Python API - # bug or we need to adapt vpp_papi_provider to correctly encode - # such object into JSON - # api_reply = papi_executor.get_papi_reply() - api_reply = papi_executor.get_papi_stdout() - - if api_reply is not None: - logger.trace('IPsec backend dump\n{dump}'.format(dump=api_reply)) - else: - raise PapiError('No reply received for ipsec_select_backend API ' - 'command on host {host}'.format(host=node['host'])) + + err_msg = 'Failed to dump IPsec backends on host {host}'.format( + host=node['host']) + with PapiExecutor(node) as papi_exec: + papi_exec.add('ipsec_backend_dump').execute_should_pass( + err_msg, process_reply=False) @staticmethod def vpp_ipsec_add_sad_entry(node, sad_id, spi, crypto_alg, crypto_key, @@ -399,7 +380,8 @@ class IPsecUtil(object): if tunnel_src is not None and tunnel_dst is not None else '' integ = 'integ-alg {0} integ-key {1}'.format(integ_alg.alg_name, ikey)\ - if crypto_alg.alg_name != 'aes-gcm-128' else '' + if crypto_alg.alg_name != 'aes-gcm-128' and \ + crypto_alg.alg_name != 'aes-gcm-256' else '' with open(tmp_filename, 'w') as tmp_file: for i in range(0, n_entries):