Improve pf layer
[csit.git] / resources / libraries / python / IPsecUtil.py
index e8fdad3..2e6574f 100644 (file)
@@ -22,9 +22,10 @@ from string import ascii_letters
 
 from ipaddress import ip_network, ip_address
 
-from resources.libraries.python.IPUtil import IPUtil
 from resources.libraries.python.InterfaceUtil import InterfaceUtil, \
     InterfaceStatusFlags
+from resources.libraries.python.IPAddress import IPAddress
+from resources.libraries.python.IPUtil import IPUtil
 from resources.libraries.python.PapiExecutor import PapiSocketExecutor
 from resources.libraries.python.ssh import scp_node
 from resources.libraries.python.topology import Topology
@@ -83,8 +84,8 @@ class IntegAlg(Enum):
 
 class IPsecProto(IntEnum):
     """IPsec protocol."""
-    ESP = 1
-    SEC_AH = 0
+    IPSEC_API_PROTO_ESP = 50
+    IPSEC_API_PROTO_AH = 51
 
 
 class IPsecSadFlags(IntEnum):
@@ -229,7 +230,7 @@ class IPsecUtil:
         :returns: IPsecProto enum ESP object.
         :rtype: IPsecProto
         """
-        return int(IPsecProto.ESP)
+        return int(IPsecProto.IPSEC_API_PROTO_ESP)
 
     @staticmethod
     def ipsec_proto_ah():
@@ -238,7 +239,7 @@ class IPsecUtil:
         :returns: IPsecProto enum AH object.
         :rtype: IPsecProto
         """
-        return int(IPsecProto.SEC_AH)
+        return int(IPsecProto.IPSEC_API_PROTO_AH)
 
     @staticmethod
     def vpp_ipsec_select_backend(node, protocol, index=1):
@@ -327,10 +328,10 @@ class IPsecUtil:
             flags=flags,
             tunnel_src=str(src_addr),
             tunnel_dst=str(dst_addr),
-            protocol=int(IPsecProto.ESP)
+            protocol=int(IPsecProto.IPSEC_API_PROTO_ESP)
         )
         args = dict(
-            is_add=1,
+            is_add=True,
             entry=sad_entry
         )
         with PapiSocketExecutor(node) as papi_exec:
@@ -436,10 +437,10 @@ class IPsecUtil:
             flags=flags,
             tunnel_src=str(src_addr),
             tunnel_dst=str(dst_addr),
-            protocol=int(IPsecProto.ESP)
+            protocol=int(IPsecProto.IPSEC_API_PROTO_ESP)
         )
         args = dict(
-            is_add=1,
+            is_add=True,
             entry=sad_entry
         )
         with PapiSocketExecutor(node) as papi_exec:
@@ -547,7 +548,7 @@ class IPsecUtil:
         err_msg = f"Failed to add Security Policy Database " \
             f"on host {node[u'host']}"
         args = dict(
-            is_add=1,
+            is_add=True,
             spd_id=int(spd_id)
         )
         with PapiSocketExecutor(node) as papi_exec:
@@ -568,7 +569,7 @@ class IPsecUtil:
         err_msg = f"Failed to add interface {interface} to Security Policy " \
             f"Database {spd_id} on host {node[u'host']}"
         args = dict(
-            is_add=1,
+            is_add=True,
             sw_if_index=InterfaceUtil.get_interface_index(node, interface),
             spd_id=int(spd_id)
         )
@@ -628,20 +629,20 @@ class IPsecUtil:
         spd_entry = dict(
             spd_id=int(spd_id),
             priority=int(priority),
-            is_outbound=0 if inbound else 1,
+            is_outbound=not inbound,
             sa_id=int(sa_id) if sa_id else 0,
             policy=action.policy_int_repr,
             protocol=int(proto) if proto else 0,
-            remote_address_start=IPUtil.create_ip_address_object(
+            remote_address_start=IPAddress.create_ip_address_object(
                 ip_network(raddr_range, strict=False).network_address
             ),
-            remote_address_stop=IPUtil.create_ip_address_object(
+            remote_address_stop=IPAddress.create_ip_address_object(
                 ip_network(raddr_range, strict=False).broadcast_address
             ),
-            local_address_start=IPUtil.create_ip_address_object(
+            local_address_start=IPAddress.create_ip_address_object(
                 ip_network(laddr_range, strict=False).network_address
             ),
-            local_address_stop=IPUtil.create_ip_address_object(
+            local_address_stop=IPAddress.create_ip_address_object(
                 ip_network(laddr_range, strict=False).broadcast_address
             ),
             remote_port_start=int(rport_range.split(u"-")[0]) if rport_range
@@ -654,7 +655,7 @@ class IPsecUtil:
             else 65535
         )
         args = dict(
-            is_add=1,
+            is_add=True,
             entry=spd_entry
         )
         with PapiSocketExecutor(node) as papi_exec:
@@ -718,16 +719,16 @@ class IPsecUtil:
         spd_entry = dict(
             spd_id=int(spd_id),
             priority=int(priority),
-            is_outbound=0 if inbound else 1,
+            is_outbound=not inbound,
             sa_id=int(sa_id) if sa_id else 0,
             policy=IPsecUtil.policy_action_protect().policy_int_repr,
             protocol=0,
-            remote_address_start=IPUtil.create_ip_address_object(raddr_ip),
-            remote_address_stop=IPUtil.create_ip_address_object(raddr_ip),
-            local_address_start=IPUtil.create_ip_address_object(
+            remote_address_start=IPAddress.create_ip_address_object(raddr_ip),
+            remote_address_stop=IPAddress.create_ip_address_object(raddr_ip),
+            local_address_start=IPAddress.create_ip_address_object(
                 ip_network(laddr_range, strict=False).network_address
             ),
-            local_address_stop=IPUtil.create_ip_address_object(
+            local_address_stop=IPAddress.create_ip_address_object(
                 ip_network(laddr_range, strict=False).broadcast_address
             ),
             remote_port_start=0,
@@ -736,16 +737,16 @@ class IPsecUtil:
             local_port_stop=65535
         )
         args = dict(
-            is_add=1,
+            is_add=True,
             entry=spd_entry
         )
 
         with PapiSocketExecutor(node) as papi_exec:
             for i in range(n_entries):
                 args[u"entry"][u"remote_address_start"][u"un"] = \
-                    IPUtil.union_addr(raddr_ip + i)
+                    IPAddress.union_addr(raddr_ip + i)
                 args[u"entry"][u"remote_address_stop"][u"un"] = \
-                    IPUtil.union_addr(raddr_ip + i)
+                    IPAddress.union_addr(raddr_ip + i)
                 history = bool(not 1 < i < n_entries - 2)
                 papi_exec.add(cmd, history=history, **args)
             papi_exec.get_replies(err_msg)
@@ -966,7 +967,7 @@ class IPsecUtil:
             )
             cmd2 = u"ipsec_tunnel_if_add_del"
             args2 = dict(
-                is_add=1,
+                is_add=True,
                 local_ip=None,
                 remote_ip=None,
                 local_spi=0,
@@ -1001,10 +1002,10 @@ class IPsecUtil:
                 )
                 args2[u"local_spi"] = spi_1 + i
                 args2[u"remote_spi"] = spi_2 + i
-                args2[u"local_ip"] = IPUtil.create_ip_address_object(
+                args2[u"local_ip"] = IPAddress.create_ip_address_object(
                     if1_ip + i * addr_incr
                 )
-                args2[u"remote_ip"] = IPUtil.create_ip_address_object(if2_ip)
+                args2[u"remote_ip"] = IPAddress.create_ip_address_object(if2_ip)
                 args2[u"local_crypto_key_len"] = len(ckeys[i])
                 args2[u"local_crypto_key"] = ckeys[i]
                 args2[u"remote_crypto_key_len"] = len(ckeys[i])
@@ -1077,8 +1078,8 @@ class IPsecUtil:
             # Configure IPsec tunnel interfaces
             cmd2 = u"ipsec_tunnel_if_add_del"
             args2 = dict(
-                is_add=1,
-                local_ip=IPUtil.create_ip_address_object(if2_ip),
+                is_add=True,
+                local_ip=IPAddress.create_ip_address_object(if2_ip),
                 remote_ip=None,
                 local_spi=0,
                 remote_spi=0,
@@ -1100,8 +1101,8 @@ class IPsecUtil:
             for i in range(existing_tunnels, n_tunnels):
                 args2[u"local_spi"] = spi_2 + i
                 args2[u"remote_spi"] = spi_1 + i
-                args2[u"local_ip"] = IPUtil.create_ip_address_object(if2_ip)
-                args2[u"remote_ip"] = IPUtil.create_ip_address_object(
+                args2[u"local_ip"] = IPAddress.create_ip_address_object(if2_ip)
+                args2[u"remote_ip"] = IPAddress.create_ip_address_object(
                     if1_ip + i * addr_incr)
                 args2[u"local_crypto_key_len"] = len(ckeys[i])
                 args2[u"local_crypto_key"] = ckeys[i]