X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_acl_plugin_l2l3.py;h=31b4058fc698c6fcdc4b578c3c0f611cb8a07f2c;hb=053204ab039d34a990ff0e14c32ce3b294fcce0e;hp=73dd473c67f354117fd5fae0d737a63358daa96e;hpb=4a4cea02ef82793232cab4d878baca5cf0134966;p=vpp.git diff --git a/test/test_acl_plugin_l2l3.py b/test/test_acl_plugin_l2l3.py index 73dd473c67f..31b4058fc69 100644 --- a/test/test_acl_plugin_l2l3.py +++ b/test/test_acl_plugin_l2l3.py @@ -28,6 +28,7 @@ from socket import inet_pton, AF_INET, AF_INET6 from random import choice, shuffle from pprint import pprint +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, ICMP, TCP @@ -36,7 +37,7 @@ from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner -from vpp_papi_provider import L2_PORT_TYPE +from vpp_l2 import L2_PORT_TYPE import time @@ -71,12 +72,12 @@ class TestACLpluginL2L3(VppTestCase): # Create BD with MAC learning enabled and put interfaces to this BD cls.vapi.sw_interface_set_l2_bridge( - cls.loop0.sw_if_index, bd_id=cls.bd_id, + rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id, port_type=L2_PORT_TYPE.BVI) - cls.vapi.sw_interface_set_l2_bridge( - cls.pg0.sw_if_index, bd_id=cls.bd_id) - cls.vapi.sw_interface_set_l2_bridge( - cls.pg1.sw_if_index, bd_id=cls.bd_id) + cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index, + bd_id=cls.bd_id) + cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index, + bd_id=cls.bd_id) # Configure IPv4 addresses on loopback interface and routed interface cls.loop0.config_ip4() @@ -102,6 +103,12 @@ class TestACLpluginL2L3(VppTestCase): half = cls.remote_hosts_count // 2 cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half] cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:] + reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1) + + @classmethod + def tearDownClass(cls): + reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0) + super(TestACLpluginL2L3, cls).tearDownClass() def tearDown(self): """Run standard test teardown and log ``show l2patch``, @@ -109,19 +116,20 @@ class TestACLpluginL2L3(VppTestCase): ``show ip arp``. """ super(TestACLpluginL2L3, self).tearDown() - if not self.vpp_dead: - self.logger.info(self.vapi.cli("show l2patch")) - self.logger.info(self.vapi.cli("show classify tables")) - self.logger.info(self.vapi.cli("show l2fib verbose")) - self.logger.info(self.vapi.cli("show bridge-domain %s detail" % - self.bd_id)) - self.logger.info(self.vapi.cli("show ip arp")) - self.logger.info(self.vapi.cli("show ip6 neighbors")) - cmd = "show acl-plugin sessions verbose 1" - self.logger.info(self.vapi.cli(cmd)) - self.logger.info(self.vapi.cli("show acl-plugin acl")) - self.logger.info(self.vapi.cli("show acl-plugin interface")) - self.logger.info(self.vapi.cli("show acl-plugin tables")) + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.cli("show l2patch")) + self.logger.info(self.vapi.cli("show classify tables")) + self.logger.info(self.vapi.cli("show l2fib verbose")) + self.logger.info(self.vapi.cli("show bridge-domain %s detail" % + self.bd_id)) + self.logger.info(self.vapi.cli("show ip arp")) + self.logger.info(self.vapi.cli("show ip6 neighbors")) + cmd = "show acl-plugin sessions verbose 1" + self.logger.info(self.vapi.cli(cmd)) + self.logger.info(self.vapi.cli("show acl-plugin acl")) + self.logger.info(self.vapi.cli("show acl-plugin interface")) + self.logger.info(self.vapi.cli("show acl-plugin tables")) def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes, is_ip6, expect_blocked, expect_established, @@ -292,7 +300,6 @@ class TestACLpluginL2L3(VppTestCase): last_info[i.sw_if_index] = None dst_ip_sw_if_index = dst_ip_if.sw_if_index - return for packet in capture: l3 = IP if packet.haslayer(IP) else IPv6 @@ -308,14 +315,18 @@ class TestACLpluginL2L3(VppTestCase): # Scapy IPv6 stuff is too smart for its own good. # So we do this and coerce the ICMP into unknown type if packet.haslayer(UDP): - data = str(packet[UDP][Raw]) + data = scapy.compat.raw(packet[UDP][Raw]) else: if l3 == IP: - data = str(ICMP(str(packet[l3].payload))[Raw]) + data = scapy.compat.raw(ICMP( + scapy.compat.raw(packet[l3].payload))[Raw]) else: - data = str(ICMPv6Unknown(str(packet[l3].payload)).msgbody) + data = scapy.compat.raw(ICMPv6Unknown( + scapy.compat.raw(packet[l3].payload)).msgbody) udp_or_icmp = packet[l3].payload - payload_info = self.payload_to_info(data) + data_obj = Raw(data) + # FIXME: make framework believe we are on object + payload_info = self.payload_to_info(data_obj) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_ip_sw_if_index) @@ -342,8 +353,6 @@ class TestACLpluginL2L3(VppTestCase): if l4 == UDP: self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport) self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport) - else: - print("Saved packet is none") # self.assertEqual(ip.dst, host.ip4) # UDP: @@ -464,6 +473,7 @@ class TestACLpluginL2L3(VppTestCase): acls=[acl_idx['L2']]) self.applied_acl_shuffle(self.pg0.sw_if_index) self.applied_acl_shuffle(self.pg2.sw_if_index) + return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']} def apply_acl_ip46_both_directions_reflect(self, primary_is_bridged_to_routed, @@ -524,13 +534,21 @@ class TestACLpluginL2L3(VppTestCase): def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh): - self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6, - is_reflect, add_eh) + return self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6, + is_reflect, add_eh) def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh): - self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6, - is_reflect, add_eh) + return self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6, + is_reflect, add_eh) + + def verify_acl_packet_count(self, acl_idx, packet_count): + matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx) + self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches))) + total_count = 0 + for p in matches[0]: + total_count = total_count + p['packets'] + self.assertEqual(total_count, packet_count) def run_traffic_ip46_x_to_y(self, bridged_to_routed, test_l2_deny, is_ip6, @@ -553,34 +571,41 @@ class TestACLpluginL2L3(VppTestCase): packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index) rcvd1 = rx_if.get_capture(packet_count) self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed) + return len(stream) def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, is_established, add_eh, stateful_icmp=False): - self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp) + return self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6, + is_reflect, is_established, add_eh, + stateful_icmp) def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, is_established, add_eh, stateful_icmp=False): - self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6, - is_reflect, is_established, add_eh, - stateful_icmp) + return self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6, + is_reflect, is_established, add_eh, + stateful_icmp) def run_test_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh): - self.apply_acl_ip46_routed_to_bridged(test_l2_deny, - is_ip6, is_reflect, add_eh) - self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6, - is_reflect, False, add_eh) + acls = self.apply_acl_ip46_routed_to_bridged(test_l2_deny, + is_ip6, is_reflect, + add_eh) + pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6, + is_reflect, False, + add_eh) + self.verify_acl_packet_count(acls['L3'], pkts) def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh): - self.apply_acl_ip46_bridged_to_routed(test_l2_deny, - is_ip6, is_reflect, add_eh) - self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6, - is_reflect, False, add_eh) + acls = self.apply_acl_ip46_bridged_to_routed(test_l2_deny, + is_ip6, is_reflect, + add_eh) + pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6, + is_reflect, False, + add_eh) + self.verify_acl_packet_count(acls['L2'], pkts) def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action, is_ip6, add_eh,