from scapy.layers.inet6 import IPv6ExtHdrFragment
from framework import VppTestCase, VppTestRunner
-from vpp_papi_provider import L2_PORT_TYPE
+from vpp_l2 import L2_PORT_TYPE
import time
# Create BD with MAC learning enabled and put interfaces to this BD
cls.vapi.sw_interface_set_l2_bridge(
- cls.loop0.sw_if_index, bd_id=cls.bd_id,
+ rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id,
port_type=L2_PORT_TYPE.BVI)
- cls.vapi.sw_interface_set_l2_bridge(
- cls.pg0.sw_if_index, bd_id=cls.bd_id)
- cls.vapi.sw_interface_set_l2_bridge(
- cls.pg1.sw_if_index, bd_id=cls.bd_id)
+ cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index,
+ bd_id=cls.bd_id)
+ cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index,
+ bd_id=cls.bd_id)
# Configure IPv4 addresses on loopback interface and routed interface
cls.loop0.config_ip4()
half = cls.remote_hosts_count // 2
cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
+ reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1)
+
+ @classmethod
+ def tearDownClass(cls):
+ reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0)
+ super(TestACLpluginL2L3, cls).tearDownClass()
def tearDown(self):
"""Run standard test teardown and log ``show l2patch``,
``show ip arp``.
"""
super(TestACLpluginL2L3, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show l2patch"))
- self.logger.info(self.vapi.cli("show classify tables"))
- self.logger.info(self.vapi.cli("show l2fib verbose"))
- self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
- self.bd_id))
- self.logger.info(self.vapi.cli("show ip arp"))
- self.logger.info(self.vapi.cli("show ip6 neighbors"))
- cmd = "show acl-plugin sessions verbose 1"
- self.logger.info(self.vapi.cli(cmd))
- self.logger.info(self.vapi.cli("show acl-plugin acl"))
- self.logger.info(self.vapi.cli("show acl-plugin interface"))
- self.logger.info(self.vapi.cli("show acl-plugin tables"))
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show l2patch"))
+ self.logger.info(self.vapi.cli("show classify tables"))
+ self.logger.info(self.vapi.cli("show l2fib verbose"))
+ self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
+ self.bd_id))
+ self.logger.info(self.vapi.cli("show ip arp"))
+ self.logger.info(self.vapi.cli("show ip6 neighbors"))
+ cmd = "show acl-plugin sessions verbose 1"
+ self.logger.info(self.vapi.cli(cmd))
+ self.logger.info(self.vapi.cli("show acl-plugin acl"))
+ self.logger.info(self.vapi.cli("show acl-plugin interface"))
+ self.logger.info(self.vapi.cli("show acl-plugin tables"))
def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
is_ip6, expect_blocked, expect_established,
last_info[i.sw_if_index] = None
dst_ip_sw_if_index = dst_ip_if.sw_if_index
- return
for packet in capture:
l3 = IP if packet.haslayer(IP) else IPv6
data = scapy.compat.raw(ICMPv6Unknown(
scapy.compat.raw(packet[l3].payload)).msgbody)
udp_or_icmp = packet[l3].payload
- payload_info = self.payload_to_info(data)
+ data_obj = Raw(data)
+ # FIXME: make framework believe we are on object
+ payload_info = self.payload_to_info(data_obj)
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
if l4 == UDP:
self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
- else:
- print("Saved packet is none")
# self.assertEqual(ip.dst, host.ip4)
# UDP:
acls=[acl_idx['L2']])
self.applied_acl_shuffle(self.pg0.sw_if_index)
self.applied_acl_shuffle(self.pg2.sw_if_index)
+ return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
def apply_acl_ip46_both_directions_reflect(self,
primary_is_bridged_to_routed,
def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
- self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6,
- is_reflect, add_eh)
+ return self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6,
+ is_reflect, add_eh)
def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
- self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6,
- is_reflect, add_eh)
+ return self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6,
+ is_reflect, add_eh)
+
+ def verify_acl_packet_count(self, acl_idx, packet_count):
+ matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
+ self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches)))
+ total_count = 0
+ for p in matches[0]:
+ total_count = total_count + p['packets']
+ self.assertEqual(total_count, packet_count)
def run_traffic_ip46_x_to_y(self, bridged_to_routed,
test_l2_deny, is_ip6,
packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
rcvd1 = rx_if.get_capture(packet_count)
self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
+ return len(stream)
def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, is_established, add_eh,
stateful_icmp=False):
- self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh,
- stateful_icmp)
+ return self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
+ is_reflect, is_established, add_eh,
+ stateful_icmp)
def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
is_reflect, is_established, add_eh,
stateful_icmp=False):
- self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh,
- stateful_icmp)
+ return self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
+ is_reflect, is_established, add_eh,
+ stateful_icmp)
def run_test_ip46_routed_to_bridged(self, test_l2_deny,
is_ip6, is_reflect, add_eh):
- self.apply_acl_ip46_routed_to_bridged(test_l2_deny,
- is_ip6, is_reflect, add_eh)
- self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
- is_reflect, False, add_eh)
+ acls = self.apply_acl_ip46_routed_to_bridged(test_l2_deny,
+ is_ip6, is_reflect,
+ add_eh)
+ pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
+ is_reflect, False,
+ add_eh)
+ self.verify_acl_packet_count(acls['L3'], pkts)
def run_test_ip46_bridged_to_routed(self, test_l2_deny,
is_ip6, is_reflect, add_eh):
- self.apply_acl_ip46_bridged_to_routed(test_l2_deny,
- is_ip6, is_reflect, add_eh)
- self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
- is_reflect, False, add_eh)
+ acls = self.apply_acl_ip46_bridged_to_routed(test_l2_deny,
+ is_ip6, is_reflect,
+ add_eh)
+ pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
+ is_reflect, False,
+ add_eh)
+ self.verify_acl_packet_count(acls['L2'], pkts)
def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
is_ip6, add_eh,