X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Facl%2Ftest%2Ftest_acl_plugin_l2l3.py;h=30b53728c6336f1fd5849700906f98bcbb166bdf;hb=2f8cd9145;hp=31b4058fc698c6fcdc4b578c3c0f611cb8a07f2c;hpb=a43c93f8554ad7418e31be3791b3fb71232f60ac;p=vpp.git diff --git a/src/plugins/acl/test/test_acl_plugin_l2l3.py b/src/plugins/acl/test/test_acl_plugin_l2l3.py index 31b4058fc69..30b53728c63 100644 --- a/src/plugins/acl/test/test_acl_plugin_l2l3.py +++ b/src/plugins/acl/test/test_acl_plugin_l2l3.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ACL IRB Test Case HLD: **config** @@ -23,10 +23,12 @@ """ +import copy import unittest from socket import inet_pton, AF_INET, AF_INET6 from random import choice, shuffle from pprint import pprint +from ipaddress import ip_network import scapy.compat from scapy.packet import Raw @@ -40,6 +42,8 @@ from framework import VppTestCase, VppTestRunner from vpp_l2 import L2_PORT_TYPE import time +from vpp_acl import AclRule, VppAcl, VppAclInterface + class TestACLpluginL2L3(VppTestCase): """TestACLpluginL2L3 Test Case""" @@ -113,7 +117,7 @@ class TestACLpluginL2L3(VppTestCase): def tearDown(self): """Run standard test teardown and log ``show l2patch``, ``show l2fib verbose``,``show bridge-domain detail``, - ``show ip arp``. + ``show ip neighbors``. """ super(TestACLpluginL2L3, self).tearDown() @@ -123,8 +127,7 @@ class TestACLpluginL2L3(VppTestCase): self.logger.info(self.vapi.cli("show l2fib verbose")) self.logger.info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id)) - self.logger.info(self.vapi.cli("show ip arp")) - self.logger.info(self.vapi.cli("show ip6 neighbors")) + self.logger.info(self.vapi.cli("show ip neighbors")) cmd = "show acl-plugin sessions verbose 1" self.logger.info(self.vapi.cli(cmd)) self.logger.info(self.vapi.cli("show acl-plugin acl")) @@ -260,31 +263,26 @@ class TestACLpluginL2L3(VppTestCase): else: rule_l4_proto = p[IP].proto - new_rule = { - 'is_permit': is_permit, - 'is_ipv6': p.haslayer(IPv6), - 'src_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].src), - 'src_ip_prefix_len': rule_prefix_len, - 'dst_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].dst), - 'dst_ip_prefix_len': rule_prefix_len, - 'srcport_or_icmptype_first': rule_l4_sport, - 'srcport_or_icmptype_last': rule_l4_sport, - 'dstport_or_icmpcode_first': rule_l4_dport, - 'dstport_or_icmpcode_last': rule_l4_dport, - 'proto': rule_l4_proto, - } + new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto, + src_prefix=ip_network( + (p[rule_l3_layer].src, rule_prefix_len)), + dst_prefix=ip_network( + (p[rule_l3_layer].dst, rule_prefix_len)), + sport_from=rule_l4_sport, + sport_to=rule_l4_sport, + dport_from=rule_l4_dport, + dport_to=rule_l4_dport) + rules.append(new_rule) - new_rule_permit = new_rule.copy() - new_rule_permit['is_permit'] = 1 + new_rule_permit = copy.copy(new_rule) + new_rule_permit.is_permit = 1 permit_rules.append(new_rule_permit) - new_rule_permit_and_reflect = new_rule.copy() + new_rule_permit_and_reflect = copy.copy(new_rule) if can_reflect_this_packet: - new_rule_permit_and_reflect['is_permit'] = 2 + new_rule_permit_and_reflect.is_permit = 2 else: - new_rule_permit_and_reflect['is_permit'] = is_permit + new_rule_permit_and_reflect.is_permit = is_permit permit_and_reflect_rules.append(new_rule_permit_and_reflect) self.logger.info("create_stream pkt#%d: %s" % (i, payload)) @@ -357,79 +355,67 @@ class TestACLpluginL2L3(VppTestCase): # UDP: - def applied_acl_shuffle(self, sw_if_index): - # first collect what ACLs are applied and what they look like - r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index) - orig_applied_acls = r[0] - - # we will collect these just to save and generate additional rulesets - orig_acls = [] - for acl_num in orig_applied_acls.acls: - rr = self.vapi.acl_dump(acl_num) - orig_acls.append(rr[0]) + def applied_acl_shuffle(self, acl_if): + saved_n_input = acl_if.n_input + # TOTO: maybe copy each one?? + saved_acls = acl_if.acls # now create a list of all the rules in all ACLs all_rules = [] - for old_acl in orig_acls: - for rule in old_acl.r: - all_rules.append(dict(rule._asdict())) + for old_acl in saved_acls: + for rule in old_acl.rules: + all_rules.append(rule) # Add a few ACLs made from shuffled rules shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=all_rules[::2], - tag=b"shuffle 1. acl") - shuffle_acl_1 = reply.acl_index + acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl") + acl1.add_vpp_config() + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=all_rules[::3], - tag=b"shuffle 2. acl") - shuffle_acl_2 = reply.acl_index + acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl") + acl2.add_vpp_config() + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=all_rules[::2], - tag=b"shuffle 3. acl") - shuffle_acl_3 = reply.acl_index + acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl") + acl3.add_vpp_config() # apply the shuffle ACLs in front - input_acls = [shuffle_acl_1, shuffle_acl_2] - output_acls = [shuffle_acl_1, shuffle_acl_2] + input_acls = [acl1, acl2] + output_acls = [acl1, acl2] # add the currently applied ACLs - n_input = orig_applied_acls.n_input - input_acls.extend(orig_applied_acls.acls[:n_input]) - output_acls.extend(orig_applied_acls.acls[n_input:]) + n_input = acl_if.n_input + input_acls.extend(saved_acls[:n_input]) + output_acls.extend(saved_acls[n_input:]) # and the trailing shuffle ACL(s) - input_acls.extend([shuffle_acl_3]) - output_acls.extend([shuffle_acl_3]) + input_acls.extend([acl3]) + output_acls.extend([acl3]) # set the interface ACL list to the result - self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, - n_input=len(input_acls), - acls=input_acls + output_acls) + acl_if.n_input = len(input_acls) + acl_if.acls = input_acls + output_acls + acl_if.add_vpp_config() + # change the ACLs a few times for i in range(1, 10): shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1, - r=all_rules[::1+(i % 2)], - tag=b"shuffle 1. acl") + acl1.modify_vpp_config(all_rules[::1+(i % 2)]) + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, - r=all_rules[::1+(i % 3)], - tag=b"shuffle 2. acl") + acl2.modify_vpp_config(all_rules[::1+(i % 3)]) + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, - r=all_rules[::1+(i % 5)], - tag=b"shuffle 3. acl") + acl3.modify_vpp_config(all_rules[::1+(i % 5)]) # restore to how it was before and clean up - self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, - n_input=orig_applied_acls.n_input, - acls=orig_applied_acls.acls) - reply = self.vapi.acl_del(acl_index=shuffle_acl_1) - reply = self.vapi.acl_del(acl_index=shuffle_acl_2) - reply = self.vapi.acl_del(acl_index=shuffle_acl_3) + acl_if.n_input = saved_n_input + acl_if.acls = saved_acls + acl_if.add_vpp_config() + + acl1.remove_vpp_config() + acl2.remove_vpp_config() + acl3.remove_vpp_config() def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect): @@ -437,15 +423,14 @@ class TestACLpluginL2L3(VppTestCase): r_permit = stream_dict['permit_rules'] r_permit_reflect = stream_dict['permit_and_reflect_rules'] r_action = r_permit_reflect if is_reflect else r - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action, - tag=b"act. acl") - action_acl_index = reply.acl_index - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit, - tag=b"perm. acl") - permit_acl_index = reply.acl_index - return {'L2': action_acl_index if test_l2_action else permit_acl_index, - 'L3': permit_acl_index if test_l2_action else action_acl_index, - 'permit': permit_acl_index, 'action': action_acl_index} + action_acl = VppAcl(self, rules=r_action, tag="act. acl") + action_acl.add_vpp_config() + permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl") + permit_acl.add_vpp_config() + + return {'L2': action_acl if test_l2_action else permit_acl, + 'L3': permit_acl if test_l2_action else action_acl, + 'permit': permit_acl, 'action': action_acl} def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -453,26 +438,30 @@ class TestACLpluginL2L3(VppTestCase): """ self.reset_packet_infos() stream_dict = self.create_stream( - self.pg2, self.loop0, - bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - not is_reflect, False, add_eh) + self.pg2, self.loop0, + bridged_to_routed, + self.pg_if_packet_sizes, is_ip6, + not is_reflect, False, add_eh) stream = stream_dict['stream'] acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, is_reflect) n_input_l3 = 0 if bridged_to_routed else 1 n_input_l2 = 1 if bridged_to_routed else 0 - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, - n_input=n_input_l3, - acls=[acl_idx['L3']]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=n_input_l2, - acls=[acl_idx['L2']]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=n_input_l2, - acls=[acl_idx['L2']]) - self.applied_acl_shuffle(self.pg0.sw_if_index) - self.applied_acl_shuffle(self.pg2.sw_if_index) + + acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, + n_input=n_input_l3, acls=[acl_idx['L3']]) + acl_if_pg2.add_vpp_config() + + acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, + n_input=n_input_l2, acls=[acl_idx['L2']]) + acl_if_pg0.add_vpp_config() + + acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, + n_input=n_input_l2, acls=[acl_idx['L2']]) + acl_if_pg1.add_vpp_config() + + self.applied_acl_shuffle(acl_if_pg0) + self.applied_acl_shuffle(acl_if_pg1) return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']} def apply_acl_ip46_both_directions_reflect(self, @@ -517,20 +506,23 @@ class TestACLpluginL2L3(VppTestCase): else: outbound_l3_acl = acl_idx_rev['L3'] - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, - n_input=1, - acls=[inbound_l3_acl, - outbound_l3_acl]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, - outbound_l2_acl]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, - outbound_l2_acl]) - self.applied_acl_shuffle(self.pg0.sw_if_index) - self.applied_acl_shuffle(self.pg2.sw_if_index) + acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, + n_input=1, + acls=[inbound_l3_acl, outbound_l3_acl]) + acl_if_pg2.add_vpp_config() + + acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, outbound_l2_acl]) + acl_if_pg0.add_vpp_config() + + acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, outbound_l2_acl]) + acl_if_pg1.add_vpp_config() + + self.applied_acl_shuffle(acl_if_pg0) + self.applied_acl_shuffle(acl_if_pg2) def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -595,7 +587,7 @@ class TestACLpluginL2L3(VppTestCase): pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6, is_reflect, False, add_eh) - self.verify_acl_packet_count(acls['L3'], pkts) + self.verify_acl_packet_count(acls['L3'].acl_index, pkts) def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -605,7 +597,7 @@ class TestACLpluginL2L3(VppTestCase): pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6, is_reflect, False, add_eh) - self.verify_acl_packet_count(acls['L2'], pkts) + self.verify_acl_packet_count(acls['L2'].acl_index, pkts) def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action, is_ip6, add_eh,