import unittest
from socket import inet_pton, AF_INET, AF_INET6
-from random import choice
+from random import choice, shuffle
from pprint import pprint
from scapy.packet import Raw
from scapy.layers.inet6 import IPv6ExtHdrFragment
from framework import VppTestCase, VppTestRunner
+from vpp_papi_provider import L2_PORT_TYPE
import time
-class TestIpIrb(VppTestCase):
- """IRB Test Case"""
+class TestACLpluginL2L3(VppTestCase):
+ """TestACLpluginL2L3 Test Case"""
@classmethod
def setUpClass(cls):
#. Loopback BVI interface has remote hosts, one half of hosts are
behind pg0 second behind pg1.
"""
- super(TestIpIrb, cls).setUpClass()
+ super(TestACLpluginL2L3, cls).setUpClass()
cls.pg_if_packet_sizes = [64, 512, 1518, 9018] # packet sizes
cls.bd_id = 10
# create 3 pg interfaces, 1 loopback interface
cls.create_pg_interfaces(range(3))
- cls.create_loopback_interfaces(range(1))
+ cls.create_loopback_interfaces(1)
cls.interfaces = list(cls.pg_interfaces)
cls.interfaces.extend(cls.lo_interfaces)
# Create BD with MAC learning enabled and put interfaces to this BD
cls.vapi.sw_interface_set_l2_bridge(
- cls.loop0.sw_if_index, bd_id=cls.bd_id, bvi=1)
+ cls.loop0.sw_if_index, bd_id=cls.bd_id,
+ port_type=L2_PORT_TYPE.BVI)
cls.vapi.sw_interface_set_l2_bridge(
cls.pg0.sw_if_index, bd_id=cls.bd_id)
cls.vapi.sw_interface_set_l2_bridge(
cls.WITHOUT_EH = False
cls.WITH_EH = True
+ cls.STATELESS_ICMP = False
+ cls.STATEFUL_ICMP = True
# Loopback BVI interface has remote hosts, one half of hosts are behind
# pg0 second behind pg1
``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
``show ip arp``.
"""
- super(TestIpIrb, self).tearDown()
+ super(TestACLpluginL2L3, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.cli("show l2patch"))
self.logger.info(self.vapi.cli("show classify tables"))
- self.logger.info(self.vapi.cli("show vlib graph"))
self.logger.info(self.vapi.cli("show l2fib verbose"))
self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
self.bd_id))
self.logger.info(self.vapi.cli("show ip arp"))
self.logger.info(self.vapi.cli("show ip6 neighbors"))
- self.logger.info(self.vapi.cli("show acl-plugin sessions"))
-
- def api_acl_add_replace(self, acl_index, r, count, tag="",
- expected_retval=0):
- """Add/replace an ACL
-
- :param int acl_index: ACL index to replace, 4294967295 to create new.
- :param acl_rule r: ACL rules array.
- :param str tag: symbolic tag (description) for this ACL.
- :param int count: number of rules.
- """
- return self.vapi.api(self.vapi.papi.acl_add_replace,
- {'acl_index': acl_index,
- 'r': r,
- 'count': count,
- 'tag': tag
- }, expected_retval=expected_retval)
-
- def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
- expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
- {'sw_if_index': sw_if_index,
- 'count': count,
- 'n_input': n_input,
- 'acls': acls
- }, expected_retval=expected_retval)
-
- def api_acl_dump(self, acl_index, expected_retval=0):
- return self.vapi.api(self.vapi.papi.acl_dump,
- {'acl_index': acl_index},
- expected_retval=expected_retval)
+ cmd = "show acl-plugin sessions verbose 1"
+ self.logger.info(self.vapi.cli(cmd))
+ self.logger.info(self.vapi.cli("show acl-plugin acl"))
+ self.logger.info(self.vapi.cli("show acl-plugin interface"))
+ self.logger.info(self.vapi.cli("show acl-plugin tables"))
def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
is_ip6, expect_blocked, expect_established,
- add_extension_header):
+ add_extension_header, icmp_stateful=False):
pkts = []
rules = []
permit_rules = []
total_packet_count = 8
for i in range(0, total_packet_count):
modulo = (i//2) % 2
- can_reflect_this_packet = (modulo == 0)
+ icmp_type_delta = i % 2
+ icmp_code = i
+ is_udp_packet = (modulo == 0)
+ if is_udp_packet and icmp_stateful:
+ continue
+ is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and
+ not is_udp_packet)
+ is_reflected_icmp = is_reflectable_icmp and expect_established
+ can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
is_permit = i % 2
remote_dst_index = i % len(dst_ip_if.remote_hosts)
remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
dst_ip4 = remote_dst_host.ip4
src_l4 = 1234 + i
dst_l4 = 4321 + i
+ if is_reflected_icmp:
+ icmp_type_delta = 1
# default ULP should be something we do not use in tests
ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
# potentially a chain of protocols leading to ULP
ulp = ulp_l4
- if can_reflect_this_packet:
+ if is_udp_packet:
if is_ip6:
ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
if add_extension_header:
Raw(payload))
elif modulo == 1:
if is_ip6:
- ulp_l4 = ICMPv6Unknown(type=128 + (i % 2), code=i % 2)
+ ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta,
+ code=icmp_code)
ulp = ulp_l4
p = (Ether(dst=dst_mac, src=src_mac) /
IPv6(src=src_ip6, dst=dst_ip6) /
ulp /
Raw(payload))
else:
- ulp_l4 = ICMP(type=8 + (i % 2), code=i % 2)
+ ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code)
ulp = ulp_l4
p = (Ether(dst=dst_mac, src=src_mac) /
IP(src=src_ip4, dst=dst_ip4) /
new_rule_permit_and_reflect['is_permit'] = 2
else:
new_rule_permit_and_reflect['is_permit'] = is_permit
+
permit_and_reflect_rules.append(new_rule_permit_and_reflect)
+ self.logger.info("create_stream pkt#%d: %s" % (i, payload))
return {'stream': pkts,
'rules': rules,
# UDP:
+ def applied_acl_shuffle(self, sw_if_index):
+ # first collect what ACLs are applied and what they look like
+ r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index)
+ orig_applied_acls = r[0]
+
+ # we will collect these just to save and generate additional rulesets
+ orig_acls = []
+ for acl_num in orig_applied_acls.acls:
+ rr = self.vapi.acl_dump(acl_num)
+ orig_acls.append(rr[0])
+
+ # now create a list of all the rules in all ACLs
+ all_rules = []
+ for old_acl in orig_acls:
+ for rule in old_acl.r:
+ all_rules.append(dict(rule._asdict()))
+
+ # Add a few ACLs made from shuffled rules
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=all_rules[::2],
+ tag="shuffle 1. acl")
+ shuffle_acl_1 = reply.acl_index
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=all_rules[::3],
+ tag="shuffle 2. acl")
+ shuffle_acl_2 = reply.acl_index
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=all_rules[::2],
+ tag="shuffle 3. acl")
+ shuffle_acl_3 = reply.acl_index
+
+ # apply the shuffle ACLs in front
+ input_acls = [shuffle_acl_1, shuffle_acl_2]
+ output_acls = [shuffle_acl_1, shuffle_acl_2]
+
+ # add the currently applied ACLs
+ n_input = orig_applied_acls.n_input
+ input_acls.extend(orig_applied_acls.acls[:n_input])
+ output_acls.extend(orig_applied_acls.acls[n_input:])
+
+ # and the trailing shuffle ACL(s)
+ input_acls.extend([shuffle_acl_3])
+ output_acls.extend([shuffle_acl_3])
+
+ # set the interface ACL list to the result
+ self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+ n_input=len(input_acls),
+ acls=input_acls + output_acls)
+ # change the ACLs a few times
+ for i in range(1, 10):
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
+ r=all_rules[::1+(i % 2)],
+ tag="shuffle 1. acl")
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+ r=all_rules[::1+(i % 3)],
+ tag="shuffle 2. acl")
+ shuffle(all_rules)
+ reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+ r=all_rules[::1+(i % 5)],
+ tag="shuffle 3. acl")
+
+ # restore to how it was before and clean up
+ self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+ n_input=orig_applied_acls.n_input,
+ acls=orig_applied_acls.acls)
+ reply = self.vapi.acl_del(acl_index=shuffle_acl_1)
+ reply = self.vapi.acl_del(acl_index=shuffle_acl_2)
+ reply = self.vapi.acl_del(acl_index=shuffle_acl_3)
+
def create_acls_for_a_stream(self, stream_dict,
test_l2_action, is_reflect):
r = stream_dict['rules']
r_permit = stream_dict['permit_rules']
r_permit_reflect = stream_dict['permit_and_reflect_rules']
r_action = r_permit_reflect if is_reflect else r
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r_action,
- count=len(r_action), tag="action acl")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
+ tag="act. acl")
action_acl_index = reply.acl_index
- reply = self.api_acl_add_replace(acl_index=4294967295, r=r_permit,
- count=len(r_permit), tag="permit acl")
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
+ tag="perm. acl")
permit_acl_index = reply.acl_index
return {'L2': action_acl_index if test_l2_action else permit_acl_index,
'L3': permit_acl_index if test_l2_action else action_acl_index,
is_reflect)
n_input_l3 = 0 if bridged_to_routed else 1
n_input_l2 = 1 if bridged_to_routed else 0
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- count=1,
- n_input=n_input_l3,
- acls=[acl_idx['L3']])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- count=1,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- count=1,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+ n_input=n_input_l3,
+ acls=[acl_idx['L3']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=n_input_l2,
+ acls=[acl_idx['L2']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=n_input_l2,
+ acls=[acl_idx['L2']])
+ self.applied_acl_shuffle(self.pg0.sw_if_index)
+ self.applied_acl_shuffle(self.pg2.sw_if_index)
def apply_acl_ip46_both_directions_reflect(self,
primary_is_bridged_to_routed,
- reflect_on_l2, is_ip6, add_eh):
+ reflect_on_l2, is_ip6, add_eh,
+ stateful_icmp):
primary_is_routed_to_bridged = not primary_is_bridged_to_routed
self.reset_packet_infos()
stream_dict_fwd = self.create_stream(self.pg2, self.loop0,
primary_is_bridged_to_routed,
self.pg_if_packet_sizes, is_ip6,
- False, False, add_eh)
+ False, False, add_eh,
+ stateful_icmp)
acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd,
reflect_on_l2, True)
stream_dict_rev = self.create_stream(self.pg2, self.loop0,
not primary_is_bridged_to_routed,
self.pg_if_packet_sizes, is_ip6,
- True, True, add_eh)
+ True, True, add_eh, stateful_icmp)
# We want the primary action to be "deny" rather than reflect
acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev,
reflect_on_l2, False)
else:
outbound_l3_acl = acl_idx_rev['L3']
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- count=2,
- n_input=1,
- acls=[inbound_l3_acl,
- outbound_l3_acl])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- count=2,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
- self.api_acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- count=2,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+ n_input=1,
+ acls=[inbound_l3_acl,
+ outbound_l3_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl,
+ outbound_l2_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl,
+ outbound_l2_acl])
+ self.applied_acl_shuffle(self.pg0.sw_if_index)
+ self.applied_acl_shuffle(self.pg2.sw_if_index)
def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
def run_traffic_ip46_x_to_y(self, bridged_to_routed,
test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh):
+ is_reflect, is_established, add_eh,
+ stateful_icmp=False):
self.reset_packet_infos()
stream_dict = self.create_stream(self.pg2, self.loop0,
bridged_to_routed,
self.pg_if_packet_sizes, is_ip6,
not is_reflect, is_established,
- add_eh)
+ add_eh, stateful_icmp)
stream = stream_dict['stream']
tx_if = self.pg0 if bridged_to_routed else self.pg2
self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh):
+ is_reflect, is_established, add_eh,
+ stateful_icmp=False):
self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh)
+ is_reflect, is_established, add_eh,
+ stateful_icmp)
def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh):
+ is_reflect, is_established, add_eh,
+ stateful_icmp=False):
self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh)
+ is_reflect, is_established, add_eh,
+ stateful_icmp)
def run_test_ip46_routed_to_bridged(self, test_l2_deny,
is_ip6, is_reflect, add_eh):
is_reflect, False, add_eh)
def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
- is_ip6, add_eh):
+ is_ip6, add_eh,
+ stateful_icmp=False):
self.apply_acl_ip46_both_directions_reflect(False, test_l2_action,
- is_ip6, add_eh)
+ is_ip6, add_eh,
+ stateful_icmp)
self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
- True, False, add_eh)
+ True, False, add_eh,
+ stateful_icmp)
self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
- False, True, add_eh)
+ False, True, add_eh,
+ stateful_icmp)
def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action,
- is_ip6, add_eh):
+ is_ip6, add_eh,
+ stateful_icmp=False):
self.apply_acl_ip46_both_directions_reflect(True, test_l2_action,
- is_ip6, add_eh)
+ is_ip6, add_eh,
+ stateful_icmp)
self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
- True, False, add_eh)
+ True, False, add_eh,
+ stateful_icmp)
self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
- False, True, add_eh)
+ False, True, add_eh,
+ stateful_icmp)
def test_0000_ip6_irb_1(self):
""" ACL plugin prepare"""
""" ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
self.run_test_ip46_bridged_to_routed_and_back(False, False,
self.WITH_EH)
+ # Stateful ACL tests with stateful ICMP
+
+ def test_1401_ip6_irb_1(self):
+ """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_routed_to_bridged_and_back(True, True,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1402_ip6_irb_1(self):
+ """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_bridged_to_routed_and_back(True, True,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1403_ip4_irb_1(self):
+ """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_routed_to_bridged_and_back(True, False,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1404_ip4_irb_1(self):
+ """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_bridged_to_routed_and_back(True, False,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1411_ip6_irb_1(self):
+ """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_routed_to_bridged_and_back(False, True,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1412_ip6_irb_1(self):
+ """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_bridged_to_routed_and_back(False, True,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1413_ip4_irb_1(self):
+ """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_routed_to_bridged_and_back(False, False,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1414_ip4_irb_1(self):
+ """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_bridged_to_routed_and_back(False, False,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)