acl: revert acl: api cleanup
[vpp.git] / src / plugins / acl / test / test_acl_plugin_l2l3.py
index eaddd0f..3379871 100644 (file)
 
 """
 
-import copy
 import unittest
 from socket import inet_pton, AF_INET, AF_INET6
 from random import choice, shuffle
 from pprint import pprint
-from ipaddress import ip_network
 
 import scapy.compat
 from scapy.packet import Raw
@@ -42,8 +40,6 @@ from framework import VppTestCase, VppTestRunner
 from vpp_l2 import L2_PORT_TYPE
 import time
 
-from vpp_acl import AclRule, VppAcl, VppAclInterface
-
 
 class TestACLpluginL2L3(VppTestCase):
     """TestACLpluginL2L3 Test Case"""
@@ -263,26 +259,31 @@ class TestACLpluginL2L3(VppTestCase):
             else:
                 rule_l4_proto = p[IP].proto
 
-            new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
-                               src_prefix=ip_network(
-                                   (p[rule_l3_layer].src, rule_prefix_len)),
-                               dst_prefix=ip_network(
-                                   (p[rule_l3_layer].dst, rule_prefix_len)))
-            new_rule.sport_from = rule_l4_sport
-            new_rule.sport_fo = rule_l4_sport
-            new_rule.dport_from = rule_l4_dport
-            new_rule.dport_to = rule_l4_dport
-
+            new_rule = {
+                        'is_permit': is_permit,
+                        'is_ipv6': p.haslayer(IPv6),
+                        'src_ip_addr': inet_pton(rule_family,
+                                                 p[rule_l3_layer].src),
+                        'src_ip_prefix_len': rule_prefix_len,
+                        'dst_ip_addr': inet_pton(rule_family,
+                                                 p[rule_l3_layer].dst),
+                        'dst_ip_prefix_len': rule_prefix_len,
+                        'srcport_or_icmptype_first': rule_l4_sport,
+                        'srcport_or_icmptype_last': rule_l4_sport,
+                        'dstport_or_icmpcode_first': rule_l4_dport,
+                        'dstport_or_icmpcode_last': rule_l4_dport,
+                        'proto': rule_l4_proto,
+                       }
             rules.append(new_rule)
-            new_rule_permit = copy.copy(new_rule)
-            new_rule_permit.is_permit = 1
+            new_rule_permit = new_rule.copy()
+            new_rule_permit['is_permit'] = 1
             permit_rules.append(new_rule_permit)
 
-            new_rule_permit_and_reflect = copy.copy(new_rule)
+            new_rule_permit_and_reflect = new_rule.copy()
             if can_reflect_this_packet:
-                new_rule_permit_and_reflect.is_permit = 2
+                new_rule_permit_and_reflect['is_permit'] = 2
             else:
-                new_rule_permit_and_reflect.is_permit = is_permit
+                new_rule_permit_and_reflect['is_permit'] = is_permit
 
             permit_and_reflect_rules.append(new_rule_permit_and_reflect)
             self.logger.info("create_stream pkt#%d: %s" % (i, payload))
@@ -355,70 +356,79 @@ class TestACLpluginL2L3(VppTestCase):
 
             # UDP:
 
-    def applied_acl_shuffle(self, acl_if):
-        saved_n_input = acl_if.n_input
-        # TOTO: maybe copy each one??
-        saved_acls = acl_if.acls
+    def applied_acl_shuffle(self, sw_if_index):
+        # first collect what ACLs are applied and what they look like
+        r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index)
+        orig_applied_acls = r[0]
+
+        # we will collect these just to save and generate additional rulesets
+        orig_acls = []
+        for acl_num in orig_applied_acls.acls:
+            rr = self.vapi.acl_dump(acl_num)
+            orig_acls.append(rr[0])
 
         # now create a list of all the rules in all ACLs
         all_rules = []
-        for old_acl in saved_acls:
-            for rule in old_acl.rules:
-                all_rules.append(rule)
+        for old_acl in orig_acls:
+            for rule in old_acl.r:
+                all_rules.append(dict(rule._asdict()))
 
         # Add a few ACLs made from shuffled rules
         shuffle(all_rules)
-        acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
-        acl1.add_vpp_config()
-
+        reply = self.vapi.acl_add_replace(acl_index=4294967295,
+                                          r=all_rules[::2],
+                                          tag=b"shuffle 1. acl")
+        shuffle_acl_1 = reply.acl_index
         shuffle(all_rules)
-        acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
-        acl2.add_vpp_config()
-
+        reply = self.vapi.acl_add_replace(acl_index=4294967295,
+                                          r=all_rules[::3],
+                                          tag=b"shuffle 2. acl")
+        shuffle_acl_2 = reply.acl_index
         shuffle(all_rules)
-        acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
-        acl3.add_vpp_config()
+        reply = self.vapi.acl_add_replace(acl_index=4294967295,
+                                          r=all_rules[::2],
+                                          tag=b"shuffle 3. acl")
+        shuffle_acl_3 = reply.acl_index
 
         # apply the shuffle ACLs in front
-        input_acls = [acl1, acl2]
-        output_acls = [acl1, acl2]
+        input_acls = [shuffle_acl_1, shuffle_acl_2]
+        output_acls = [shuffle_acl_1, shuffle_acl_2]
 
         # add the currently applied ACLs
-        n_input = acl_if.n_input
-        input_acls.extend(saved_acls[:n_input])
-        output_acls.extend(saved_acls[n_input:])
+        n_input = orig_applied_acls.n_input
+        input_acls.extend(orig_applied_acls.acls[:n_input])
+        output_acls.extend(orig_applied_acls.acls[n_input:])
 
         # and the trailing shuffle ACL(s)
-        input_acls.extend([acl3])
-        output_acls.extend([acl3])
+        input_acls.extend([shuffle_acl_3])
+        output_acls.extend([shuffle_acl_3])
 
         # set the interface ACL list to the result
-        acl_if.n_input = len(input_acls)
-        acl_if.acls = input_acls + output_acls
-        acl_if.add_vpp_config()
-
+        self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+                                             n_input=len(input_acls),
+                                             acls=input_acls + output_acls)
         # change the ACLs a few times
         for i in range(1, 10):
             shuffle(all_rules)
-            acl1.rules = all_rules[::1+(i % 2)]
-            acl1.add_vpp_config()
-
+            reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
+                                              r=all_rules[::1+(i % 2)],
+                                              tag=b"shuffle 1. acl")
             shuffle(all_rules)
-            acl2.rules = all_rules[::1+(i % 3)]
-            acl2.add_vpp_config()
-
+            reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+                                              r=all_rules[::1+(i % 3)],
+                                              tag=b"shuffle 2. acl")
             shuffle(all_rules)
-            acl3.rules = all_rules[::1+(i % 5)]
-            acl3.add_vpp_config()
+            reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+                                              r=all_rules[::1+(i % 5)],
+                                              tag=b"shuffle 3. acl")
 
         # restore to how it was before and clean up
-        acl_if.n_input = saved_n_input
-        acl_if.acls = saved_acls
-        acl_if.add_vpp_config()
-
-        acl1.remove_vpp_config()
-        acl2.remove_vpp_config()
-        acl3.remove_vpp_config()
+        self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+                                             n_input=orig_applied_acls.n_input,
+                                             acls=orig_applied_acls.acls)
+        reply = self.vapi.acl_del(acl_index=shuffle_acl_1)
+        reply = self.vapi.acl_del(acl_index=shuffle_acl_2)
+        reply = self.vapi.acl_del(acl_index=shuffle_acl_3)
 
     def create_acls_for_a_stream(self, stream_dict,
                                  test_l2_action, is_reflect):
@@ -426,14 +436,15 @@ class TestACLpluginL2L3(VppTestCase):
         r_permit = stream_dict['permit_rules']
         r_permit_reflect = stream_dict['permit_and_reflect_rules']
         r_action = r_permit_reflect if is_reflect else r
-        action_acl = VppAcl(self, rules=r_action, tag="act. acl")
-        action_acl.add_vpp_config()
-        permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
-        permit_acl.add_vpp_config()
-
-        return {'L2': action_acl if test_l2_action else permit_acl,
-                'L3': permit_acl if test_l2_action else action_acl,
-                'permit': permit_acl, 'action': action_acl}
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
+                                          tag=b"act. acl")
+        action_acl_index = reply.acl_index
+        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
+                                          tag=b"perm. acl")
+        permit_acl_index = reply.acl_index
+        return {'L2': action_acl_index if test_l2_action else permit_acl_index,
+                'L3': permit_acl_index if test_l2_action else action_acl_index,
+                'permit': permit_acl_index, 'action': action_acl_index}
 
     def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
                               is_ip6, is_reflect, add_eh):
@@ -441,30 +452,26 @@ class TestACLpluginL2L3(VppTestCase):
         """
         self.reset_packet_infos()
         stream_dict = self.create_stream(
-            self.pg2, self.loop0,
-            bridged_to_routed,
-            self.pg_if_packet_sizes, is_ip6,
-            not is_reflect, False, add_eh)
+                                         self.pg2, self.loop0,
+                                         bridged_to_routed,
+                                         self.pg_if_packet_sizes, is_ip6,
+                                         not is_reflect, False, add_eh)
         stream = stream_dict['stream']
         acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
                                                 is_reflect)
         n_input_l3 = 0 if bridged_to_routed else 1
         n_input_l2 = 1 if bridged_to_routed else 0
-
-        acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
-                                     n_input=n_input_l3, acls=[acl_idx['L3']])
-        acl_if_pg2.add_vpp_config()
-
-        acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
-                                     n_input=n_input_l2, acls=[acl_idx['L2']])
-        acl_if_pg0.add_vpp_config()
-
-        acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
-                                     n_input=n_input_l2, acls=[acl_idx['L2']])
-        acl_if_pg1.add_vpp_config()
-
-        self.applied_acl_shuffle(acl_if_pg0)
-        self.applied_acl_shuffle(acl_if_pg1)
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+                                             n_input=n_input_l3,
+                                             acls=[acl_idx['L3']])
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=n_input_l2,
+                                             acls=[acl_idx['L2']])
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+                                             n_input=n_input_l2,
+                                             acls=[acl_idx['L2']])
+        self.applied_acl_shuffle(self.pg0.sw_if_index)
+        self.applied_acl_shuffle(self.pg2.sw_if_index)
         return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
 
     def apply_acl_ip46_both_directions_reflect(self,
@@ -509,23 +516,20 @@ class TestACLpluginL2L3(VppTestCase):
         else:
             outbound_l3_acl = acl_idx_rev['L3']
 
-        acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
-                                     n_input=1,
-                                     acls=[inbound_l3_acl, outbound_l3_acl])
-        acl_if_pg2.add_vpp_config()
-
-        acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
-                                     n_input=1,
-                                     acls=[inbound_l2_acl, outbound_l2_acl])
-        acl_if_pg0.add_vpp_config()
-
-        acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
-                                     n_input=1,
-                                     acls=[inbound_l2_acl, outbound_l2_acl])
-        acl_if_pg1.add_vpp_config()
-
-        self.applied_acl_shuffle(acl_if_pg0)
-        self.applied_acl_shuffle(acl_if_pg2)
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+                                             n_input=1,
+                                             acls=[inbound_l3_acl,
+                                                   outbound_l3_acl])
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+                                             n_input=1,
+                                             acls=[inbound_l2_acl,
+                                                   outbound_l2_acl])
+        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+                                             n_input=1,
+                                             acls=[inbound_l2_acl,
+                                                   outbound_l2_acl])
+        self.applied_acl_shuffle(self.pg0.sw_if_index)
+        self.applied_acl_shuffle(self.pg2.sw_if_index)
 
     def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
                                          is_reflect, add_eh):
@@ -590,7 +594,7 @@ class TestACLpluginL2L3(VppTestCase):
         pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
                                                        is_reflect, False,
                                                        add_eh)
-        self.verify_acl_packet_count(acls['L3'].acl_index, pkts)
+        self.verify_acl_packet_count(acls['L3'], pkts)
 
     def run_test_ip46_bridged_to_routed(self, test_l2_deny,
                                         is_ip6, is_reflect, add_eh):
@@ -600,7 +604,7 @@ class TestACLpluginL2L3(VppTestCase):
         pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
                                                        is_reflect, False,
                                                        add_eh)
-        self.verify_acl_packet_count(acls['L2'].acl_index, pkts)
+        self.verify_acl_packet_count(acls['L2'], pkts)
 
     def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
                                                  is_ip6, add_eh,