X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin_l2l3.py;h=343e611751baad43a2a85cce93657dd0b65266ff;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=48faafb7398f2366c41ffd08cf0795c09abc968e;hpb=f90348bcb4afd0af2611cefc43b17ef3042b511c;p=vpp.git diff --git a/test/test_acl_plugin_l2l3.py b/test/test_acl_plugin_l2l3.py index 48faafb7398..343e611751b 100644 --- a/test/test_acl_plugin_l2l3.py +++ b/test/test_acl_plugin_l2l3.py @@ -76,12 +76,16 @@ class TestACLpluginL2L3(VppTestCase): # Create BD with MAC learning enabled and put interfaces to this BD cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id, - port_type=L2_PORT_TYPE.BVI) - cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index, - bd_id=cls.bd_id) - cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index, - bd_id=cls.bd_id) + rx_sw_if_index=cls.loop0.sw_if_index, + bd_id=cls.bd_id, + port_type=L2_PORT_TYPE.BVI, + ) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.pg0.sw_if_index, bd_id=cls.bd_id + ) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.bd_id + ) # Configure IPv4 addresses on loopback interface and routed interface cls.loop0.config_ip4() @@ -125,8 +129,7 @@ class TestACLpluginL2L3(VppTestCase): self.logger.info(self.vapi.cli("show l2patch")) self.logger.info(self.vapi.cli("show classify tables")) self.logger.info(self.vapi.cli("show l2fib verbose")) - self.logger.info(self.vapi.cli("show bridge-domain %s detail" % - self.bd_id)) + self.logger.info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id)) self.logger.info(self.vapi.cli("show ip neighbors")) cmd = "show acl-plugin sessions verbose 1" self.logger.info(self.vapi.cli(cmd)) @@ -134,23 +137,33 @@ class TestACLpluginL2L3(VppTestCase): self.logger.info(self.vapi.cli("show acl-plugin interface")) self.logger.info(self.vapi.cli("show acl-plugin tables")) - def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes, - is_ip6, expect_blocked, expect_established, - add_extension_header, icmp_stateful=False): + def create_stream( + self, + src_ip_if, + dst_ip_if, + reverse, + packet_sizes, + is_ip6, + expect_blocked, + expect_established, + add_extension_header, + icmp_stateful=False, + ): pkts = [] rules = [] permit_rules = [] permit_and_reflect_rules = [] total_packet_count = 8 for i in range(0, total_packet_count): - modulo = (i//2) % 2 + modulo = (i // 2) % 2 icmp_type_delta = i % 2 icmp_code = i - is_udp_packet = (modulo == 0) + is_udp_packet = modulo == 0 if is_udp_packet and icmp_stateful: continue - is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and - not is_udp_packet) + is_reflectable_icmp = ( + icmp_stateful and icmp_type_delta == 0 and not is_udp_packet + ) is_reflected_icmp = is_reflectable_icmp and expect_established can_reflect_this_packet = is_udp_packet or is_reflectable_icmp is_permit = i % 2 @@ -161,9 +174,9 @@ class TestACLpluginL2L3(VppTestCase): payload = self.info_to_payload(info) else: to_be_blocked = False - if (expect_blocked and not expect_established): + if expect_blocked and not expect_established: to_be_blocked = True - if (not can_reflect_this_packet): + if not can_reflect_this_packet: to_be_blocked = True if to_be_blocked: payload = "to be blocked" @@ -171,7 +184,7 @@ class TestACLpluginL2L3(VppTestCase): info = self.create_packet_info(src_ip_if, dst_ip_if) payload = self.info_to_payload(info) if reverse: - dst_mac = 'de:ad:00:00:00:00' + dst_mac = "de:ad:00:00:00:00" src_mac = remote_dst_host._mac dst_ip6 = src_ip_if.remote_ip6 src_ip6 = remote_dst_host.ip6 @@ -201,42 +214,53 @@ class TestACLpluginL2L3(VppTestCase): ulp_l4 = UDP(sport=src_l4, dport=dst_l4) if add_extension_header: # prepend some extension headers - ulp = (IPv6ExtHdrRouting() / IPv6ExtHdrRouting() / - IPv6ExtHdrFragment(offset=0, m=1) / ulp_l4) + ulp = ( + IPv6ExtHdrRouting() + / IPv6ExtHdrRouting() + / IPv6ExtHdrFragment(offset=0, m=1) + / ulp_l4 + ) # uncomment below to test invalid ones # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4 else: ulp = ulp_l4 - p = (Ether(dst=dst_mac, src=src_mac) / - IPv6(src=src_ip6, dst=dst_ip6) / - ulp / - Raw(payload)) + p = ( + Ether(dst=dst_mac, src=src_mac) + / IPv6(src=src_ip6, dst=dst_ip6) + / ulp + / Raw(payload) + ) else: ulp_l4 = UDP(sport=src_l4, dport=dst_l4) # IPv4 does not allow extension headers, # but we rather make it a first fragment flags = 1 if add_extension_header else 0 ulp = ulp_l4 - p = (Ether(dst=dst_mac, src=src_mac) / - IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) / - ulp / - Raw(payload)) + p = ( + Ether(dst=dst_mac, src=src_mac) + / IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) + / ulp + / Raw(payload) + ) elif modulo == 1: if is_ip6: - ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta, - code=icmp_code) + ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta, code=icmp_code) ulp = ulp_l4 - p = (Ether(dst=dst_mac, src=src_mac) / - IPv6(src=src_ip6, dst=dst_ip6) / - ulp / - Raw(payload)) + p = ( + Ether(dst=dst_mac, src=src_mac) + / IPv6(src=src_ip6, dst=dst_ip6) + / ulp + / Raw(payload) + ) else: - ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code) + ulp_l4 = ICMP(type=8 - 8 * icmp_type_delta, code=icmp_code) ulp = ulp_l4 - p = (Ether(dst=dst_mac, src=src_mac) / - IP(src=src_ip4, dst=dst_ip4) / - ulp / - Raw(payload)) + p = ( + Ether(dst=dst_mac, src=src_mac) + / IP(src=src_ip4, dst=dst_ip4) + / ulp + / Raw(payload) + ) if i % 2 == 1: info.data = p.copy() @@ -259,19 +283,20 @@ class TestACLpluginL2L3(VppTestCase): rule_l4_sport = p[ICMPv6Unknown].type rule_l4_dport = p[ICMPv6Unknown].code if p.haslayer(IPv6): - rule_l4_proto = ulp_l4.overload_fields[IPv6]['nh'] + rule_l4_proto = ulp_l4.overload_fields[IPv6]["nh"] else: rule_l4_proto = p[IP].proto - new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto, - src_prefix=ip_network( - (p[rule_l3_layer].src, rule_prefix_len)), - dst_prefix=ip_network( - (p[rule_l3_layer].dst, rule_prefix_len)), - sport_from=rule_l4_sport, - sport_to=rule_l4_sport, - dport_from=rule_l4_dport, - dport_to=rule_l4_dport) + new_rule = AclRule( + is_permit=is_permit, + proto=rule_l4_proto, + src_prefix=ip_network((p[rule_l3_layer].src, rule_prefix_len)), + dst_prefix=ip_network((p[rule_l3_layer].dst, rule_prefix_len)), + sport_from=rule_l4_sport, + sport_to=rule_l4_sport, + dport_from=rule_l4_dport, + dport_to=rule_l4_dport, + ) rules.append(new_rule) new_rule_permit = copy.copy(new_rule) @@ -287,10 +312,12 @@ class TestACLpluginL2L3(VppTestCase): permit_and_reflect_rules.append(new_rule_permit_and_reflect) self.logger.info("create_stream pkt#%d: %s" % (i, payload)) - return {'stream': pkts, - 'rules': rules, - 'permit_rules': permit_rules, - 'permit_and_reflect_rules': permit_and_reflect_rules} + return { + "stream": pkts, + "rules": rules, + "permit_rules": permit_rules, + "permit_and_reflect_rules": permit_and_reflect_rules, + } def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse): last_info = dict() @@ -316,11 +343,13 @@ class TestACLpluginL2L3(VppTestCase): data = scapy.compat.raw(packet[UDP][Raw]) else: if l3 == IP: - data = scapy.compat.raw(ICMP( - scapy.compat.raw(packet[l3].payload))[Raw]) + data = scapy.compat.raw( + ICMP(scapy.compat.raw(packet[l3].payload))[Raw] + ) else: - data = scapy.compat.raw(ICMPv6Unknown( - scapy.compat.raw(packet[l3].payload)).msgbody) + data = scapy.compat.raw( + ICMPv6Unknown(scapy.compat.raw(packet[l3].payload)).msgbody + ) udp_or_icmp = packet[l3].payload data_obj = Raw(data) # FIXME: make framework believe we are on object @@ -330,8 +359,8 @@ class TestACLpluginL2L3(VppTestCase): self.assertEqual(payload_info.dst, dst_ip_sw_if_index) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_ip_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_ip_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) @@ -400,13 +429,13 @@ class TestACLpluginL2L3(VppTestCase): # change the ACLs a few times for i in range(1, 10): shuffle(all_rules) - acl1.modify_vpp_config(all_rules[::1+(i % 2)]) + acl1.modify_vpp_config(all_rules[:: 1 + (i % 2)]) shuffle(all_rules) - acl2.modify_vpp_config(all_rules[::1+(i % 3)]) + acl2.modify_vpp_config(all_rules[:: 1 + (i % 3)]) shuffle(all_rules) - acl3.modify_vpp_config(all_rules[::1+(i % 5)]) + acl3.modify_vpp_config(all_rules[:: 1 + (i % 5)]) # restore to how it was before and clean up acl_if.n_input = saved_n_input @@ -417,143 +446,200 @@ class TestACLpluginL2L3(VppTestCase): acl2.remove_vpp_config() acl3.remove_vpp_config() - def create_acls_for_a_stream(self, stream_dict, - test_l2_action, is_reflect): - r = stream_dict['rules'] - r_permit = stream_dict['permit_rules'] - r_permit_reflect = stream_dict['permit_and_reflect_rules'] + def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect): + r = stream_dict["rules"] + r_permit = stream_dict["permit_rules"] + r_permit_reflect = stream_dict["permit_and_reflect_rules"] r_action = r_permit_reflect if is_reflect else r action_acl = VppAcl(self, rules=r_action, tag="act. acl") action_acl.add_vpp_config() permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl") permit_acl.add_vpp_config() - return {'L2': action_acl if test_l2_action else permit_acl, - 'L3': permit_acl if test_l2_action else action_acl, - 'permit': permit_acl, 'action': action_acl} - - def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny, - is_ip6, is_reflect, add_eh): - """ Apply the ACLs - """ + return { + "L2": action_acl if test_l2_action else permit_acl, + "L3": permit_acl if test_l2_action else action_acl, + "permit": permit_acl, + "action": action_acl, + } + + def apply_acl_ip46_x_to_y( + self, bridged_to_routed, test_l2_deny, is_ip6, is_reflect, add_eh + ): + """Apply the ACLs""" self.reset_packet_infos() stream_dict = self.create_stream( - self.pg2, self.loop0, + self.pg2, + self.loop0, bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - not is_reflect, False, add_eh) - stream = stream_dict['stream'] - acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, - is_reflect) + self.pg_if_packet_sizes, + is_ip6, + not is_reflect, + False, + add_eh, + ) + stream = stream_dict["stream"] + acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, is_reflect) n_input_l3 = 0 if bridged_to_routed else 1 n_input_l2 = 1 if bridged_to_routed else 0 - acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, - n_input=n_input_l3, acls=[acl_idx['L3']]) + acl_if_pg2 = VppAclInterface( + self, + sw_if_index=self.pg2.sw_if_index, + n_input=n_input_l3, + acls=[acl_idx["L3"]], + ) acl_if_pg2.add_vpp_config() - acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, - n_input=n_input_l2, acls=[acl_idx['L2']]) + acl_if_pg0 = VppAclInterface( + self, + sw_if_index=self.pg0.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx["L2"]], + ) acl_if_pg0.add_vpp_config() - acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, - n_input=n_input_l2, acls=[acl_idx['L2']]) + acl_if_pg1 = VppAclInterface( + self, + sw_if_index=self.pg1.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx["L2"]], + ) acl_if_pg1.add_vpp_config() self.applied_acl_shuffle(acl_if_pg0) self.applied_acl_shuffle(acl_if_pg1) - return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']} + return {"L2": acl_idx["L2"], "L3": acl_idx["L3"]} - def apply_acl_ip46_both_directions_reflect(self, - primary_is_bridged_to_routed, - reflect_on_l2, is_ip6, add_eh, - stateful_icmp): + def apply_acl_ip46_both_directions_reflect( + self, primary_is_bridged_to_routed, reflect_on_l2, is_ip6, add_eh, stateful_icmp + ): primary_is_routed_to_bridged = not primary_is_bridged_to_routed self.reset_packet_infos() - stream_dict_fwd = self.create_stream(self.pg2, self.loop0, - primary_is_bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - False, False, add_eh, - stateful_icmp) - acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd, - reflect_on_l2, True) - - stream_dict_rev = self.create_stream(self.pg2, self.loop0, - not primary_is_bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - True, True, add_eh, stateful_icmp) + stream_dict_fwd = self.create_stream( + self.pg2, + self.loop0, + primary_is_bridged_to_routed, + self.pg_if_packet_sizes, + is_ip6, + False, + False, + add_eh, + stateful_icmp, + ) + acl_idx_fwd = self.create_acls_for_a_stream( + stream_dict_fwd, reflect_on_l2, True + ) + + stream_dict_rev = self.create_stream( + self.pg2, + self.loop0, + not primary_is_bridged_to_routed, + self.pg_if_packet_sizes, + is_ip6, + True, + True, + add_eh, + stateful_icmp, + ) # We want the primary action to be "deny" rather than reflect - acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev, - reflect_on_l2, False) + acl_idx_rev = self.create_acls_for_a_stream( + stream_dict_rev, reflect_on_l2, False + ) if primary_is_bridged_to_routed: - inbound_l2_acl = acl_idx_fwd['L2'] + inbound_l2_acl = acl_idx_fwd["L2"] else: - inbound_l2_acl = acl_idx_rev['L2'] + inbound_l2_acl = acl_idx_rev["L2"] if primary_is_routed_to_bridged: - outbound_l2_acl = acl_idx_fwd['L2'] + outbound_l2_acl = acl_idx_fwd["L2"] else: - outbound_l2_acl = acl_idx_rev['L2'] + outbound_l2_acl = acl_idx_rev["L2"] if primary_is_routed_to_bridged: - inbound_l3_acl = acl_idx_fwd['L3'] + inbound_l3_acl = acl_idx_fwd["L3"] else: - inbound_l3_acl = acl_idx_rev['L3'] + inbound_l3_acl = acl_idx_rev["L3"] if primary_is_bridged_to_routed: - outbound_l3_acl = acl_idx_fwd['L3'] + outbound_l3_acl = acl_idx_fwd["L3"] else: - outbound_l3_acl = acl_idx_rev['L3'] - - acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, - n_input=1, - acls=[inbound_l3_acl, outbound_l3_acl]) + outbound_l3_acl = acl_idx_rev["L3"] + + acl_if_pg2 = VppAclInterface( + self, + sw_if_index=self.pg2.sw_if_index, + n_input=1, + acls=[inbound_l3_acl, outbound_l3_acl], + ) acl_if_pg2.add_vpp_config() - acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, outbound_l2_acl]) + acl_if_pg0 = VppAclInterface( + self, + sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, outbound_l2_acl], + ) acl_if_pg0.add_vpp_config() - acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, outbound_l2_acl]) + acl_if_pg1 = VppAclInterface( + self, + sw_if_index=self.pg1.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, outbound_l2_acl], + ) acl_if_pg1.add_vpp_config() self.applied_acl_shuffle(acl_if_pg0) self.applied_acl_shuffle(acl_if_pg2) - def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, - is_reflect, add_eh): - return self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6, - is_reflect, add_eh) + def apply_acl_ip46_routed_to_bridged( + self, test_l2_deny, is_ip6, is_reflect, add_eh + ): + return self.apply_acl_ip46_x_to_y( + False, test_l2_deny, is_ip6, is_reflect, add_eh + ) - def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, - is_reflect, add_eh): - return self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6, - is_reflect, add_eh) + def apply_acl_ip46_bridged_to_routed( + self, test_l2_deny, is_ip6, is_reflect, add_eh + ): + return self.apply_acl_ip46_x_to_y( + True, test_l2_deny, is_ip6, is_reflect, add_eh + ) def verify_acl_packet_count(self, acl_idx, packet_count): - matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx) + matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx) self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches))) total_count = 0 for m in matches: for p in m: - total_count = total_count + p['packets'] + total_count = total_count + p["packets"] self.assertEqual(total_count, packet_count) - def run_traffic_ip46_x_to_y(self, bridged_to_routed, - test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp=False): + def run_traffic_ip46_x_to_y( + self, + bridged_to_routed, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp=False, + ): self.reset_packet_infos() - stream_dict = self.create_stream(self.pg2, self.loop0, - bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - not is_reflect, is_established, - add_eh, stateful_icmp) - stream = stream_dict['stream'] + stream_dict = self.create_stream( + self.pg2, + self.loop0, + bridged_to_routed, + self.pg_if_packet_sizes, + is_ip6, + not is_reflect, + is_established, + add_eh, + stateful_icmp, + ) + stream = stream_dict["stream"] tx_if = self.pg0 if bridged_to_routed else self.pg2 rx_if = self.pg2 if bridged_to_routed else self.pg0 @@ -566,68 +652,90 @@ class TestACLpluginL2L3(VppTestCase): self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed) return len(stream) - def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp=False): - return self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp) - - def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp=False): - return self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp) - - def run_test_ip46_routed_to_bridged(self, test_l2_deny, - is_ip6, is_reflect, add_eh): - acls = self.apply_acl_ip46_routed_to_bridged(test_l2_deny, - is_ip6, is_reflect, - add_eh) - pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6, - is_reflect, False, - add_eh) - self.verify_acl_packet_count(acls['L3'].acl_index, pkts) - - def run_test_ip46_bridged_to_routed(self, test_l2_deny, - is_ip6, is_reflect, add_eh): - acls = self.apply_acl_ip46_bridged_to_routed(test_l2_deny, - is_ip6, is_reflect, - add_eh) - pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6, - is_reflect, False, - add_eh) - self.verify_acl_packet_count(acls['L2'].acl_index, pkts) - - def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action, - is_ip6, add_eh, - stateful_icmp=False): - self.apply_acl_ip46_both_directions_reflect(False, test_l2_action, - is_ip6, add_eh, - stateful_icmp) - self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6, - True, False, add_eh, - stateful_icmp) - self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6, - False, True, add_eh, - stateful_icmp) - - def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action, - is_ip6, add_eh, - stateful_icmp=False): - self.apply_acl_ip46_both_directions_reflect(True, test_l2_action, - is_ip6, add_eh, - stateful_icmp) - self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6, - True, False, add_eh, - stateful_icmp) - self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6, - False, True, add_eh, - stateful_icmp) + def run_traffic_ip46_routed_to_bridged( + self, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp=False, + ): + return self.run_traffic_ip46_x_to_y( + False, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp, + ) + + def run_traffic_ip46_bridged_to_routed( + self, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp=False, + ): + return self.run_traffic_ip46_x_to_y( + True, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp, + ) + + def run_test_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh): + acls = self.apply_acl_ip46_routed_to_bridged( + test_l2_deny, is_ip6, is_reflect, add_eh + ) + pkts = self.run_traffic_ip46_routed_to_bridged( + test_l2_deny, is_ip6, is_reflect, False, add_eh + ) + self.verify_acl_packet_count(acls["L3"].acl_index, pkts) + + def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh): + acls = self.apply_acl_ip46_bridged_to_routed( + test_l2_deny, is_ip6, is_reflect, add_eh + ) + pkts = self.run_traffic_ip46_bridged_to_routed( + test_l2_deny, is_ip6, is_reflect, False, add_eh + ) + self.verify_acl_packet_count(acls["L2"].acl_index, pkts) + + def run_test_ip46_routed_to_bridged_and_back( + self, test_l2_action, is_ip6, add_eh, stateful_icmp=False + ): + self.apply_acl_ip46_both_directions_reflect( + False, test_l2_action, is_ip6, add_eh, stateful_icmp + ) + self.run_traffic_ip46_routed_to_bridged( + test_l2_action, is_ip6, True, False, add_eh, stateful_icmp + ) + self.run_traffic_ip46_bridged_to_routed( + test_l2_action, is_ip6, False, True, add_eh, stateful_icmp + ) + + def run_test_ip46_bridged_to_routed_and_back( + self, test_l2_action, is_ip6, add_eh, stateful_icmp=False + ): + self.apply_acl_ip46_both_directions_reflect( + True, test_l2_action, is_ip6, add_eh, stateful_icmp + ) + self.run_traffic_ip46_bridged_to_routed( + test_l2_action, is_ip6, True, False, add_eh, stateful_icmp + ) + self.run_traffic_ip46_routed_to_bridged( + test_l2_action, is_ip6, False, True, add_eh, stateful_icmp + ) def test_0000_ip6_irb_1(self): - """ ACL plugin prepare""" + """ACL plugin prepare""" if not self.vpp_dead: cmd = "set acl-plugin session timeout udp idle 2000" self.logger.info(self.vapi.ppcli(cmd)) @@ -646,219 +754,188 @@ class TestACLpluginL2L3(VppTestCase): # "set acl-plugin l2-datapath old")) def test_0001_ip6_irb_1(self): - """ ACL IPv6 routed -> bridged, L2 ACL deny""" - self.run_test_ip46_routed_to_bridged(True, True, False, - self.WITHOUT_EH) + """ACL IPv6 routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, True, False, self.WITHOUT_EH) def test_0002_ip6_irb_1(self): - """ ACL IPv6 routed -> bridged, L3 ACL deny""" - self.run_test_ip46_routed_to_bridged(False, True, False, - self.WITHOUT_EH) + """ACL IPv6 routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, True, False, self.WITHOUT_EH) def test_0003_ip4_irb_1(self): - """ ACL IPv4 routed -> bridged, L2 ACL deny""" - self.run_test_ip46_routed_to_bridged(True, False, False, - self.WITHOUT_EH) + """ACL IPv4 routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, False, False, self.WITHOUT_EH) def test_0004_ip4_irb_1(self): - """ ACL IPv4 routed -> bridged, L3 ACL deny""" - self.run_test_ip46_routed_to_bridged(False, False, False, - self.WITHOUT_EH) + """ACL IPv4 routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, False, False, self.WITHOUT_EH) def test_0005_ip6_irb_1(self): - """ ACL IPv6 bridged -> routed, L2 ACL deny """ - self.run_test_ip46_bridged_to_routed(True, True, False, - self.WITHOUT_EH) + """ACL IPv6 bridged -> routed, L2 ACL deny""" + self.run_test_ip46_bridged_to_routed(True, True, False, self.WITHOUT_EH) def test_0006_ip6_irb_1(self): - """ ACL IPv6 bridged -> routed, L3 ACL deny """ - self.run_test_ip46_bridged_to_routed(False, True, False, - self.WITHOUT_EH) + """ACL IPv6 bridged -> routed, L3 ACL deny""" + self.run_test_ip46_bridged_to_routed(False, True, False, self.WITHOUT_EH) def test_0007_ip6_irb_1(self): - """ ACL IPv4 bridged -> routed, L2 ACL deny """ - self.run_test_ip46_bridged_to_routed(True, False, False, - self.WITHOUT_EH) + """ACL IPv4 bridged -> routed, L2 ACL deny""" + self.run_test_ip46_bridged_to_routed(True, False, False, self.WITHOUT_EH) def test_0008_ip6_irb_1(self): - """ ACL IPv4 bridged -> routed, L3 ACL deny """ - self.run_test_ip46_bridged_to_routed(False, False, False, - self.WITHOUT_EH) + """ACL IPv4 bridged -> routed, L3 ACL deny""" + self.run_test_ip46_bridged_to_routed(False, False, False, self.WITHOUT_EH) # Stateful ACL tests def test_0101_ip6_irb_1(self): - """ ACL IPv6 routed -> bridged, L2 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, True, - self.WITHOUT_EH) + """ACL IPv6 routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITHOUT_EH) def test_0102_ip6_irb_1(self): - """ ACL IPv6 bridged -> routed, L2 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, True, - self.WITHOUT_EH) + """ACL IPv6 bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITHOUT_EH) def test_0103_ip6_irb_1(self): - """ ACL IPv4 routed -> bridged, L2 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, False, - self.WITHOUT_EH) + """ACL IPv4 routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITHOUT_EH) def test_0104_ip6_irb_1(self): - """ ACL IPv4 bridged -> routed, L2 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, False, - self.WITHOUT_EH) + """ACL IPv4 bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITHOUT_EH) def test_0111_ip6_irb_1(self): - """ ACL IPv6 routed -> bridged, L3 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, True, - self.WITHOUT_EH) + """ACL IPv6 routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITHOUT_EH) def test_0112_ip6_irb_1(self): - """ ACL IPv6 bridged -> routed, L3 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, True, - self.WITHOUT_EH) + """ACL IPv6 bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITHOUT_EH) def test_0113_ip6_irb_1(self): - """ ACL IPv4 routed -> bridged, L3 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, False, - self.WITHOUT_EH) + """ACL IPv4 routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITHOUT_EH) def test_0114_ip6_irb_1(self): - """ ACL IPv4 bridged -> routed, L3 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, False, - self.WITHOUT_EH) + """ACL IPv4 bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITHOUT_EH) # A block of tests with extension headers def test_1001_ip6_irb_1(self): - """ ACL IPv6+EH routed -> bridged, L2 ACL deny""" - self.run_test_ip46_routed_to_bridged(True, True, False, - self.WITH_EH) + """ACL IPv6+EH routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, True, False, self.WITH_EH) def test_1002_ip6_irb_1(self): - """ ACL IPv6+EH routed -> bridged, L3 ACL deny""" - self.run_test_ip46_routed_to_bridged(False, True, False, - self.WITH_EH) + """ACL IPv6+EH routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, True, False, self.WITH_EH) def test_1005_ip6_irb_1(self): - """ ACL IPv6+EH bridged -> routed, L2 ACL deny """ - self.run_test_ip46_bridged_to_routed(True, True, False, - self.WITH_EH) + """ACL IPv6+EH bridged -> routed, L2 ACL deny""" + self.run_test_ip46_bridged_to_routed(True, True, False, self.WITH_EH) def test_1006_ip6_irb_1(self): - """ ACL IPv6+EH bridged -> routed, L3 ACL deny """ - self.run_test_ip46_bridged_to_routed(False, True, False, - self.WITH_EH) + """ACL IPv6+EH bridged -> routed, L3 ACL deny""" + self.run_test_ip46_bridged_to_routed(False, True, False, self.WITH_EH) def test_1101_ip6_irb_1(self): - """ ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, True, - self.WITH_EH) + """ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITH_EH) def test_1102_ip6_irb_1(self): - """ ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, True, - self.WITH_EH) + """ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITH_EH) def test_1111_ip6_irb_1(self): - """ ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, True, - self.WITH_EH) + """ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITH_EH) def test_1112_ip6_irb_1(self): - """ ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, True, - self.WITH_EH) + """ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITH_EH) # IPv4 with "MF" bit set def test_1201_ip6_irb_1(self): - """ ACL IPv4+MF routed -> bridged, L2 ACL deny""" - self.run_test_ip46_routed_to_bridged(True, False, False, - self.WITH_EH) + """ACL IPv4+MF routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, False, False, self.WITH_EH) def test_1202_ip6_irb_1(self): - """ ACL IPv4+MF routed -> bridged, L3 ACL deny""" - self.run_test_ip46_routed_to_bridged(False, False, False, - self.WITH_EH) + """ACL IPv4+MF routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, False, False, self.WITH_EH) def test_1205_ip6_irb_1(self): - """ ACL IPv4+MF bridged -> routed, L2 ACL deny """ - self.run_test_ip46_bridged_to_routed(True, False, False, - self.WITH_EH) + """ACL IPv4+MF bridged -> routed, L2 ACL deny""" + self.run_test_ip46_bridged_to_routed(True, False, False, self.WITH_EH) def test_1206_ip6_irb_1(self): - """ ACL IPv4+MF bridged -> routed, L3 ACL deny """ - self.run_test_ip46_bridged_to_routed(False, False, False, - self.WITH_EH) + """ACL IPv4+MF bridged -> routed, L3 ACL deny""" + self.run_test_ip46_bridged_to_routed(False, False, False, self.WITH_EH) def test_1301_ip6_irb_1(self): - """ ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, False, - self.WITH_EH) + """ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITH_EH) def test_1302_ip6_irb_1(self): - """ ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, False, - self.WITH_EH) + """ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITH_EH) def test_1311_ip6_irb_1(self): - """ ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, False, - self.WITH_EH) + """ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITH_EH) def test_1312_ip6_irb_1(self): - """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, False, - self.WITH_EH) + """ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITH_EH) + # Stateful ACL tests with stateful ICMP def test_1401_ip6_irb_1(self): - """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, True, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back( + True, True, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1402_ip6_irb_1(self): - """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, True, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back( + True, True, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1403_ip4_irb_1(self): - """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, False, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back( + True, False, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1404_ip4_irb_1(self): - """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, False, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back( + True, False, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1411_ip6_irb_1(self): - """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, True, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back( + False, True, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1412_ip6_irb_1(self): - """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, True, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back( + False, True, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1413_ip4_irb_1(self): - """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, False, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back( + False, False, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1414_ip4_irb_1(self): - """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, False, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back( + False, False, self.WITHOUT_EH, self.STATEFUL_ICMP + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)