CSIT-1506: Rework IPsec base testcases
[csit.git] / resources / libraries / python / IPsecUtil.py
index 2479ec1..b08c2a6 100644 (file)
@@ -17,10 +17,8 @@ import os
 from ipaddress import ip_network, ip_address
 
 from enum import Enum, IntEnum
-from robot.api import logger
 
 from resources.libraries.python.PapiExecutor import PapiExecutor
-from resources.libraries.python.PapiErrors import PapiError
 from resources.libraries.python.topology import Topology
 from resources.libraries.python.VatExecutor import VatExecutor
 from resources.libraries.python.VatJsonUtil import VatJsonUtil
@@ -42,6 +40,7 @@ class CryptoAlg(Enum):
     AES_CBC_192 = ('aes-cbc-192', 'AES-CBC', 24)
     AES_CBC_256 = ('aes-cbc-256', 'AES-CBC', 32)
     AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20)
+    AES_GCM_256 = ('aes-gcm-256', 'AES-GCM', 40)
 
     def __init__(self, alg_name, scapy_name, key_len):
         self.alg_name = alg_name
@@ -56,6 +55,7 @@ class IntegAlg(Enum):
     SHA_384_192 = ('sha-384-192', 'SHA2-384-192', 48)
     SHA_512_256 = ('sha-512-256', 'SHA2-512-256', 64)
     AES_GCM_128 = ('aes-gcm-128', 'AES-GCM', 20)
+    AES_GCM_256 = ('aes-gcm-256', 'AES-GCM', 40)
 
     def __init__(self, alg_name, scapy_name, key_len):
         self.alg_name = alg_name
@@ -135,6 +135,15 @@ class IPsecUtil(object):
         """
         return CryptoAlg.AES_GCM_128
 
+    @staticmethod
+    def crypto_alg_aes_gcm_256():
+        """Return encryption algorithm aes-gcm-256.
+
+        :returns: CryptoAlg enum AES_GCM_128 object.
+        :rtype: CryptoAlg
+        """
+        return CryptoAlg.AES_GCM_256
+
     @staticmethod
     def get_crypto_alg_key_len(crypto_alg):
         """Return encryption algorithm key length.
@@ -202,6 +211,15 @@ class IPsecUtil(object):
         """
         return IntegAlg.AES_GCM_128
 
+    @staticmethod
+    def integ_alg_aes_gcm_256():
+        """Return integrity algorithm AES-GCM-256.
+
+        :returns: IntegAlg enum AES_GCM_256 object.
+        :rtype: IntegAlg
+        """
+        return IntegAlg.AES_GCM_256
+
     @staticmethod
     def get_integ_alg_key_len(integ_alg):
         """Return integrity algorithm key length.
@@ -252,36 +270,21 @@ class IPsecUtil(object):
         :type node: dict
         :type protocol: IPsecProto
         :type index: int
+        :raises RuntimeError: If failed to select IPsec backend or if no API
+            reply received.
         """
-        # TODO: move composition of api data to separate method
-        api_data = list()
-        api = dict(api_name='ipsec_select_backend')
-        api_args = dict(protocol=protocol)
-        api_args['index'] = index
-        api['api_args'] = api_args
-        api_data.append(api)
-
-        api_reply = None
-        with PapiExecutor(node) as papi_executor:
-            papi_executor.execute_papi(api_data)
-            try:
-                papi_executor.papi_should_have_passed()
-            except AssertionError:
-                raise PapiError('Failed to select IPsec backend on host {host}'.
-                                format(host=node['host']))
-            api_reply = papi_executor.get_papi_reply()
-
-        if api_reply is not None:
-            api_r = api_reply[0]['api_reply']['ipsec_select_backend_reply']
-            if api_r['retval'] == 0:
-                logger.trace('IPsec backend successfully selected on host '
-                             '{host}'.format(host=node['host']))
-            else:
-                raise PapiError('Failed to select IPsec backend on host {host}'.
-                                format(host=node['host']))
-        else:
-            raise PapiError('No reply received for ipsec_select_backend API '
-                            'command on host {host}'.format(host=node['host']))
+
+        cmd = 'ipsec_select_backend'
+        cmd_reply = 'ipsec_select_backend_reply'
+        err_msg = 'Failed to select IPsec backend on host {host}'.format(
+            host=node['host'])
+        args = dict(protocol=protocol, index=index)
+        with PapiExecutor(node) as papi_exec:
+            papi_resp = papi_exec.add(cmd, **args).execute_should_pass(err_msg)
+        data = papi_resp.reply[0]['api_reply'][cmd_reply]
+        if data['retval'] != 0:
+            raise RuntimeError('Failed to select IPsec backend on host {host}'.
+                               format(host=node['host']))
 
     @staticmethod
     def vpp_ipsec_backend_dump(node):
@@ -290,34 +293,12 @@ class IPsecUtil(object):
         :param node: VPP node to dump IPsec backend on.
         :type node: dict
         """
-        # TODO: move composition of api data to separate method
-        api_data = list()
-        api = dict(api_name='ipsec_backend_dump')
-        api_args = dict()
-        api['api_args'] = api_args
-        api_data.append(api)
-
-        api_reply = None
-        with PapiExecutor(node) as papi_executor:
-            papi_executor.execute_papi(api_data)
-            try:
-                papi_executor.papi_should_have_passed()
-            except AssertionError:
-                raise PapiError('Failed to dump IPsec backends on host {host}'.
-                                format(host=node['host']))
-            # After API change there is returned VPP internal enum object
-            # representing VPP IPSEC protocol instead of integer representation
-            # so JSON fails to decode it - we need to check if it is Python API
-            # bug or we need to adapt vpp_papi_provider to correctly encode
-            # such object into JSON
-            # api_reply = papi_executor.get_papi_reply()
-            api_reply = papi_executor.get_papi_stdout()
-
-        if api_reply is not None:
-            logger.trace('IPsec backend dump\n{dump}'.format(dump=api_reply))
-        else:
-            raise PapiError('No reply received for ipsec_select_backend API '
-                            'command on host {host}'.format(host=node['host']))
+
+        err_msg = 'Failed to dump IPsec backends on host {host}'.format(
+            host=node['host'])
+        with PapiExecutor(node) as papi_exec:
+            papi_exec.add('ipsec_backend_dump').execute_should_pass(
+                err_msg, process_reply=False)
 
     @staticmethod
     def vpp_ipsec_add_sad_entry(node, sad_id, spi, crypto_alg, crypto_key,
@@ -399,7 +380,8 @@ class IPsecUtil(object):
             if tunnel_src is not None and tunnel_dst is not None else ''
 
         integ = 'integ-alg {0} integ-key {1}'.format(integ_alg.alg_name, ikey)\
-            if crypto_alg.alg_name != 'aes-gcm-128' else ''
+            if crypto_alg.alg_name != 'aes-gcm-128' and \
+               crypto_alg.alg_name != 'aes-gcm-256' else ''
 
         with open(tmp_filename, 'w') as tmp_file:
             for i in range(0, n_entries):