acl-plugin: make test: shuffle applied ACLs as part of the tests
[vpp.git] / test / test_acl_plugin_l2l3.py
index 04f91cf..97675d4 100644 (file)
@@ -25,7 +25,7 @@
 
 import unittest
 from socket import inet_pton, AF_INET, AF_INET6
-from random import choice
+from random import choice, shuffle
 from pprint import pprint
 
 from scapy.packet import Raw
@@ -331,6 +331,80 @@ class TestIpIrb(VppTestCase):
 
             # UDP:
 
+    def applied_acl_shuffle(self, sw_if_index):
+        # first collect what ACLs are applied and what they look like
+        r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index)
+        orig_applied_acls = r[0]
+
+        # we will collect these just to save and generate additional rulesets
+        orig_acls = []
+        for acl_num in orig_applied_acls.acls:
+            rr = self.vapi.acl_dump(acl_num)
+            orig_acls.append(rr[0])
+
+        # now create a list of all the rules in all ACLs
+        all_rules = []
+        for old_acl in orig_acls:
+            for rule in old_acl.r:
+                all_rules.append(dict(vars(rule)))
+
+        # Add a few ACLs made from shuffled rules
+        shuffle(all_rules)
+        reply = self.vapi.acl_add_replace(acl_index=4294967295,
+                                          r=all_rules[::2],
+                                          tag="shuffle 1. acl")
+        shuffle_acl_1 = reply.acl_index
+        shuffle(all_rules)
+        reply = self.vapi.acl_add_replace(acl_index=4294967295,
+                                          r=all_rules[::3],
+                                          tag="shuffle 2. acl")
+        shuffle_acl_2 = reply.acl_index
+        shuffle(all_rules)
+        reply = self.vapi.acl_add_replace(acl_index=4294967295,
+                                          r=all_rules[::2],
+                                          tag="shuffle 3. acl")
+        shuffle_acl_3 = reply.acl_index
+
+        # apply the shuffle ACLs in front
+        input_acls = [shuffle_acl_1, shuffle_acl_2]
+        output_acls = [shuffle_acl_1, shuffle_acl_2]
+
+        # add the currently applied ACLs
+        n_input = orig_applied_acls.n_input
+        input_acls.extend(orig_applied_acls.acls[:n_input])
+        output_acls.extend(orig_applied_acls.acls[n_input:])
+
+        # and the trailing shuffle ACL(s)
+        input_acls.extend([shuffle_acl_3])
+        output_acls.extend([shuffle_acl_3])
+
+        # set the interface ACL list to the result
+        self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+                                             n_input=len(input_acls),
+                                             acls=input_acls + output_acls)
+        # change the ACLs a few times
+        for i in range(1, 10):
+            shuffle(all_rules)
+            reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
+                                              r=all_rules[::1+(i % 2)],
+                                              tag="shuffle 1. acl")
+            shuffle(all_rules)
+            reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+                                              r=all_rules[::1+(i % 3)],
+                                              tag="shuffle 2. acl")
+            shuffle(all_rules)
+            reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+                                              r=all_rules[::1+(i % 5)],
+                                              tag="shuffle 3. acl")
+
+        # restore to how it was before and clean up
+        self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+                                             n_input=orig_applied_acls.n_input,
+                                             acls=orig_applied_acls.acls)
+        reply = self.vapi.acl_del(acl_index=shuffle_acl_1)
+        reply = self.vapi.acl_del(acl_index=shuffle_acl_2)
+        reply = self.vapi.acl_del(acl_index=shuffle_acl_3)
+
     def create_acls_for_a_stream(self, stream_dict,
                                  test_l2_action, is_reflect):
         r = stream_dict['rules']
@@ -371,6 +445,8 @@ class TestIpIrb(VppTestCase):
         self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
                                              n_input=n_input_l2,
                                              acls=[acl_idx['L2']])
+        self.applied_acl_shuffle(self.pg0.sw_if_index)
+        self.applied_acl_shuffle(self.pg2.sw_if_index)
 
     def apply_acl_ip46_both_directions_reflect(self,
                                                primary_is_bridged_to_routed,
@@ -424,6 +500,8 @@ class TestIpIrb(VppTestCase):
                                              n_input=1,
                                              acls=[inbound_l2_acl,
                                                    outbound_l2_acl])
+        self.applied_acl_shuffle(self.pg0.sw_if_index)
+        self.applied_acl_shuffle(self.pg2.sw_if_index)
 
     def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
                                          is_reflect, add_eh):