X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin_l2l3.py;h=66d053ff5844bd8cf90570be893b7130b6d395b4;hb=b474380f;hp=04f91cfca5a25119b705389b1fffbbae4472b24e;hpb=51d2651e4aceff4016228c669480437eac84f221;p=vpp.git diff --git a/test/test_acl_plugin_l2l3.py b/test/test_acl_plugin_l2l3.py index 04f91cfca5a..66d053ff584 100644 --- a/test/test_acl_plugin_l2l3.py +++ b/test/test_acl_plugin_l2l3.py @@ -25,7 +25,7 @@ import unittest from socket import inet_pton, AF_INET, AF_INET6 -from random import choice +from random import choice, shuffle from pprint import pprint from scapy.packet import Raw @@ -36,6 +36,7 @@ from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner +from vpp_papi_provider import L2_PORT_TYPE import time @@ -60,7 +61,7 @@ class TestIpIrb(VppTestCase): # create 3 pg interfaces, 1 loopback interface cls.create_pg_interfaces(range(3)) - cls.create_loopback_interfaces(range(1)) + cls.create_loopback_interfaces(1) cls.interfaces = list(cls.pg_interfaces) cls.interfaces.extend(cls.lo_interfaces) @@ -70,7 +71,8 @@ class TestIpIrb(VppTestCase): # Create BD with MAC learning enabled and put interfaces to this BD cls.vapi.sw_interface_set_l2_bridge( - cls.loop0.sw_if_index, bd_id=cls.bd_id, bvi=1) + cls.loop0.sw_if_index, bd_id=cls.bd_id, + port_type=L2_PORT_TYPE.BVI) cls.vapi.sw_interface_set_l2_bridge( cls.pg0.sw_if_index, bd_id=cls.bd_id) cls.vapi.sw_interface_set_l2_bridge( @@ -331,6 +333,80 @@ class TestIpIrb(VppTestCase): # UDP: + def applied_acl_shuffle(self, sw_if_index): + # first collect what ACLs are applied and what they look like + r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index) + orig_applied_acls = r[0] + + # we will collect these just to save and generate additional rulesets + orig_acls = [] + for acl_num in orig_applied_acls.acls: + rr = self.vapi.acl_dump(acl_num) + orig_acls.append(rr[0]) + + # now create a list of all the rules in all ACLs + all_rules = [] + for old_acl in orig_acls: + for rule in old_acl.r: + all_rules.append(dict(rule._asdict())) + + # Add a few ACLs made from shuffled rules + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::2], + tag="shuffle 1. acl") + shuffle_acl_1 = reply.acl_index + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::3], + tag="shuffle 2. acl") + shuffle_acl_2 = reply.acl_index + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::2], + tag="shuffle 3. acl") + shuffle_acl_3 = reply.acl_index + + # apply the shuffle ACLs in front + input_acls = [shuffle_acl_1, shuffle_acl_2] + output_acls = [shuffle_acl_1, shuffle_acl_2] + + # add the currently applied ACLs + n_input = orig_applied_acls.n_input + input_acls.extend(orig_applied_acls.acls[:n_input]) + output_acls.extend(orig_applied_acls.acls[n_input:]) + + # and the trailing shuffle ACL(s) + input_acls.extend([shuffle_acl_3]) + output_acls.extend([shuffle_acl_3]) + + # set the interface ACL list to the result + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=len(input_acls), + acls=input_acls + output_acls) + # change the ACLs a few times + for i in range(1, 10): + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1, + r=all_rules[::1+(i % 2)], + tag="shuffle 1. acl") + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, + r=all_rules[::1+(i % 3)], + tag="shuffle 2. acl") + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, + r=all_rules[::1+(i % 5)], + tag="shuffle 3. acl") + + # restore to how it was before and clean up + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=orig_applied_acls.n_input, + acls=orig_applied_acls.acls) + reply = self.vapi.acl_del(acl_index=shuffle_acl_1) + reply = self.vapi.acl_del(acl_index=shuffle_acl_2) + reply = self.vapi.acl_del(acl_index=shuffle_acl_3) + def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect): r = stream_dict['rules'] @@ -371,6 +447,8 @@ class TestIpIrb(VppTestCase): self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, n_input=n_input_l2, acls=[acl_idx['L2']]) + self.applied_acl_shuffle(self.pg0.sw_if_index) + self.applied_acl_shuffle(self.pg2.sw_if_index) def apply_acl_ip46_both_directions_reflect(self, primary_is_bridged_to_routed, @@ -424,6 +502,8 @@ class TestIpIrb(VppTestCase): n_input=1, acls=[inbound_l2_acl, outbound_l2_acl]) + self.applied_acl_shuffle(self.pg0.sw_if_index) + self.applied_acl_shuffle(self.pg2.sw_if_index) def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh):