acl: API cleanup
[vpp.git] / src / plugins / acl / test / test_acl_plugin_l2l3.py
index 3379871..30b5372 100644 (file)
 
 """
 
+import copy
 import unittest
 from socket import inet_pton, AF_INET, AF_INET6
 from random import choice, shuffle
 from pprint import pprint
+from ipaddress import ip_network
 
 import scapy.compat
 from scapy.packet import Raw
@@ -40,6 +42,8 @@ from framework import VppTestCase, VppTestRunner
 from vpp_l2 import L2_PORT_TYPE
 import time
 
+from vpp_acl import AclRule, VppAcl, VppAclInterface
+
 
 class TestACLpluginL2L3(VppTestCase):
     """TestACLpluginL2L3 Test Case"""
@@ -259,31 +263,26 @@ class TestACLpluginL2L3(VppTestCase):
             else:
                 rule_l4_proto = p[IP].proto
 
-            new_rule = {
-                        'is_permit': is_permit,
-                        'is_ipv6': p.haslayer(IPv6),
-                        'src_ip_addr': inet_pton(rule_family,
-                                                 p[rule_l3_layer].src),
-                        'src_ip_prefix_len': rule_prefix_len,
-                        'dst_ip_addr': inet_pton(rule_family,
-                                                 p[rule_l3_layer].dst),
-                        'dst_ip_prefix_len': rule_prefix_len,
-                        'srcport_or_icmptype_first': rule_l4_sport,
-                        'srcport_or_icmptype_last': rule_l4_sport,
-                        'dstport_or_icmpcode_first': rule_l4_dport,
-                        'dstport_or_icmpcode_last': rule_l4_dport,
-                        'proto': rule_l4_proto,
-                       }
+            new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
+                               src_prefix=ip_network(
+                                   (p[rule_l3_layer].src, rule_prefix_len)),
+                               dst_prefix=ip_network(
+                                   (p[rule_l3_layer].dst, rule_prefix_len)),
+                               sport_from=rule_l4_sport,
+                               sport_to=rule_l4_sport,
+                               dport_from=rule_l4_dport,
+                               dport_to=rule_l4_dport)
+
             rules.append(new_rule)
-            new_rule_permit = new_rule.copy()
-            new_rule_permit['is_permit'] = 1
+            new_rule_permit = copy.copy(new_rule)
+            new_rule_permit.is_permit = 1
             permit_rules.append(new_rule_permit)
 
-            new_rule_permit_and_reflect = new_rule.copy()
+            new_rule_permit_and_reflect = copy.copy(new_rule)
             if can_reflect_this_packet:
-                new_rule_permit_and_reflect['is_permit'] = 2
+                new_rule_permit_and_reflect.is_permit = 2
             else:
-                new_rule_permit_and_reflect['is_permit'] = is_permit
+                new_rule_permit_and_reflect.is_permit = is_permit
 
             permit_and_reflect_rules.append(new_rule_permit_and_reflect)
             self.logger.info("create_stream pkt#%d: %s" % (i, payload))
@@ -356,79 +355,67 @@ class TestACLpluginL2L3(VppTestCase):
 
             # UDP:
 
-    def applied_acl_shuffle(self, sw_if_index):
-        # first collect what ACLs are applied and what they look like
-        r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index)
-        orig_applied_acls = r[0]
-
-        # we will collect these just to save and generate additional rulesets
-        orig_acls = []
-        for acl_num in orig_applied_acls.acls:
-            rr = self.vapi.acl_dump(acl_num)
-            orig_acls.append(rr[0])
+    def applied_acl_shuffle(self, acl_if):
+        saved_n_input = acl_if.n_input
+        # TOTO: maybe copy each one??
+        saved_acls = acl_if.acls
 
         # now create a list of all the rules in all ACLs
         all_rules = []
-        for old_acl in orig_acls:
-            for rule in old_acl.r:
-                all_rules.append(dict(rule._asdict()))
+        for old_acl in saved_acls:
+            for rule in old_acl.rules:
+                all_rules.append(rule)
 
         # Add a few ACLs made from shuffled rules
         shuffle(all_rules)
-        reply = self.vapi.acl_add_replace(acl_index=4294967295,
-                                          r=all_rules[::2],
-                                          tag=b"shuffle 1. acl")
-        shuffle_acl_1 = reply.acl_index
+        acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
+        acl1.add_vpp_config()
+
         shuffle(all_rules)
-        reply = self.vapi.acl_add_replace(acl_index=4294967295,
-                                          r=all_rules[::3],
-                                          tag=b"shuffle 2. acl")
-        shuffle_acl_2 = reply.acl_index
+        acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
+        acl2.add_vpp_config()
+
         shuffle(all_rules)
-        reply = self.vapi.acl_add_replace(acl_index=4294967295,
-                                          r=all_rules[::2],
-                                          tag=b"shuffle 3. acl")
-        shuffle_acl_3 = reply.acl_index
+        acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
+        acl3.add_vpp_config()
 
         # apply the shuffle ACLs in front
-        input_acls = [shuffle_acl_1, shuffle_acl_2]
-        output_acls = [shuffle_acl_1, shuffle_acl_2]
+        input_acls = [acl1, acl2]
+        output_acls = [acl1, acl2]
 
         # add the currently applied ACLs
-        n_input = orig_applied_acls.n_input
-        input_acls.extend(orig_applied_acls.acls[:n_input])
-        output_acls.extend(orig_applied_acls.acls[n_input:])
+        n_input = acl_if.n_input
+        input_acls.extend(saved_acls[:n_input])
+        output_acls.extend(saved_acls[n_input:])
 
         # and the trailing shuffle ACL(s)
-        input_acls.extend([shuffle_acl_3])
-        output_acls.extend([shuffle_acl_3])
+        input_acls.extend([acl3])
+        output_acls.extend([acl3])
 
         # set the interface ACL list to the result
-        self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
-                                             n_input=len(input_acls),
-                                             acls=input_acls + output_acls)
+        acl_if.n_input = len(input_acls)
+        acl_if.acls = input_acls + output_acls
+        acl_if.add_vpp_config()
+
         # change the ACLs a few times
         for i in range(1, 10):
             shuffle(all_rules)
-            reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
-                                              r=all_rules[::1+(i % 2)],
-                                              tag=b"shuffle 1. acl")
+            acl1.modify_vpp_config(all_rules[::1+(i % 2)])
+
             shuffle(all_rules)
-            reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
-                                              r=all_rules[::1+(i % 3)],
-                                              tag=b"shuffle 2. acl")
+            acl2.modify_vpp_config(all_rules[::1+(i % 3)])
+
             shuffle(all_rules)
-            reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
-                                              r=all_rules[::1+(i % 5)],
-                                              tag=b"shuffle 3. acl")
+            acl3.modify_vpp_config(all_rules[::1+(i % 5)])
 
         # restore to how it was before and clean up
-        self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
-                                             n_input=orig_applied_acls.n_input,
-                                             acls=orig_applied_acls.acls)
-        reply = self.vapi.acl_del(acl_index=shuffle_acl_1)
-        reply = self.vapi.acl_del(acl_index=shuffle_acl_2)
-        reply = self.vapi.acl_del(acl_index=shuffle_acl_3)
+        acl_if.n_input = saved_n_input
+        acl_if.acls = saved_acls
+        acl_if.add_vpp_config()
+
+        acl1.remove_vpp_config()
+        acl2.remove_vpp_config()
+        acl3.remove_vpp_config()
 
     def create_acls_for_a_stream(self, stream_dict,
                                  test_l2_action, is_reflect):
@@ -436,15 +423,14 @@ class TestACLpluginL2L3(VppTestCase):
         r_permit = stream_dict['permit_rules']
         r_permit_reflect = stream_dict['permit_and_reflect_rules']
         r_action = r_permit_reflect if is_reflect else r
-        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
-                                          tag=b"act. acl")
-        action_acl_index = reply.acl_index
-        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
-                                          tag=b"perm. acl")
-        permit_acl_index = reply.acl_index
-        return {'L2': action_acl_index if test_l2_action else permit_acl_index,
-                'L3': permit_acl_index if test_l2_action else action_acl_index,
-                'permit': permit_acl_index, 'action': action_acl_index}
+        action_acl = VppAcl(self, rules=r_action, tag="act. acl")
+        action_acl.add_vpp_config()
+        permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
+        permit_acl.add_vpp_config()
+
+        return {'L2': action_acl if test_l2_action else permit_acl,
+                'L3': permit_acl if test_l2_action else action_acl,
+                'permit': permit_acl, 'action': action_acl}
 
     def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
                               is_ip6, is_reflect, add_eh):
@@ -452,26 +438,30 @@ class TestACLpluginL2L3(VppTestCase):
         """
         self.reset_packet_infos()
         stream_dict = self.create_stream(
-                                         self.pg2, self.loop0,
-                                         bridged_to_routed,
-                                         self.pg_if_packet_sizes, is_ip6,
-                                         not is_reflect, False, add_eh)
+            self.pg2, self.loop0,
+            bridged_to_routed,
+            self.pg_if_packet_sizes, is_ip6,
+            not is_reflect, False, add_eh)
         stream = stream_dict['stream']
         acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
                                                 is_reflect)
         n_input_l3 = 0 if bridged_to_routed else 1
         n_input_l2 = 1 if bridged_to_routed else 0
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
-                                             n_input=n_input_l3,
-                                             acls=[acl_idx['L3']])
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
-                                             n_input=n_input_l2,
-                                             acls=[acl_idx['L2']])
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
-                                             n_input=n_input_l2,
-                                             acls=[acl_idx['L2']])
-        self.applied_acl_shuffle(self.pg0.sw_if_index)
-        self.applied_acl_shuffle(self.pg2.sw_if_index)
+
+        acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
+                                     n_input=n_input_l3, acls=[acl_idx['L3']])
+        acl_if_pg2.add_vpp_config()
+
+        acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
+                                     n_input=n_input_l2, acls=[acl_idx['L2']])
+        acl_if_pg0.add_vpp_config()
+
+        acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
+                                     n_input=n_input_l2, acls=[acl_idx['L2']])
+        acl_if_pg1.add_vpp_config()
+
+        self.applied_acl_shuffle(acl_if_pg0)
+        self.applied_acl_shuffle(acl_if_pg1)
         return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
 
     def apply_acl_ip46_both_directions_reflect(self,
@@ -516,20 +506,23 @@ class TestACLpluginL2L3(VppTestCase):
         else:
             outbound_l3_acl = acl_idx_rev['L3']
 
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
-                                             n_input=1,
-                                             acls=[inbound_l3_acl,
-                                                   outbound_l3_acl])
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
-                                             n_input=1,
-                                             acls=[inbound_l2_acl,
-                                                   outbound_l2_acl])
-        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
-                                             n_input=1,
-                                             acls=[inbound_l2_acl,
-                                                   outbound_l2_acl])
-        self.applied_acl_shuffle(self.pg0.sw_if_index)
-        self.applied_acl_shuffle(self.pg2.sw_if_index)
+        acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
+                                     n_input=1,
+                                     acls=[inbound_l3_acl, outbound_l3_acl])
+        acl_if_pg2.add_vpp_config()
+
+        acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
+                                     n_input=1,
+                                     acls=[inbound_l2_acl, outbound_l2_acl])
+        acl_if_pg0.add_vpp_config()
+
+        acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
+                                     n_input=1,
+                                     acls=[inbound_l2_acl, outbound_l2_acl])
+        acl_if_pg1.add_vpp_config()
+
+        self.applied_acl_shuffle(acl_if_pg0)
+        self.applied_acl_shuffle(acl_if_pg2)
 
     def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
                                          is_reflect, add_eh):
@@ -594,7 +587,7 @@ class TestACLpluginL2L3(VppTestCase):
         pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
                                                        is_reflect, False,
                                                        add_eh)
-        self.verify_acl_packet_count(acls['L3'], pkts)
+        self.verify_acl_packet_count(acls['L3'].acl_index, pkts)
 
     def run_test_ip46_bridged_to_routed(self, test_l2_deny,
                                         is_ip6, is_reflect, add_eh):
@@ -604,7 +597,7 @@ class TestACLpluginL2L3(VppTestCase):
         pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
                                                        is_reflect, False,
                                                        add_eh)
-        self.verify_acl_packet_count(acls['L2'], pkts)
+        self.verify_acl_packet_count(acls['L2'].acl_index, pkts)
 
     def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
                                                  is_ip6, add_eh,