import unittest
from socket import inet_pton, AF_INET, AF_INET6
-from random import choice
+from random import choice, shuffle
from pprint import pprint
from scapy.packet import Raw
# create 3 pg interfaces, 1 loopback interface
cls.create_pg_interfaces(range(3))
- cls.create_loopback_interfaces(range(1))
+ cls.create_loopback_interfaces(1)
cls.interfaces = list(cls.pg_interfaces)
cls.interfaces.extend(cls.lo_interfaces)
self.logger.info(self.vapi.cli("show ip arp"))
self.logger.info(self.vapi.cli("show ip6 neighbors"))
self.logger.info(self.vapi.cli("show acl-plugin sessions"))
-
- def api_acl_add_replace(self, acl_index, r, count, tag="",
- expected_retval=0):
- """Add/replace an ACL
-
- :param int acl_index: ACL index to replace, 4294967295 to create new.
- :param acl_rule r: ACL rules array.
- :param str tag: symbolic tag (description) for this ACL.
- :param int count: number of rules.
- """
- return self.vapi.api(self.vapi.papi.acl_add_replace,
- {'acl_index': acl_index,
- 'r': r,
- 'count': count,
- 'tag': tag
- }, expected_retval=expected_retval)
-
- def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
- expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
- {'sw_if_index': sw_if_index,
- 'count': count,
- 'n_input': n_input,
- 'acls': acls
- }, expected_retval=expected_retval)
-
- def api_acl_dump(self, acl_index, expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_dump,
- {'acl_index': acl_index},
- expected_retval=expected_retval)
+ self.logger.info(self.vapi.cli("show acl-plugin acl"))
+ self.logger.info(self.vapi.cli("show acl-plugin interface"))
+ self.logger.info(self.vapi.cli("show acl-plugin tables"))
def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
is_ip6, expect_blocked, expect_established,
# UDP:
+ def applied_acl_shuffle(self, sw_if_index):
+ # first collect what ACLs are applied and what they look like
+ r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index)
+ orig_applied_acls = r[0]
+
+ # we will collect these just to save and generate additional rulesets
+ orig_acls = []
+ for acl_num in orig_applied_acls.acls:
+ rr = self.vapi.acl_dump(acl_num)
+ orig_acls.append(rr[0])
+
+ # now create a list of all the rules in all ACLs
+ all_rules = []
+ for old_acl in orig_acls:
+ for rule in old_acl.r:
+ all_rules.append(dict(rule._asdict()))
+
+ # Add a few ACLs made from shuffled rules
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=all_rules[::2],
+ tag="shuffle 1. acl")
+ shuffle_acl_1 = reply.acl_index
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=all_rules[::3],
+ tag="shuffle 2. acl")
+ shuffle_acl_2 = reply.acl_index
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=all_rules[::2],
+ tag="shuffle 3. acl")
+ shuffle_acl_3 = reply.acl_index
+
+ # apply the shuffle ACLs in front
+ input_acls = [shuffle_acl_1, shuffle_acl_2]
+ output_acls = [shuffle_acl_1, shuffle_acl_2]
+
+ # add the currently applied ACLs
+ n_input = orig_applied_acls.n_input
+ input_acls.extend(orig_applied_acls.acls[:n_input])
+ output_acls.extend(orig_applied_acls.acls[n_input:])
+
+ # and the trailing shuffle ACL(s)
+ input_acls.extend([shuffle_acl_3])
+ output_acls.extend([shuffle_acl_3])
+
+ # set the interface ACL list to the result
+ self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+ n_input=len(input_acls),
+ acls=input_acls + output_acls)
+ # change the ACLs a few times
+ for i in range(1, 10):
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
+ r=all_rules[::1+(i % 2)],
+ tag="shuffle 1. acl")
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+ r=all_rules[::1+(i % 3)],
+ tag="shuffle 2. acl")
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+ r=all_rules[::1+(i % 5)],
+ tag="shuffle 3. acl")
+
+ # restore to how it was before and clean up
+ self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+ n_input=orig_applied_acls.n_input,
+ acls=orig_applied_acls.acls)
+ reply = self.vapi.acl_del(acl_index=shuffle_acl_1)
+ reply = self.vapi.acl_del(acl_index=shuffle_acl_2)
+ reply = self.vapi.acl_del(acl_index=shuffle_acl_3)
+
def create_acls_for_a_stream(self, stream_dict,
test_l2_action, is_reflect):
r = stream_dict['rules']
r_permit = stream_dict['permit_rules']
r_permit_reflect = stream_dict['permit_and_reflect_rules']
r_action = r_permit_reflect if is_reflect else r
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r_action,
- count=len(r_action), tag="action acl")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
+ tag="act. acl")
action_acl_index = reply.acl_index
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r_permit,
- count=len(r_permit), tag="permit acl")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
+ tag="perm. acl")
permit_acl_index = reply.acl_index
return {'L2': action_acl_index if test_l2_action else permit_acl_index,
'L3': permit_acl_index if test_l2_action else action_acl_index,
is_reflect)
n_input_l3 = 0 if bridged_to_routed else 1
n_input_l2 = 1 if bridged_to_routed else 0
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- count=1,
- n_input=n_input_l3,
- acls=[acl_idx['L3']])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- count=1,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- count=1,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+ n_input=n_input_l3,
+ acls=[acl_idx['L3']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=n_input_l2,
+ acls=[acl_idx['L2']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=n_input_l2,
+ acls=[acl_idx['L2']])
+ self.applied_acl_shuffle(self.pg0.sw_if_index)
+ self.applied_acl_shuffle(self.pg2.sw_if_index)
def apply_acl_ip46_both_directions_reflect(self,
primary_is_bridged_to_routed,
else:
outbound_l3_acl = acl_idx_rev['L3']
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- count=2,
- n_input=1,
- acls=[inbound_l3_acl,
- outbound_l3_acl])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- count=2,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- count=2,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+ n_input=1,
+ acls=[inbound_l3_acl,
+ outbound_l3_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl,
+ outbound_l2_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl,
+ outbound_l2_acl])
+ self.applied_acl_shuffle(self.pg0.sw_if_index)
+ self.applied_acl_shuffle(self.pg2.sw_if_index)
def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
self.run_test_ip46_bridged_to_routed_and_back(False, False,
self.WITH_EH)
- # Old datapath group
- def test_8900_ip6_irb_1(self):
- """ ACL plugin set old L2 datapath"""
- if not self.vpp_dead:
- cmd = "set acl-plugin l2-datapath old"
- self.logger.info(self.vapi.ppcli(cmd))
-
- def test_8901_ip6_irb_1(self):
- """ ACL IPv6 routed -> bridged, L2 ACL deny"""
- self.run_test_ip46_routed_to_bridged(True, True, False,
- self.WITHOUT_EH)
-
- def test_8902_ip6_irb_1(self):
- """ ACL IPv6 routed -> bridged, L3 ACL deny"""
- self.run_test_ip46_routed_to_bridged(False, True, False,
- self.WITHOUT_EH)
-
- def test_8903_ip4_irb_1(self):
- """ ACL IPv4 routed -> bridged, L2 ACL deny"""
- self.run_test_ip46_routed_to_bridged(True, False, False,
- self.WITHOUT_EH)
-
- def test_8904_ip4_irb_1(self):
- """ ACL IPv4 routed -> bridged, L3 ACL deny"""
- self.run_test_ip46_routed_to_bridged(False, False, False,
- self.WITHOUT_EH)
-
- def test_8905_ip6_irb_1(self):
- """ ACL IPv6 bridged -> routed, L2 ACL deny """
- self.run_test_ip46_bridged_to_routed(True, True, False,
- self.WITHOUT_EH)
-
- def test_8906_ip6_irb_1(self):
- """ ACL IPv6 bridged -> routed, L3 ACL deny """
- self.run_test_ip46_bridged_to_routed(False, True, False,
- self.WITHOUT_EH)
-
- def test_8907_ip6_irb_1(self):
- """ ACL IPv4 bridged -> routed, L2 ACL deny """
- self.run_test_ip46_bridged_to_routed(True, False, False,
- self.WITHOUT_EH)
-
- def test_8908_ip6_irb_1(self):
- """ ACL IPv4 bridged -> routed, L3 ACL deny """
- self.run_test_ip46_bridged_to_routed(False, False, False,
- self.WITHOUT_EH)
-
-
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)