from framework import VppTestCase, VppTestRunner
from util import Host, ppp
+from vpp_lo_interface import VppLoInterface
+
class TestACLplugin(VppTestCase):
""" ACL plugin Test Case """
"""
super(TestACLplugin, cls).setUpClass()
- random.seed()
-
try:
# Create 2 pg interfaces
cls.create_pg_interfaces(range(2))
cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
learn=1)
for pg_if in cls.pg_interfaces:
- cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
- bd_id=cls.bd_id)
+ cls.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
# Set up all interfaces
for i in cls.pg_interfaces:
# warm-up the mac address tables
# self.warmup_test()
+ count = 16
+ start = 0
+ n_int = len(cls.pg_interfaces)
+ macs_per_if = count / n_int
+ i = -1
+ for pg_if in cls.pg_interfaces:
+ i += 1
+ start_nr = macs_per_if * i + start
+ end_nr = count + start if i == (n_int - 1) \
+ else macs_per_if * (i + 1) + start
+ hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
+ for j in range(start_nr, end_nr):
+ host = Host(
+ "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
+ "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
+ "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
+ hosts.append(host)
except Exception:
super(TestACLplugin, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestACLplugin, cls).tearDownClass()
+
def setUp(self):
super(TestACLplugin, self).setUp()
self.reset_packet_infos()
Show various debug prints after each test.
"""
super(TestACLplugin, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.ppcli("show l2fib verbose"))
- self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
- self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
- self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
- self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
- % self.bd_id))
-
- def create_hosts(self, count, start=0):
- """
- Create required number of host MAC addresses and distribute them among
- interfaces. Create host IPv4 address for every host MAC address.
- :param int count: Number of hosts to create MAC/IPv4 addresses for.
- :param int start: Number to start numbering from.
- """
- n_int = len(self.pg_interfaces)
- macs_per_if = count / n_int
- i = -1
- for pg_if in self.pg_interfaces:
- i += 1
- start_nr = macs_per_if * i + start
- end_nr = count + start if i == (n_int - 1) \
- else macs_per_if * (i + 1) + start
- hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
- for j in range(start_nr, end_nr):
- host = Host(
- "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
- "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
- "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
- hosts.append(host)
+ def show_commands_at_teardown(self):
+ cli = "show vlib graph l2-input-feat-arc"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show vlib graph l2-input-feat-arc-end"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show vlib graph l2-output-feat-arc"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show vlib graph l2-output-feat-arc-end"
+ self.logger.info(self.vapi.ppcli(cli))
+ self.logger.info(self.vapi.ppcli("show l2fib verbose"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
+ self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
+ % self.bd_id))
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
s_prefix=0, s_ip='\x00\x00\x00\x00',
'dst_ip_addr': d_ip})
return rule
- def apply_rules(self, rules, tag=''):
+ def apply_rules(self, rules, tag=b''):
reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
tag=tag)
self.logger.info("Dumped ACL: " + str(
acls=[reply.acl_index])
return
+ def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
+ tag=tag)
+ self.logger.info("Dumped ACL: " + str(
+ self.vapi.acl_dump(reply.acl_index)))
+ # Apply a ACL on the interface as inbound
+ self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+ n_input=1,
+ acls=[reply.acl_index])
+ return
+
+ def etype_whitelist(self, whitelist, n_input):
+ # Apply whitelists on all the interfaces
+ for i in self.pg_interfaces:
+ # checkstyle can't read long names. Help them.
+ fun = self.vapi.acl_interface_set_etype_whitelist
+ fun(sw_if_index=i.sw_if_index, n_input=n_input,
+ whitelist=whitelist)
+ return
+
def create_upper_layer(self, packet_index, proto, ports=0):
p = self.proto_map[proto]
if p == 'UDP':
return ''
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
- proto=-1, ports=0, fragments=False, pkt_raw=True):
+ proto=-1, ports=0, fragments=False,
+ pkt_raw=True, etype=-1):
"""
Create input packet stream for defined interface using hosts or
deleted_hosts list.
pkt_info.proto = proto
payload = self.info_to_payload(pkt_info)
p = Ether(dst=dst_host.mac, src=src_host.mac)
+ if etype > 0:
+ p = Ether(dst=dst_host.mac,
+ src=src_host.mac,
+ type=etype)
if pkt_info.ip:
p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
if fragments:
pkts.append(p)
return pkts
- def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
+ def verify_capture(self, pg_if, capture,
+ traffic_type=0, ip_type=0, etype=-1):
"""
Verify captured input packet stream for defined interface.
last_info[i.sw_if_index] = None
dst_sw_if_index = pg_if.sw_if_index
for packet in capture:
+ if etype > 0:
+ if packet[Ether].type != etype:
+ self.logger.error(ppp("Unexpected ethertype in packet:",
+ packet))
+ else:
+ continue
try:
# Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
if traffic_type == self.ICMP and ip_type == self.IPV6:
payload_info = self.payload_to_info(
- packet[ICMPv6EchoRequest].data)
+ packet[ICMPv6EchoRequest], 'data')
payload = packet[ICMPv6EchoRequest]
else:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
payload = packet[self.proto_map[payload_info.proto]]
except:
self.logger.error(ppp("Unexpected or invalid packet "
self.pg_start()
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
- frags=False, pkt_raw=True):
+ frags=False, pkt_raw=True, etype=-1):
# Test
# Create incoming packet streams for packet-generator interfaces
pkts_cnt = 0
if self.flows.__contains__(i):
pkts = self.create_stream(i, self.pg_if_packet_sizes,
traffic_type, ip_type, proto, ports,
- frags, pkt_raw)
+ frags, pkt_raw, etype)
if len(pkts) > 0:
i.add_stream(pkts)
pkts_cnt += len(pkts)
# Enable packet capture and start packet sendingself.IPV
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+ self.logger.info("sent packets count: %d" % pkts_cnt)
# Verify
# Verify outgoing packet streams per packet-generator interface
capture = dst_if.get_capture(pkts_cnt)
self.logger.info("Verifying capture on interface %s" %
dst_if.name)
- self.verify_capture(dst_if, capture, traffic_type, ip_type)
+ self.verify_capture(dst_if, capture,
+ traffic_type, ip_type, etype)
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
- ports=0, frags=False):
+ ports=0, frags=False, etype=-1):
# Test
+ pkts_cnt = 0
self.reset_packet_infos()
for i in self.pg_interfaces:
if self.flows.__contains__(i):
pkts = self.create_stream(i, self.pg_if_packet_sizes,
traffic_type, ip_type, proto, ports,
- frags)
+ frags, True, etype)
if len(pkts) > 0:
i.add_stream(pkts)
+ pkts_cnt += len(pkts)
# Enable packet capture and start packet sending
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+ self.logger.info("sent packets count: %d" % pkts_cnt)
# Verify
# Verify outgoing packet streams per packet-generator interface
def test_0000_warmup_test(self):
""" ACL plugin version check; learn MACs
"""
- self.create_hosts(16)
- self.run_traffic_no_check()
reply = self.vapi.papi.acl_plugin_get_version()
self.assertEqual(reply.major, 1)
self.logger.info("Working with ACL plugin version: %d.%d" % (
# self.assertEqual(reply.minor, 0)
def test_0001_acl_create(self):
- """ ACL create test
+ """ ACL create/delete test
"""
self.logger.info("ACLP_TEST_START_0001")
'srcport_or_icmptype_first': 1234,
'srcport_or_icmptype_last': 1235,
'src_ip_prefix_len': 0,
- 'src_ip_addr': '\x00\x00\x00\x00',
+ 'src_ip_addr': b'\x00\x00\x00\x00',
'dstport_or_icmpcode_first': 1234,
'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': '\x00\x00\x00\x00',
+ 'dst_ip_addr': b'\x00\x00\x00\x00',
'dst_ip_prefix_len': 0}]
# Test 1: add a new ACL
reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
- tag="permit 1234")
+ tag=b"permit 1234")
self.assertEqual(reply.retval, 0)
# The very first ACL gets #0
self.assertEqual(reply.acl_index, 0)
+ first_acl = reply.acl_index
rr = self.vapi.acl_dump(reply.acl_index)
self.logger.info("Dumped ACL: " + str(rr))
self.assertEqual(len(rr), 1)
r[i_rule][rule_key])
# Add a deny-1234 ACL
- r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
+ r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
'srcport_or_icmptype_first': 1234,
'srcport_or_icmptype_last': 1235,
'src_ip_prefix_len': 0,
- 'src_ip_addr': '\x00\x00\x00\x00',
+ 'src_ip_addr': b'\x00\x00\x00\x00',
'dstport_or_icmpcode_first': 1234,
'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': '\x00\x00\x00\x00',
+ 'dst_ip_addr': b'\x00\x00\x00\x00',
'dst_ip_prefix_len': 0},
{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
'srcport_or_icmptype_first': 0,
'srcport_or_icmptype_last': 0,
'src_ip_prefix_len': 0,
- 'src_ip_addr': '\x00\x00\x00\x00',
+ 'src_ip_addr': b'\x00\x00\x00\x00',
'dstport_or_icmpcode_first': 0,
'dstport_or_icmpcode_last': 0,
- 'dst_ip_addr': '\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0})
+ 'dst_ip_addr': b'\x00\x00\x00\x00',
+ 'dst_ip_prefix_len': 0}]
reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
- tag="deny 1234;permit all")
+ tag=b"deny 1234;permit all")
self.assertEqual(reply.retval, 0)
# The second ACL gets #1
self.assertEqual(reply.acl_index, 1)
+ second_acl = reply.acl_index
# Test 2: try to modify a nonexistent ACL
reply = self.vapi.acl_add_replace(acl_index=432, r=r,
- tag="FFFF:FFFF", expected_retval=-1)
- self.assertEqual(reply.retval, -1)
+ tag=b"FFFF:FFFF", expected_retval=-6)
+ self.assertEqual(reply.retval, -6)
# The ACL number should pass through
self.assertEqual(reply.acl_index, 432)
+ # apply an ACL on an interface inbound, try to delete ACL, must fail
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[first_acl])
+ reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
+ # Unapply an ACL and then try to delete it - must be ok
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=0,
+ acls=[])
+ reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
+
+ # apply an ACL on an interface outbound, try to delete ACL, must fail
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=0,
+ acls=[second_acl])
+ reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
+ # Unapply the ACL and then try to delete it - must be ok
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=0,
+ acls=[])
+ reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
+
+ # try to apply a nonexistent ACL - must fail
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[first_acl],
+ expected_retval=-6)
self.logger.info("ACLP_TEST_FINISH_0001")
0, self.proto[self.IP][self.TCP]))
# Apply rules
- self.apply_rules(rules, "permit per-flow")
+ self.apply_rules(rules, b"permit per-flow")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, -1)
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny per-flow;permit all")
+ self.apply_rules(rules, b"deny per-flow;permit all")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPV4,
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit icmpv4")
+ self.apply_rules(rules, b"permit icmpv4")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV4,
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit icmpv6")
+ self.apply_rules(rules, b"permit icmpv6")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV6,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny icmpv4")
+ self.apply_rules(rules, b"deny icmpv4")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny icmpv6")
+ self.apply_rules(rules, b"deny icmpv6")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
+ self.apply_rules(rules, b"permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip6 tcp")
+ self.apply_rules(rules, b"permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv udp")
+ self.apply_rules(rules, b"permit ipv udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip6 udp")
+ self.apply_rules(rules, b"permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 tcp")
+ self.apply_rules(rules, b"deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp")
+ self.apply_rules(rules, b"deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip4 tcp "+str(port))
+ self.apply_rules(rules, b"permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip4 tcp "+str(port))
+ self.apply_rules(rules, b"permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip4 tcp "+str(port))
+ self.apply_rules(rules, b"permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip4 tcp "+str(port))
+ self.apply_rules(rules, b"permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+ self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+ self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.logger.info("ACLP_TEST_FINISH_0020")
def test_0021_udp_deny_port_verify_fragment_deny(self):
- """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
+ """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
+ blocked
"""
self.logger.info("ACLP_TEST_START_0021")
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+ self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit empty udp ip4 " + str(port))
+ self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit empty udp ip6 "+str(port))
+ self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
+ self.apply_rules(rules, b"permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip6 tcp")
+ self.apply_rules(rules, b"permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ipv4 udp")
+ self.apply_rules(rules, b"permit ipv4 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "permit ip6 udp")
+ self.apply_rules(rules, b"permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 tcp")
+ self.apply_rules(rules, b"deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp")
+ self.apply_rules(rules, b"deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.logger.info("ACLP_TEST_FINISH_0113")
+ def test_0300_tcp_permit_v4_etype_aaaa(self):
+ """ permit TCPv4, send 0xAAAA etype
+ """
+ self.logger.info("ACLP_TEST_START_0300")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, b"permit ipv4 tcp")
+
+ # Traffic should still pass also for an odd ethertype
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+ 0, False, True, 0xaaaa)
+ self.logger.info("ACLP_TEST_FINISH_0300")
+
+ def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
+ """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
+ """
+ self.logger.info("ACLP_TEST_START_0305")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, b"permit ipv4 tcp")
+
+ # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+ self.etype_whitelist([0xbbb], 1)
+
+ # The oddball ethertype should be blocked
+ self.run_verify_negat_test(self.IP, self.IPV4,
+ self.proto[self.IP][self.TCP],
+ 0, False, 0xaaaa)
+
+ # remove the whitelist
+ self.etype_whitelist([], 0)
+
+ self.logger.info("ACLP_TEST_FINISH_0305")
+
+ def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
+ """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
+ """
+ self.logger.info("ACLP_TEST_START_0306")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, b"permit ipv4 tcp")
+
+ # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+ self.etype_whitelist([0xbbb], 1)
+
+ # The whitelisted traffic, should pass
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+ 0, False, True, 0x0bbb)
+
+ # remove the whitelist, the previously blocked 0xAAAA should pass now
+ self.etype_whitelist([], 0)
+
+ self.logger.info("ACLP_TEST_FINISH_0306")
+
+ def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
+ """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
+ """
+ self.logger.info("ACLP_TEST_START_0307")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, b"permit ipv4 tcp")
+
+ # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
+ self.etype_whitelist([0xbbb], 1)
+ # remove the whitelist, the previously blocked 0xAAAA should pass now
+ self.etype_whitelist([], 0)
+
+ # The whitelisted traffic, should pass
+ self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
+ 0, False, True, 0xaaaa)
+
+ self.logger.info("ACLP_TEST_FINISH_0306")
+
+ def test_0315_del_intf(self):
+ """ apply an acl and delete the interface
+ """
+ self.logger.info("ACLP_TEST_START_0315")
+
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
+ self.proto[self.IP][self.TCP]))
+ rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
+ self.proto[self.IP][self.TCP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
+
+ # create an interface
+ intf = []
+ intf.append(VppLoInterface(self))
+
+ # Apply rules
+ self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
+
+ # Remove the interface
+ intf[0].remove_vpp_config()
+
+ self.logger.info("ACLP_TEST_FINISH_0315")
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)