from scapy.layers.inet6 import IPv6ExtHdrFragment
from framework import VppTestCase, VppTestRunner
-from vpp_papi_provider import L2_PORT_TYPE
+from vpp_l2 import L2_PORT_TYPE
import time
half = cls.remote_hosts_count // 2
cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
+ reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1)
+
+ @classmethod
+ def tearDownClass(cls):
+ reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0)
+ super(TestACLpluginL2L3, cls).tearDownClass()
def tearDown(self):
"""Run standard test teardown and log ``show l2patch``,
``show ip arp``.
"""
super(TestACLpluginL2L3, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show l2patch"))
- self.logger.info(self.vapi.cli("show classify tables"))
- self.logger.info(self.vapi.cli("show l2fib verbose"))
- self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
- self.bd_id))
- self.logger.info(self.vapi.cli("show ip arp"))
- self.logger.info(self.vapi.cli("show ip6 neighbors"))
- cmd = "show acl-plugin sessions verbose 1"
- self.logger.info(self.vapi.cli(cmd))
- self.logger.info(self.vapi.cli("show acl-plugin acl"))
- self.logger.info(self.vapi.cli("show acl-plugin interface"))
- self.logger.info(self.vapi.cli("show acl-plugin tables"))
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show l2patch"))
+ self.logger.info(self.vapi.cli("show classify tables"))
+ self.logger.info(self.vapi.cli("show l2fib verbose"))
+ self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
+ self.bd_id))
+ self.logger.info(self.vapi.cli("show ip arp"))
+ self.logger.info(self.vapi.cli("show ip6 neighbors"))
+ cmd = "show acl-plugin sessions verbose 1"
+ self.logger.info(self.vapi.cli(cmd))
+ self.logger.info(self.vapi.cli("show acl-plugin acl"))
+ self.logger.info(self.vapi.cli("show acl-plugin interface"))
+ self.logger.info(self.vapi.cli("show acl-plugin tables"))
def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
is_ip6, expect_blocked, expect_established,
acls=[acl_idx['L2']])
self.applied_acl_shuffle(self.pg0.sw_if_index)
self.applied_acl_shuffle(self.pg2.sw_if_index)
+ return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
def apply_acl_ip46_both_directions_reflect(self,
primary_is_bridged_to_routed,
def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
- self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6,
- is_reflect, add_eh)
+ return self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6,
+ is_reflect, add_eh)
def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
- self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6,
- is_reflect, add_eh)
+ return self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6,
+ is_reflect, add_eh)
+
+ def verify_acl_packet_count(self, acl_idx, packet_count):
+ matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
+ self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches)))
+ total_count = 0
+ for p in matches[0]:
+ total_count = total_count + p['packets']
+ self.assertEqual(total_count, packet_count)
def run_traffic_ip46_x_to_y(self, bridged_to_routed,
test_l2_deny, is_ip6,
packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
rcvd1 = rx_if.get_capture(packet_count)
self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
+ return len(stream)
def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, is_established, add_eh,
stateful_icmp=False):
- self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh,
- stateful_icmp)
+ return self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
+ is_reflect, is_established, add_eh,
+ stateful_icmp)
def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
is_reflect, is_established, add_eh,
stateful_icmp=False):
- self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh,
- stateful_icmp)
+ return self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
+ is_reflect, is_established, add_eh,
+ stateful_icmp)
def run_test_ip46_routed_to_bridged(self, test_l2_deny,
is_ip6, is_reflect, add_eh):
- self.apply_acl_ip46_routed_to_bridged(test_l2_deny,
- is_ip6, is_reflect, add_eh)
- self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
- is_reflect, False, add_eh)
+ acls = self.apply_acl_ip46_routed_to_bridged(test_l2_deny,
+ is_ip6, is_reflect,
+ add_eh)
+ pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
+ is_reflect, False,
+ add_eh)
+ self.verify_acl_packet_count(acls['L3'], pkts)
def run_test_ip46_bridged_to_routed(self, test_l2_deny,
is_ip6, is_reflect, add_eh):
- self.apply_acl_ip46_bridged_to_routed(test_l2_deny,
- is_ip6, is_reflect, add_eh)
- self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
- is_reflect, False, add_eh)
+ acls = self.apply_acl_ip46_bridged_to_routed(test_l2_deny,
+ is_ip6, is_reflect,
+ add_eh)
+ pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
+ is_reflect, False,
+ add_eh)
+ self.verify_acl_packet_count(acls['L2'], pkts)
def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
is_ip6, add_eh,