X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin_l2l3.py;h=343e611751baad43a2a85cce93657dd0b65266ff;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=73dd473c67f354117fd5fae0d737a63358daa96e;hpb=4a4cea02ef82793232cab4d878baca5cf0134966;p=vpp.git diff --git a/test/test_acl_plugin_l2l3.py b/test/test_acl_plugin_l2l3.py index 73dd473c67f..343e611751b 100644 --- a/test/test_acl_plugin_l2l3.py +++ b/test/test_acl_plugin_l2l3.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ACL IRB Test Case HLD: **config** @@ -23,11 +23,14 @@ """ +import copy import unittest from socket import inet_pton, AF_INET, AF_INET6 from random import choice, shuffle from pprint import pprint +from ipaddress import ip_network +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, ICMP, TCP @@ -36,9 +39,11 @@ from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner -from vpp_papi_provider import L2_PORT_TYPE +from vpp_l2 import L2_PORT_TYPE import time +from vpp_acl import AclRule, VppAcl, VppAclInterface + class TestACLpluginL2L3(VppTestCase): """TestACLpluginL2L3 Test Case""" @@ -71,12 +76,16 @@ class TestACLpluginL2L3(VppTestCase): # Create BD with MAC learning enabled and put interfaces to this BD cls.vapi.sw_interface_set_l2_bridge( - cls.loop0.sw_if_index, bd_id=cls.bd_id, - port_type=L2_PORT_TYPE.BVI) + rx_sw_if_index=cls.loop0.sw_if_index, + bd_id=cls.bd_id, + port_type=L2_PORT_TYPE.BVI, + ) cls.vapi.sw_interface_set_l2_bridge( - cls.pg0.sw_if_index, bd_id=cls.bd_id) + rx_sw_if_index=cls.pg0.sw_if_index, bd_id=cls.bd_id + ) cls.vapi.sw_interface_set_l2_bridge( - cls.pg1.sw_if_index, bd_id=cls.bd_id) + rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.bd_id + ) # Configure IPv4 addresses on loopback interface and routed interface cls.loop0.config_ip4() @@ -102,44 +111,59 @@ class TestACLpluginL2L3(VppTestCase): half = cls.remote_hosts_count // 2 cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half] cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:] + reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1) + + @classmethod + def tearDownClass(cls): + reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0) + super(TestACLpluginL2L3, cls).tearDownClass() def tearDown(self): """Run standard test teardown and log ``show l2patch``, ``show l2fib verbose``,``show bridge-domain detail``, - ``show ip arp``. + ``show ip neighbors``. """ super(TestACLpluginL2L3, self).tearDown() - if not self.vpp_dead: - self.logger.info(self.vapi.cli("show l2patch")) - self.logger.info(self.vapi.cli("show classify tables")) - self.logger.info(self.vapi.cli("show l2fib verbose")) - self.logger.info(self.vapi.cli("show bridge-domain %s detail" % - self.bd_id)) - self.logger.info(self.vapi.cli("show ip arp")) - self.logger.info(self.vapi.cli("show ip6 neighbors")) - cmd = "show acl-plugin sessions verbose 1" - self.logger.info(self.vapi.cli(cmd)) - self.logger.info(self.vapi.cli("show acl-plugin acl")) - self.logger.info(self.vapi.cli("show acl-plugin interface")) - self.logger.info(self.vapi.cli("show acl-plugin tables")) - - def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes, - is_ip6, expect_blocked, expect_established, - add_extension_header, icmp_stateful=False): + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.cli("show l2patch")) + self.logger.info(self.vapi.cli("show classify tables")) + self.logger.info(self.vapi.cli("show l2fib verbose")) + self.logger.info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id)) + self.logger.info(self.vapi.cli("show ip neighbors")) + cmd = "show acl-plugin sessions verbose 1" + self.logger.info(self.vapi.cli(cmd)) + self.logger.info(self.vapi.cli("show acl-plugin acl")) + self.logger.info(self.vapi.cli("show acl-plugin interface")) + self.logger.info(self.vapi.cli("show acl-plugin tables")) + + def create_stream( + self, + src_ip_if, + dst_ip_if, + reverse, + packet_sizes, + is_ip6, + expect_blocked, + expect_established, + add_extension_header, + icmp_stateful=False, + ): pkts = [] rules = [] permit_rules = [] permit_and_reflect_rules = [] total_packet_count = 8 for i in range(0, total_packet_count): - modulo = (i//2) % 2 + modulo = (i // 2) % 2 icmp_type_delta = i % 2 icmp_code = i - is_udp_packet = (modulo == 0) + is_udp_packet = modulo == 0 if is_udp_packet and icmp_stateful: continue - is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and - not is_udp_packet) + is_reflectable_icmp = ( + icmp_stateful and icmp_type_delta == 0 and not is_udp_packet + ) is_reflected_icmp = is_reflectable_icmp and expect_established can_reflect_this_packet = is_udp_packet or is_reflectable_icmp is_permit = i % 2 @@ -150,9 +174,9 @@ class TestACLpluginL2L3(VppTestCase): payload = self.info_to_payload(info) else: to_be_blocked = False - if (expect_blocked and not expect_established): + if expect_blocked and not expect_established: to_be_blocked = True - if (not can_reflect_this_packet): + if not can_reflect_this_packet: to_be_blocked = True if to_be_blocked: payload = "to be blocked" @@ -160,7 +184,7 @@ class TestACLpluginL2L3(VppTestCase): info = self.create_packet_info(src_ip_if, dst_ip_if) payload = self.info_to_payload(info) if reverse: - dst_mac = 'de:ad:00:00:00:00' + dst_mac = "de:ad:00:00:00:00" src_mac = remote_dst_host._mac dst_ip6 = src_ip_if.remote_ip6 src_ip6 = remote_dst_host.ip6 @@ -190,42 +214,53 @@ class TestACLpluginL2L3(VppTestCase): ulp_l4 = UDP(sport=src_l4, dport=dst_l4) if add_extension_header: # prepend some extension headers - ulp = (IPv6ExtHdrRouting() / IPv6ExtHdrRouting() / - IPv6ExtHdrFragment(offset=0, m=1) / ulp_l4) + ulp = ( + IPv6ExtHdrRouting() + / IPv6ExtHdrRouting() + / IPv6ExtHdrFragment(offset=0, m=1) + / ulp_l4 + ) # uncomment below to test invalid ones # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4 else: ulp = ulp_l4 - p = (Ether(dst=dst_mac, src=src_mac) / - IPv6(src=src_ip6, dst=dst_ip6) / - ulp / - Raw(payload)) + p = ( + Ether(dst=dst_mac, src=src_mac) + / IPv6(src=src_ip6, dst=dst_ip6) + / ulp + / Raw(payload) + ) else: ulp_l4 = UDP(sport=src_l4, dport=dst_l4) # IPv4 does not allow extension headers, # but we rather make it a first fragment flags = 1 if add_extension_header else 0 ulp = ulp_l4 - p = (Ether(dst=dst_mac, src=src_mac) / - IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) / - ulp / - Raw(payload)) + p = ( + Ether(dst=dst_mac, src=src_mac) + / IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) + / ulp + / Raw(payload) + ) elif modulo == 1: if is_ip6: - ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta, - code=icmp_code) + ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta, code=icmp_code) ulp = ulp_l4 - p = (Ether(dst=dst_mac, src=src_mac) / - IPv6(src=src_ip6, dst=dst_ip6) / - ulp / - Raw(payload)) + p = ( + Ether(dst=dst_mac, src=src_mac) + / IPv6(src=src_ip6, dst=dst_ip6) + / ulp + / Raw(payload) + ) else: - ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code) + ulp_l4 = ICMP(type=8 - 8 * icmp_type_delta, code=icmp_code) ulp = ulp_l4 - p = (Ether(dst=dst_mac, src=src_mac) / - IP(src=src_ip4, dst=dst_ip4) / - ulp / - Raw(payload)) + p = ( + Ether(dst=dst_mac, src=src_mac) + / IP(src=src_ip4, dst=dst_ip4) + / ulp + / Raw(payload) + ) if i % 2 == 1: info.data = p.copy() @@ -248,43 +283,41 @@ class TestACLpluginL2L3(VppTestCase): rule_l4_sport = p[ICMPv6Unknown].type rule_l4_dport = p[ICMPv6Unknown].code if p.haslayer(IPv6): - rule_l4_proto = ulp_l4.overload_fields[IPv6]['nh'] + rule_l4_proto = ulp_l4.overload_fields[IPv6]["nh"] else: rule_l4_proto = p[IP].proto - new_rule = { - 'is_permit': is_permit, - 'is_ipv6': p.haslayer(IPv6), - 'src_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].src), - 'src_ip_prefix_len': rule_prefix_len, - 'dst_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].dst), - 'dst_ip_prefix_len': rule_prefix_len, - 'srcport_or_icmptype_first': rule_l4_sport, - 'srcport_or_icmptype_last': rule_l4_sport, - 'dstport_or_icmpcode_first': rule_l4_dport, - 'dstport_or_icmpcode_last': rule_l4_dport, - 'proto': rule_l4_proto, - } + new_rule = AclRule( + is_permit=is_permit, + proto=rule_l4_proto, + src_prefix=ip_network((p[rule_l3_layer].src, rule_prefix_len)), + dst_prefix=ip_network((p[rule_l3_layer].dst, rule_prefix_len)), + sport_from=rule_l4_sport, + sport_to=rule_l4_sport, + dport_from=rule_l4_dport, + dport_to=rule_l4_dport, + ) + rules.append(new_rule) - new_rule_permit = new_rule.copy() - new_rule_permit['is_permit'] = 1 + new_rule_permit = copy.copy(new_rule) + new_rule_permit.is_permit = 1 permit_rules.append(new_rule_permit) - new_rule_permit_and_reflect = new_rule.copy() + new_rule_permit_and_reflect = copy.copy(new_rule) if can_reflect_this_packet: - new_rule_permit_and_reflect['is_permit'] = 2 + new_rule_permit_and_reflect.is_permit = 2 else: - new_rule_permit_and_reflect['is_permit'] = is_permit + new_rule_permit_and_reflect.is_permit = is_permit permit_and_reflect_rules.append(new_rule_permit_and_reflect) self.logger.info("create_stream pkt#%d: %s" % (i, payload)) - return {'stream': pkts, - 'rules': rules, - 'permit_rules': permit_rules, - 'permit_and_reflect_rules': permit_and_reflect_rules} + return { + "stream": pkts, + "rules": rules, + "permit_rules": permit_rules, + "permit_and_reflect_rules": permit_and_reflect_rules, + } def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse): last_info = dict() @@ -292,7 +325,6 @@ class TestACLpluginL2L3(VppTestCase): last_info[i.sw_if_index] = None dst_ip_sw_if_index = dst_ip_if.sw_if_index - return for packet in capture: l3 = IP if packet.haslayer(IP) else IPv6 @@ -308,21 +340,27 @@ class TestACLpluginL2L3(VppTestCase): # Scapy IPv6 stuff is too smart for its own good. # So we do this and coerce the ICMP into unknown type if packet.haslayer(UDP): - data = str(packet[UDP][Raw]) + data = scapy.compat.raw(packet[UDP][Raw]) else: if l3 == IP: - data = str(ICMP(str(packet[l3].payload))[Raw]) + data = scapy.compat.raw( + ICMP(scapy.compat.raw(packet[l3].payload))[Raw] + ) else: - data = str(ICMPv6Unknown(str(packet[l3].payload)).msgbody) + data = scapy.compat.raw( + ICMPv6Unknown(scapy.compat.raw(packet[l3].payload)).msgbody + ) udp_or_icmp = packet[l3].payload - payload_info = self.payload_to_info(data) + data_obj = Raw(data) + # FIXME: make framework believe we are on object + payload_info = self.payload_to_info(data_obj) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_ip_sw_if_index) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_ip_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_ip_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) @@ -342,207 +380,266 @@ class TestACLpluginL2L3(VppTestCase): if l4 == UDP: self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport) self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport) - else: - print("Saved packet is none") # self.assertEqual(ip.dst, host.ip4) # UDP: - def applied_acl_shuffle(self, sw_if_index): - # first collect what ACLs are applied and what they look like - r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index) - orig_applied_acls = r[0] - - # we will collect these just to save and generate additional rulesets - orig_acls = [] - for acl_num in orig_applied_acls.acls: - rr = self.vapi.acl_dump(acl_num) - orig_acls.append(rr[0]) + def applied_acl_shuffle(self, acl_if): + saved_n_input = acl_if.n_input + # TOTO: maybe copy each one?? + saved_acls = acl_if.acls # now create a list of all the rules in all ACLs all_rules = [] - for old_acl in orig_acls: - for rule in old_acl.r: - all_rules.append(dict(rule._asdict())) + for old_acl in saved_acls: + for rule in old_acl.rules: + all_rules.append(rule) # Add a few ACLs made from shuffled rules shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=all_rules[::2], - tag=b"shuffle 1. acl") - shuffle_acl_1 = reply.acl_index + acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl") + acl1.add_vpp_config() + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=all_rules[::3], - tag=b"shuffle 2. acl") - shuffle_acl_2 = reply.acl_index + acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl") + acl2.add_vpp_config() + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=all_rules[::2], - tag=b"shuffle 3. acl") - shuffle_acl_3 = reply.acl_index + acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl") + acl3.add_vpp_config() # apply the shuffle ACLs in front - input_acls = [shuffle_acl_1, shuffle_acl_2] - output_acls = [shuffle_acl_1, shuffle_acl_2] + input_acls = [acl1, acl2] + output_acls = [acl1, acl2] # add the currently applied ACLs - n_input = orig_applied_acls.n_input - input_acls.extend(orig_applied_acls.acls[:n_input]) - output_acls.extend(orig_applied_acls.acls[n_input:]) + n_input = acl_if.n_input + input_acls.extend(saved_acls[:n_input]) + output_acls.extend(saved_acls[n_input:]) # and the trailing shuffle ACL(s) - input_acls.extend([shuffle_acl_3]) - output_acls.extend([shuffle_acl_3]) + input_acls.extend([acl3]) + output_acls.extend([acl3]) # set the interface ACL list to the result - self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, - n_input=len(input_acls), - acls=input_acls + output_acls) + acl_if.n_input = len(input_acls) + acl_if.acls = input_acls + output_acls + acl_if.add_vpp_config() + # change the ACLs a few times for i in range(1, 10): shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1, - r=all_rules[::1+(i % 2)], - tag=b"shuffle 1. acl") + acl1.modify_vpp_config(all_rules[:: 1 + (i % 2)]) + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, - r=all_rules[::1+(i % 3)], - tag=b"shuffle 2. acl") + acl2.modify_vpp_config(all_rules[:: 1 + (i % 3)]) + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, - r=all_rules[::1+(i % 5)], - tag=b"shuffle 3. acl") + acl3.modify_vpp_config(all_rules[:: 1 + (i % 5)]) # restore to how it was before and clean up - self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, - n_input=orig_applied_acls.n_input, - acls=orig_applied_acls.acls) - reply = self.vapi.acl_del(acl_index=shuffle_acl_1) - reply = self.vapi.acl_del(acl_index=shuffle_acl_2) - reply = self.vapi.acl_del(acl_index=shuffle_acl_3) - - def create_acls_for_a_stream(self, stream_dict, - test_l2_action, is_reflect): - r = stream_dict['rules'] - r_permit = stream_dict['permit_rules'] - r_permit_reflect = stream_dict['permit_and_reflect_rules'] + acl_if.n_input = saved_n_input + acl_if.acls = saved_acls + acl_if.add_vpp_config() + + acl1.remove_vpp_config() + acl2.remove_vpp_config() + acl3.remove_vpp_config() + + def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect): + r = stream_dict["rules"] + r_permit = stream_dict["permit_rules"] + r_permit_reflect = stream_dict["permit_and_reflect_rules"] r_action = r_permit_reflect if is_reflect else r - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action, - tag=b"act. acl") - action_acl_index = reply.acl_index - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit, - tag=b"perm. acl") - permit_acl_index = reply.acl_index - return {'L2': action_acl_index if test_l2_action else permit_acl_index, - 'L3': permit_acl_index if test_l2_action else action_acl_index, - 'permit': permit_acl_index, 'action': action_acl_index} - - def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny, - is_ip6, is_reflect, add_eh): - """ Apply the ACLs - """ + action_acl = VppAcl(self, rules=r_action, tag="act. acl") + action_acl.add_vpp_config() + permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl") + permit_acl.add_vpp_config() + + return { + "L2": action_acl if test_l2_action else permit_acl, + "L3": permit_acl if test_l2_action else action_acl, + "permit": permit_acl, + "action": action_acl, + } + + def apply_acl_ip46_x_to_y( + self, bridged_to_routed, test_l2_deny, is_ip6, is_reflect, add_eh + ): + """Apply the ACLs""" self.reset_packet_infos() stream_dict = self.create_stream( - self.pg2, self.loop0, - bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - not is_reflect, False, add_eh) - stream = stream_dict['stream'] - acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, - is_reflect) + self.pg2, + self.loop0, + bridged_to_routed, + self.pg_if_packet_sizes, + is_ip6, + not is_reflect, + False, + add_eh, + ) + stream = stream_dict["stream"] + acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, is_reflect) n_input_l3 = 0 if bridged_to_routed else 1 n_input_l2 = 1 if bridged_to_routed else 0 - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, - n_input=n_input_l3, - acls=[acl_idx['L3']]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=n_input_l2, - acls=[acl_idx['L2']]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=n_input_l2, - acls=[acl_idx['L2']]) - self.applied_acl_shuffle(self.pg0.sw_if_index) - self.applied_acl_shuffle(self.pg2.sw_if_index) - - def apply_acl_ip46_both_directions_reflect(self, - primary_is_bridged_to_routed, - reflect_on_l2, is_ip6, add_eh, - stateful_icmp): + + acl_if_pg2 = VppAclInterface( + self, + sw_if_index=self.pg2.sw_if_index, + n_input=n_input_l3, + acls=[acl_idx["L3"]], + ) + acl_if_pg2.add_vpp_config() + + acl_if_pg0 = VppAclInterface( + self, + sw_if_index=self.pg0.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx["L2"]], + ) + acl_if_pg0.add_vpp_config() + + acl_if_pg1 = VppAclInterface( + self, + sw_if_index=self.pg1.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx["L2"]], + ) + acl_if_pg1.add_vpp_config() + + self.applied_acl_shuffle(acl_if_pg0) + self.applied_acl_shuffle(acl_if_pg1) + return {"L2": acl_idx["L2"], "L3": acl_idx["L3"]} + + def apply_acl_ip46_both_directions_reflect( + self, primary_is_bridged_to_routed, reflect_on_l2, is_ip6, add_eh, stateful_icmp + ): primary_is_routed_to_bridged = not primary_is_bridged_to_routed self.reset_packet_infos() - stream_dict_fwd = self.create_stream(self.pg2, self.loop0, - primary_is_bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - False, False, add_eh, - stateful_icmp) - acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd, - reflect_on_l2, True) - - stream_dict_rev = self.create_stream(self.pg2, self.loop0, - not primary_is_bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - True, True, add_eh, stateful_icmp) + stream_dict_fwd = self.create_stream( + self.pg2, + self.loop0, + primary_is_bridged_to_routed, + self.pg_if_packet_sizes, + is_ip6, + False, + False, + add_eh, + stateful_icmp, + ) + acl_idx_fwd = self.create_acls_for_a_stream( + stream_dict_fwd, reflect_on_l2, True + ) + + stream_dict_rev = self.create_stream( + self.pg2, + self.loop0, + not primary_is_bridged_to_routed, + self.pg_if_packet_sizes, + is_ip6, + True, + True, + add_eh, + stateful_icmp, + ) # We want the primary action to be "deny" rather than reflect - acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev, - reflect_on_l2, False) + acl_idx_rev = self.create_acls_for_a_stream( + stream_dict_rev, reflect_on_l2, False + ) if primary_is_bridged_to_routed: - inbound_l2_acl = acl_idx_fwd['L2'] + inbound_l2_acl = acl_idx_fwd["L2"] else: - inbound_l2_acl = acl_idx_rev['L2'] + inbound_l2_acl = acl_idx_rev["L2"] if primary_is_routed_to_bridged: - outbound_l2_acl = acl_idx_fwd['L2'] + outbound_l2_acl = acl_idx_fwd["L2"] else: - outbound_l2_acl = acl_idx_rev['L2'] + outbound_l2_acl = acl_idx_rev["L2"] if primary_is_routed_to_bridged: - inbound_l3_acl = acl_idx_fwd['L3'] + inbound_l3_acl = acl_idx_fwd["L3"] else: - inbound_l3_acl = acl_idx_rev['L3'] + inbound_l3_acl = acl_idx_rev["L3"] if primary_is_bridged_to_routed: - outbound_l3_acl = acl_idx_fwd['L3'] + outbound_l3_acl = acl_idx_fwd["L3"] else: - outbound_l3_acl = acl_idx_rev['L3'] - - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, - n_input=1, - acls=[inbound_l3_acl, - outbound_l3_acl]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, - outbound_l2_acl]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, - outbound_l2_acl]) - self.applied_acl_shuffle(self.pg0.sw_if_index) - self.applied_acl_shuffle(self.pg2.sw_if_index) - - def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, - is_reflect, add_eh): - self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6, - is_reflect, add_eh) - - def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, - is_reflect, add_eh): - self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6, - is_reflect, add_eh) - - def run_traffic_ip46_x_to_y(self, bridged_to_routed, - test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp=False): + outbound_l3_acl = acl_idx_rev["L3"] + + acl_if_pg2 = VppAclInterface( + self, + sw_if_index=self.pg2.sw_if_index, + n_input=1, + acls=[inbound_l3_acl, outbound_l3_acl], + ) + acl_if_pg2.add_vpp_config() + + acl_if_pg0 = VppAclInterface( + self, + sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, outbound_l2_acl], + ) + acl_if_pg0.add_vpp_config() + + acl_if_pg1 = VppAclInterface( + self, + sw_if_index=self.pg1.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, outbound_l2_acl], + ) + acl_if_pg1.add_vpp_config() + + self.applied_acl_shuffle(acl_if_pg0) + self.applied_acl_shuffle(acl_if_pg2) + + def apply_acl_ip46_routed_to_bridged( + self, test_l2_deny, is_ip6, is_reflect, add_eh + ): + return self.apply_acl_ip46_x_to_y( + False, test_l2_deny, is_ip6, is_reflect, add_eh + ) + + def apply_acl_ip46_bridged_to_routed( + self, test_l2_deny, is_ip6, is_reflect, add_eh + ): + return self.apply_acl_ip46_x_to_y( + True, test_l2_deny, is_ip6, is_reflect, add_eh + ) + + def verify_acl_packet_count(self, acl_idx, packet_count): + matches = self.statistics.get_counter("/acl/%d/matches" % acl_idx) + self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches))) + total_count = 0 + for m in matches: + for p in m: + total_count = total_count + p["packets"] + self.assertEqual(total_count, packet_count) + + def run_traffic_ip46_x_to_y( + self, + bridged_to_routed, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp=False, + ): self.reset_packet_infos() - stream_dict = self.create_stream(self.pg2, self.loop0, - bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - not is_reflect, is_established, - add_eh, stateful_icmp) - stream = stream_dict['stream'] + stream_dict = self.create_stream( + self.pg2, + self.loop0, + bridged_to_routed, + self.pg_if_packet_sizes, + is_ip6, + not is_reflect, + is_established, + add_eh, + stateful_icmp, + ) + stream = stream_dict["stream"] tx_if = self.pg0 if bridged_to_routed else self.pg2 rx_if = self.pg2 if bridged_to_routed else self.pg0 @@ -553,63 +650,92 @@ class TestACLpluginL2L3(VppTestCase): packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index) rcvd1 = rx_if.get_capture(packet_count) self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed) - - def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp=False): - self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp) - - def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp=False): - self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp) - - def run_test_ip46_routed_to_bridged(self, test_l2_deny, - is_ip6, is_reflect, add_eh): - self.apply_acl_ip46_routed_to_bridged(test_l2_deny, - is_ip6, is_reflect, add_eh) - self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6, - is_reflect, False, add_eh) - - def run_test_ip46_bridged_to_routed(self, test_l2_deny, - is_ip6, is_reflect, add_eh): - self.apply_acl_ip46_bridged_to_routed(test_l2_deny, - is_ip6, is_reflect, add_eh) - self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6, - is_reflect, False, add_eh) - - def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action, - is_ip6, add_eh, - stateful_icmp=False): - self.apply_acl_ip46_both_directions_reflect(False, test_l2_action, - is_ip6, add_eh, - stateful_icmp) - self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6, - True, False, add_eh, - stateful_icmp) - self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6, - False, True, add_eh, - stateful_icmp) - - def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action, - is_ip6, add_eh, - stateful_icmp=False): - self.apply_acl_ip46_both_directions_reflect(True, test_l2_action, - is_ip6, add_eh, - stateful_icmp) - self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6, - True, False, add_eh, - stateful_icmp) - self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6, - False, True, add_eh, - stateful_icmp) + return len(stream) + + def run_traffic_ip46_routed_to_bridged( + self, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp=False, + ): + return self.run_traffic_ip46_x_to_y( + False, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp, + ) + + def run_traffic_ip46_bridged_to_routed( + self, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp=False, + ): + return self.run_traffic_ip46_x_to_y( + True, + test_l2_deny, + is_ip6, + is_reflect, + is_established, + add_eh, + stateful_icmp, + ) + + def run_test_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh): + acls = self.apply_acl_ip46_routed_to_bridged( + test_l2_deny, is_ip6, is_reflect, add_eh + ) + pkts = self.run_traffic_ip46_routed_to_bridged( + test_l2_deny, is_ip6, is_reflect, False, add_eh + ) + self.verify_acl_packet_count(acls["L3"].acl_index, pkts) + + def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh): + acls = self.apply_acl_ip46_bridged_to_routed( + test_l2_deny, is_ip6, is_reflect, add_eh + ) + pkts = self.run_traffic_ip46_bridged_to_routed( + test_l2_deny, is_ip6, is_reflect, False, add_eh + ) + self.verify_acl_packet_count(acls["L2"].acl_index, pkts) + + def run_test_ip46_routed_to_bridged_and_back( + self, test_l2_action, is_ip6, add_eh, stateful_icmp=False + ): + self.apply_acl_ip46_both_directions_reflect( + False, test_l2_action, is_ip6, add_eh, stateful_icmp + ) + self.run_traffic_ip46_routed_to_bridged( + test_l2_action, is_ip6, True, False, add_eh, stateful_icmp + ) + self.run_traffic_ip46_bridged_to_routed( + test_l2_action, is_ip6, False, True, add_eh, stateful_icmp + ) + + def run_test_ip46_bridged_to_routed_and_back( + self, test_l2_action, is_ip6, add_eh, stateful_icmp=False + ): + self.apply_acl_ip46_both_directions_reflect( + True, test_l2_action, is_ip6, add_eh, stateful_icmp + ) + self.run_traffic_ip46_bridged_to_routed( + test_l2_action, is_ip6, True, False, add_eh, stateful_icmp + ) + self.run_traffic_ip46_routed_to_bridged( + test_l2_action, is_ip6, False, True, add_eh, stateful_icmp + ) def test_0000_ip6_irb_1(self): - """ ACL plugin prepare""" + """ACL plugin prepare""" if not self.vpp_dead: cmd = "set acl-plugin session timeout udp idle 2000" self.logger.info(self.vapi.ppcli(cmd)) @@ -628,219 +754,188 @@ class TestACLpluginL2L3(VppTestCase): # "set acl-plugin l2-datapath old")) def test_0001_ip6_irb_1(self): - """ ACL IPv6 routed -> bridged, L2 ACL deny""" - self.run_test_ip46_routed_to_bridged(True, True, False, - self.WITHOUT_EH) + """ACL IPv6 routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, True, False, self.WITHOUT_EH) def test_0002_ip6_irb_1(self): - """ ACL IPv6 routed -> bridged, L3 ACL deny""" - self.run_test_ip46_routed_to_bridged(False, True, False, - self.WITHOUT_EH) + """ACL IPv6 routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, True, False, self.WITHOUT_EH) def test_0003_ip4_irb_1(self): - """ ACL IPv4 routed -> bridged, L2 ACL deny""" - self.run_test_ip46_routed_to_bridged(True, False, False, - self.WITHOUT_EH) + """ACL IPv4 routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, False, False, self.WITHOUT_EH) def test_0004_ip4_irb_1(self): - """ ACL IPv4 routed -> bridged, L3 ACL deny""" - self.run_test_ip46_routed_to_bridged(False, False, False, - self.WITHOUT_EH) + """ACL IPv4 routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, False, False, self.WITHOUT_EH) def test_0005_ip6_irb_1(self): - """ ACL IPv6 bridged -> routed, L2 ACL deny """ - self.run_test_ip46_bridged_to_routed(True, True, False, - self.WITHOUT_EH) + """ACL IPv6 bridged -> routed, L2 ACL deny""" + self.run_test_ip46_bridged_to_routed(True, True, False, self.WITHOUT_EH) def test_0006_ip6_irb_1(self): - """ ACL IPv6 bridged -> routed, L3 ACL deny """ - self.run_test_ip46_bridged_to_routed(False, True, False, - self.WITHOUT_EH) + """ACL IPv6 bridged -> routed, L3 ACL deny""" + self.run_test_ip46_bridged_to_routed(False, True, False, self.WITHOUT_EH) def test_0007_ip6_irb_1(self): - """ ACL IPv4 bridged -> routed, L2 ACL deny """ - self.run_test_ip46_bridged_to_routed(True, False, False, - self.WITHOUT_EH) + """ACL IPv4 bridged -> routed, L2 ACL deny""" + self.run_test_ip46_bridged_to_routed(True, False, False, self.WITHOUT_EH) def test_0008_ip6_irb_1(self): - """ ACL IPv4 bridged -> routed, L3 ACL deny """ - self.run_test_ip46_bridged_to_routed(False, False, False, - self.WITHOUT_EH) + """ACL IPv4 bridged -> routed, L3 ACL deny""" + self.run_test_ip46_bridged_to_routed(False, False, False, self.WITHOUT_EH) # Stateful ACL tests def test_0101_ip6_irb_1(self): - """ ACL IPv6 routed -> bridged, L2 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, True, - self.WITHOUT_EH) + """ACL IPv6 routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITHOUT_EH) def test_0102_ip6_irb_1(self): - """ ACL IPv6 bridged -> routed, L2 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, True, - self.WITHOUT_EH) + """ACL IPv6 bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITHOUT_EH) def test_0103_ip6_irb_1(self): - """ ACL IPv4 routed -> bridged, L2 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, False, - self.WITHOUT_EH) + """ACL IPv4 routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITHOUT_EH) def test_0104_ip6_irb_1(self): - """ ACL IPv4 bridged -> routed, L2 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, False, - self.WITHOUT_EH) + """ACL IPv4 bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITHOUT_EH) def test_0111_ip6_irb_1(self): - """ ACL IPv6 routed -> bridged, L3 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, True, - self.WITHOUT_EH) + """ACL IPv6 routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITHOUT_EH) def test_0112_ip6_irb_1(self): - """ ACL IPv6 bridged -> routed, L3 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, True, - self.WITHOUT_EH) + """ACL IPv6 bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITHOUT_EH) def test_0113_ip6_irb_1(self): - """ ACL IPv4 routed -> bridged, L3 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, False, - self.WITHOUT_EH) + """ACL IPv4 routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITHOUT_EH) def test_0114_ip6_irb_1(self): - """ ACL IPv4 bridged -> routed, L3 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, False, - self.WITHOUT_EH) + """ACL IPv4 bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITHOUT_EH) # A block of tests with extension headers def test_1001_ip6_irb_1(self): - """ ACL IPv6+EH routed -> bridged, L2 ACL deny""" - self.run_test_ip46_routed_to_bridged(True, True, False, - self.WITH_EH) + """ACL IPv6+EH routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, True, False, self.WITH_EH) def test_1002_ip6_irb_1(self): - """ ACL IPv6+EH routed -> bridged, L3 ACL deny""" - self.run_test_ip46_routed_to_bridged(False, True, False, - self.WITH_EH) + """ACL IPv6+EH routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, True, False, self.WITH_EH) def test_1005_ip6_irb_1(self): - """ ACL IPv6+EH bridged -> routed, L2 ACL deny """ - self.run_test_ip46_bridged_to_routed(True, True, False, - self.WITH_EH) + """ACL IPv6+EH bridged -> routed, L2 ACL deny""" + self.run_test_ip46_bridged_to_routed(True, True, False, self.WITH_EH) def test_1006_ip6_irb_1(self): - """ ACL IPv6+EH bridged -> routed, L3 ACL deny """ - self.run_test_ip46_bridged_to_routed(False, True, False, - self.WITH_EH) + """ACL IPv6+EH bridged -> routed, L3 ACL deny""" + self.run_test_ip46_bridged_to_routed(False, True, False, self.WITH_EH) def test_1101_ip6_irb_1(self): - """ ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, True, - self.WITH_EH) + """ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, True, self.WITH_EH) def test_1102_ip6_irb_1(self): - """ ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, True, - self.WITH_EH) + """ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, True, self.WITH_EH) def test_1111_ip6_irb_1(self): - """ ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, True, - self.WITH_EH) + """ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, True, self.WITH_EH) def test_1112_ip6_irb_1(self): - """ ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, True, - self.WITH_EH) + """ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITH_EH) # IPv4 with "MF" bit set def test_1201_ip6_irb_1(self): - """ ACL IPv4+MF routed -> bridged, L2 ACL deny""" - self.run_test_ip46_routed_to_bridged(True, False, False, - self.WITH_EH) + """ACL IPv4+MF routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, False, False, self.WITH_EH) def test_1202_ip6_irb_1(self): - """ ACL IPv4+MF routed -> bridged, L3 ACL deny""" - self.run_test_ip46_routed_to_bridged(False, False, False, - self.WITH_EH) + """ACL IPv4+MF routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, False, False, self.WITH_EH) def test_1205_ip6_irb_1(self): - """ ACL IPv4+MF bridged -> routed, L2 ACL deny """ - self.run_test_ip46_bridged_to_routed(True, False, False, - self.WITH_EH) + """ACL IPv4+MF bridged -> routed, L2 ACL deny""" + self.run_test_ip46_bridged_to_routed(True, False, False, self.WITH_EH) def test_1206_ip6_irb_1(self): - """ ACL IPv4+MF bridged -> routed, L3 ACL deny """ - self.run_test_ip46_bridged_to_routed(False, False, False, - self.WITH_EH) + """ACL IPv4+MF bridged -> routed, L3 ACL deny""" + self.run_test_ip46_bridged_to_routed(False, False, False, self.WITH_EH) def test_1301_ip6_irb_1(self): - """ ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, False, - self.WITH_EH) + """ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, False, self.WITH_EH) def test_1302_ip6_irb_1(self): - """ ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, False, - self.WITH_EH) + """ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, False, self.WITH_EH) def test_1311_ip6_irb_1(self): - """ ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, False, - self.WITH_EH) + """ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, False, self.WITH_EH) def test_1312_ip6_irb_1(self): - """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, False, - self.WITH_EH) + """ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, False, self.WITH_EH) + # Stateful ACL tests with stateful ICMP def test_1401_ip6_irb_1(self): - """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, True, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back( + True, True, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1402_ip6_irb_1(self): - """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, True, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back( + True, True, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1403_ip4_irb_1(self): - """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_routed_to_bridged_and_back(True, False, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back( + True, False, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1404_ip4_irb_1(self): - """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_bridged_to_routed_and_back(True, False, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back( + True, False, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1411_ip6_irb_1(self): - """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, True, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back( + False, True, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1412_ip6_irb_1(self): - """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, True, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back( + False, True, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1413_ip4_irb_1(self): - """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_routed_to_bridged_and_back(False, False, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back( + False, False, self.WITHOUT_EH, self.STATEFUL_ICMP + ) def test_1414_ip4_irb_1(self): - """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" - self.run_test_ip46_bridged_to_routed_and_back(False, False, - self.WITHOUT_EH, - self.STATEFUL_ICMP) + """IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back( + False, False, self.WITHOUT_EH, self.STATEFUL_ICMP + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)