-#!/usr/bin/env python
+#!/usr/bin/env python3
"""ACL plugin Test Case HLD:
"""
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6ExtHdrFragment
from framework import VppTestCase, VppTestRunner
+from framework import tag_fixme_vpp_workers
from util import Host, ppp
+from ipaddress import IPv4Network, IPv6Network
from vpp_lo_interface import VppLoInterface
+from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
+from vpp_ip import INVALID_INDEX
+@tag_fixme_vpp_workers
class TestACLplugin(VppTestCase):
""" ACL plugin Test Case """
count = 16
start = 0
n_int = len(cls.pg_interfaces)
- macs_per_if = count / n_int
+ macs_per_if = count // n_int
i = -1
for pg_if in cls.pg_interfaces:
i += 1
end_nr = count + start if i == (n_int - 1) \
else macs_per_if * (i + 1) + start
hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
- for j in range(start_nr, end_nr):
+ for j in range(int(start_nr), int(end_nr)):
host = Host(
"00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
"172.17.1%02x.%u" % (pg_if.sw_if_index, j),
Show various debug prints after each test.
"""
super(TestACLplugin, self).tearDown()
- if not self.vpp_dead:
- cli = "show vlib graph l2-input-feat-arc"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show vlib graph l2-input-feat-arc-end"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show vlib graph l2-output-feat-arc"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show vlib graph l2-output-feat-arc-end"
- self.logger.info(self.vapi.ppcli(cli))
- self.logger.info(self.vapi.ppcli("show l2fib verbose"))
- self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
- self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
- self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
- self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
- % self.bd_id))
+
+ def show_commands_at_teardown(self):
+ cli = "show vlib graph l2-input-feat-arc"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show vlib graph l2-input-feat-arc-end"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show vlib graph l2-output-feat-arc"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show vlib graph l2-output-feat-arc-end"
+ self.logger.info(self.vapi.ppcli(cli))
+ self.logger.info(self.vapi.ppcli("show l2fib verbose"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
+ self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
+ self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
+ % self.bd_id))
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
- s_prefix=0, s_ip='\x00\x00\x00\x00',
- d_prefix=0, d_ip='\x00\x00\x00\x00'):
- if proto == -1:
- return
- if ports == self.PORTS_ALL:
- sport_from = 0
- dport_from = 0
- sport_to = 65535 if proto != 1 and proto != 58 else 255
- dport_to = sport_to
- elif ports == self.PORTS_RANGE:
- if proto == 1:
- sport_from = self.icmp4_type
- sport_to = self.icmp4_type
- dport_from = self.icmp4_code
- dport_to = self.icmp4_code
- elif proto == 58:
- sport_from = self.icmp6_type
- sport_to = self.icmp6_type
- dport_from = self.icmp6_code
- dport_to = self.icmp6_code
- elif proto == self.proto[self.IP][self.TCP]:
- sport_from = self.tcp_sport_from
- sport_to = self.tcp_sport_to
- dport_from = self.tcp_dport_from
- dport_to = self.tcp_dport_to
- elif proto == self.proto[self.IP][self.UDP]:
- sport_from = self.udp_sport_from
- sport_to = self.udp_sport_to
- dport_from = self.udp_dport_from
- dport_to = self.udp_dport_to
- elif ports == self.PORTS_RANGE_2:
- if proto == 1:
- sport_from = self.icmp4_type_2
- sport_to = self.icmp4_type_2
- dport_from = self.icmp4_code_from_2
- dport_to = self.icmp4_code_to_2
- elif proto == 58:
- sport_from = self.icmp6_type_2
- sport_to = self.icmp6_type_2
- dport_from = self.icmp6_code_from_2
- dport_to = self.icmp6_code_to_2
- elif proto == self.proto[self.IP][self.TCP]:
- sport_from = self.tcp_sport_from_2
- sport_to = self.tcp_sport_to_2
- dport_from = self.tcp_dport_from_2
- dport_to = self.tcp_dport_to_2
- elif proto == self.proto[self.IP][self.UDP]:
- sport_from = self.udp_sport_from_2
- sport_to = self.udp_sport_to_2
- dport_from = self.udp_dport_from_2
- dport_to = self.udp_dport_to_2
+ s_prefix=0, s_ip=0,
+ d_prefix=0, d_ip=0):
+ if ip:
+ src_prefix = IPv6Network((s_ip, s_prefix))
+ dst_prefix = IPv6Network((d_ip, d_prefix))
else:
- sport_from = ports
- sport_to = ports
- dport_from = ports
- dport_to = ports
-
- rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
- 'srcport_or_icmptype_first': sport_from,
- 'srcport_or_icmptype_last': sport_to,
- 'src_ip_prefix_len': s_prefix,
- 'src_ip_addr': s_ip,
- 'dstport_or_icmpcode_first': dport_from,
- 'dstport_or_icmpcode_last': dport_to,
- 'dst_ip_prefix_len': d_prefix,
- 'dst_ip_addr': d_ip})
- return rule
-
- def apply_rules(self, rules, tag=b''):
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
- tag=tag)
- self.logger.info("Dumped ACL: " + str(
- self.vapi.acl_dump(reply.acl_index)))
+ src_prefix = IPv4Network((s_ip, s_prefix))
+ dst_prefix = IPv4Network((d_ip, d_prefix))
+ return AclRule(is_permit=permit_deny, ports=ports, proto=proto,
+ src_prefix=src_prefix, dst_prefix=dst_prefix)
+
+ def apply_rules(self, rules, tag=None):
+ acl = VppAcl(self, rules, tag=tag)
+ acl.add_vpp_config()
+ self.logger.info("Dumped ACL: " + str(acl.dump()))
# Apply a ACL on the interface as inbound
for i in self.pg_interfaces:
- self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
- n_input=1,
- acls=[reply.acl_index])
- return
-
- def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
- tag=tag)
- self.logger.info("Dumped ACL: " + str(
- self.vapi.acl_dump(reply.acl_index)))
+ acl_if = VppAclInterface(
+ self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl])
+ acl_if.add_vpp_config()
+ return acl.acl_index
+
+ def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
+ acl = VppAcl(self, rules, tag=tag)
+ acl.add_vpp_config()
+ self.logger.info("Dumped ACL: " + str(acl.dump()))
# Apply a ACL on the interface as inbound
- self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
- n_input=1,
- acls=[reply.acl_index])
- return
+ acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1,
+ acls=[acl])
+ return acl.acl_index
- def etype_whitelist(self, whitelist, n_input):
+ def etype_whitelist(self, whitelist, n_input, add=True):
# Apply whitelists on all the interfaces
- for i in self.pg_interfaces:
- # checkstyle can't read long names. Help them.
- fun = self.vapi.acl_interface_set_etype_whitelist
- fun(sw_if_index=i.sw_if_index, n_input=n_input,
- whitelist=whitelist)
- return
+ if add:
+ self._wl = []
+ for i in self.pg_interfaces:
+ self._wl.append(VppEtypeWhitelist(
+ self, sw_if_index=i.sw_if_index, whitelist=whitelist,
+ n_input=n_input).add_vpp_config())
+ else:
+ if hasattr(self, "_wl"):
+ for wl in self._wl:
+ wl.remove_vpp_config()
def create_upper_layer(self, packet_index, proto, ports=0):
p = self.proto_map[proto]
dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
n_int = len(dst_hosts) * len(src_hosts)
for i in range(0, n_int):
- dst_host = dst_hosts[i / len(src_hosts)]
+ dst_host = dst_hosts[int(i / len(src_hosts))]
src_host = src_hosts[i % len(src_hosts)]
pkt_info = self.create_packet_info(src_if, dst_if)
if ipv6 == 1:
"""
self.logger.info("ACLP_TEST_START_0001")
- # Add an ACL
- r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
- 'srcport_or_icmptype_first': 1234,
- 'srcport_or_icmptype_last': 1235,
- 'src_ip_prefix_len': 0,
- 'src_ip_addr': b'\x00\x00\x00\x00',
- 'dstport_or_icmpcode_first': 1234,
- 'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': b'\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0}]
+ # Create a permit-1234 ACL
+ r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
# Test 1: add a new ACL
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
- tag=b"permit 1234")
- self.assertEqual(reply.retval, 0)
+ first_acl = VppAcl(self, rules=r, tag="permit 1234")
+ first_acl.add_vpp_config()
+ self.assertTrue(first_acl.query_vpp_config())
# The very first ACL gets #0
- self.assertEqual(reply.acl_index, 0)
- first_acl = reply.acl_index
- rr = self.vapi.acl_dump(reply.acl_index)
+ self.assertEqual(first_acl.acl_index, 0)
+ rr = first_acl.dump()
self.logger.info("Dumped ACL: " + str(rr))
self.assertEqual(len(rr), 1)
# We should have the same number of ACL entries as we had asked
# are different types, we need to iterate over rules and keys to get
# to basic values.
for i_rule in range(0, len(r) - 1):
- for rule_key in r[i_rule]:
+ encoded_rule = r[i_rule].encode()
+ for rule_key in encoded_rule:
self.assertEqual(rr[0].r[i_rule][rule_key],
- r[i_rule][rule_key])
-
- # Add a deny-1234 ACL
- r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
- 'srcport_or_icmptype_first': 1234,
- 'srcport_or_icmptype_last': 1235,
- 'src_ip_prefix_len': 0,
- 'src_ip_addr': b'\x00\x00\x00\x00',
- 'dstport_or_icmpcode_first': 1234,
- 'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': b'\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0},
- {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
- 'srcport_or_icmptype_first': 0,
- 'srcport_or_icmptype_last': 0,
- 'src_ip_prefix_len': 0,
- 'src_ip_addr': b'\x00\x00\x00\x00',
- 'dstport_or_icmpcode_first': 0,
- 'dstport_or_icmpcode_last': 0,
- 'dst_ip_addr': b'\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0}]
-
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
- tag=b"deny 1234;permit all")
- self.assertEqual(reply.retval, 0)
+ encoded_rule[rule_key])
+
+ # Create a deny-1234 ACL
+ r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
+ AclRule(is_permit=1, proto=17, ports=0)]
+ second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
+ second_acl.add_vpp_config()
+ self.assertTrue(second_acl.query_vpp_config())
# The second ACL gets #1
- self.assertEqual(reply.acl_index, 1)
- second_acl = reply.acl_index
+ self.assertEqual(second_acl.acl_index, 1)
# Test 2: try to modify a nonexistent ACL
- reply = self.vapi.acl_add_replace(acl_index=432, r=r,
- tag=b"FFFF:FFFF", expected_retval=-6)
- self.assertEqual(reply.retval, -6)
- # The ACL number should pass through
- self.assertEqual(reply.acl_index, 432)
+ invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
+ reply = invalid_acl.add_vpp_config(expect_error=True)
+
+ # apply an ACL on an interface inbound, try to delete ACL, must fail
+ acl_if_list = VppAclInterface(
+ self, sw_if_index=self.pg0.sw_if_index, n_input=1,
+ acls=[first_acl])
+ acl_if_list.add_vpp_config()
+ first_acl.remove_vpp_config(expect_error=True)
+ # Unapply an ACL and then try to delete it - must be ok
+ acl_if_list.remove_vpp_config()
+ first_acl.remove_vpp_config()
+
# apply an ACL on an interface inbound, try to delete ACL, must fail
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[first_acl])
- reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
+ acl_if_list = VppAclInterface(
+ self, sw_if_index=self.pg0.sw_if_index, n_input=0,
+ acls=[second_acl])
+ acl_if_list.add_vpp_config()
+ second_acl.remove_vpp_config(expect_error=True)
# Unapply an ACL and then try to delete it - must be ok
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=0,
- acls=[])
- reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
-
- # apply an ACL on an interface outbound, try to delete ACL, must fail
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=0,
- acls=[second_acl])
- reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
- # Unapply the ACL and then try to delete it - must be ok
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=0,
- acls=[])
- reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
+ acl_if_list.remove_vpp_config()
+ second_acl.remove_vpp_config()
# try to apply a nonexistent ACL - must fail
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[first_acl],
- expected_retval=-6)
+ acl_if_list = VppAclInterface(
+ self, sw_if_index=self.pg0.sw_if_index, n_input=0,
+ acls=[invalid_acl])
+ acl_if_list.add_vpp_config(expect_error=True)
self.logger.info("ACLP_TEST_FINISH_0001")
rules = []
rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.UDP]))
+ 0, self.proto[self.IP][self.UDP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.TCP]))
+ 0, self.proto[self.IP][self.TCP]))
# Apply rules
- self.apply_rules(rules, b"permit per-flow")
+ acl_idx = self.apply_rules(rules, "permit per-flow")
+
+ # enable counters
+ reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, -1)
+
+ matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
+ self.logger.info("stat segment counters: %s" % repr(matches))
+ cli = "show acl-plugin acl"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show acl-plugin tables"
+ self.logger.info(self.vapi.ppcli(cli))
+
+ total_hits = matches[0][0]['packets'] + matches[0][1]['packets']
+ self.assertEqual(total_hits, 64)
+
+ # disable counters
+ reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
+
self.logger.info("ACLP_TEST_FINISH_0002")
def test_0003_acl_deny_apply(self):
self.logger.info("ACLP_TEST_START_0003")
# Add a deny-flows ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY,
- self.PORTS_ALL, self.proto[self.IP][self.UDP]))
+ rules.append(self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_ALL,
+ self.proto[self.IP][self.UDP]))
# Permit ip any any in the end
rules.append(self.create_rule(self.IPV4, self.PERMIT,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny per-flow;permit all")
+ acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
+
+ # enable counters
+ reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPV4,
self.proto[self.IP][self.UDP])
+
+ matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
+ self.logger.info("stat segment counters: %s" % repr(matches))
+ cli = "show acl-plugin acl"
+ self.logger.info(self.vapi.ppcli(cli))
+ cli = "show acl-plugin tables"
+ self.logger.info(self.vapi.ppcli(cli))
+ self.assertEqual(matches[0][0]['packets'], 64)
+ # disable counters
+ reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
self.logger.info("ACLP_TEST_FINISH_0003")
- # self.assertEqual(1, 0)
+ # self.assertEqual(, 0)
def test_0004_vpp624_permit_icmpv4(self):
""" VPP_624 permit ICMPv4
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit icmpv4")
+ self.apply_rules(rules, "permit icmpv4")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV4,
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit icmpv6")
+ self.apply_rules(rules, "permit icmpv6")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV6,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny icmpv4")
+ self.apply_rules(rules, "deny icmpv4")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny icmpv6")
+ self.apply_rules(rules, "deny icmpv6")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 tcp")
+ self.apply_rules(rules, "permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv udp")
+ self.apply_rules(rules, "permit ipv udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 udp")
+ self.apply_rules(rules, "permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 tcp")
+ self.apply_rules(rules, "deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp")
+ self.apply_rules(rules, "deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
for i in range(len(r)):
rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
- result = self.vapi.acl_dump(reply.acl_index)
+ acl = VppAcl(self, rules=rules)
+ acl.add_vpp_config()
+ result = acl.dump()
i = 0
for drules in result:
for dr in drules.r:
- self.assertEqual(dr.is_ipv6, r[i][0])
self.assertEqual(dr.is_permit, r[i][1])
self.assertEqual(dr.proto, r[i][3])
"""
self.logger.info("ACLP_TEST_START_0015")
- port = random.randint(0, 65535)
+ port = random.randint(16384, 65535)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
"""
self.logger.info("ACLP_TEST_START_0016")
- port = random.randint(0, 65535)
+ port = random.randint(16384, 65535)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
"""
self.logger.info("ACLP_TEST_START_0017")
- port = random.randint(0, 65535)
+ port = random.randint(16384, 65535)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
self.logger.info("ACLP_TEST_FINISH_0017")
def test_0018_udp_permit_port_v6(self):
- """ permit single UPPv6
+ """ permit single UDPv6
"""
self.logger.info("ACLP_TEST_START_0018")
- port = random.randint(0, 65535)
+ port = random.randint(16384, 65535)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
"""
self.logger.info("ACLP_TEST_START_0019")
- port = random.randint(0, 65535)
+ port = random.randint(16384, 65535)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, port,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+ self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
"""
self.logger.info("ACLP_TEST_START_0020")
- port = random.randint(0, 65535)
+ port = random.randint(16384, 65535)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, port,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+ self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
"""
self.logger.info("ACLP_TEST_START_0021")
- port = random.randint(0, 65535)
+ port = random.randint(16384, 65535)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, port,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+ self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
""" VPP-687 zero length udp ipv4 packet"""
self.logger.info("ACLP_TEST_START_0022")
- port = random.randint(0, 65535)
+ port = random.randint(16384, 65535)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
+ self.apply_rules(rules, "permit empty udp ip4 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
""" VPP-687 zero length udp ipv6 packet"""
self.logger.info("ACLP_TEST_START_0023")
- port = random.randint(0, 65535)
+ port = random.randint(16384, 65535)
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
+ self.apply_rules(rules, "permit empty udp ip6 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 tcp")
+ self.apply_rules(rules, "permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 udp")
+ self.apply_rules(rules, "permit ipv4 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 udp")
+ self.apply_rules(rules, "permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 tcp")
+ self.apply_rules(rules, "deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp")
+ self.apply_rules(rules, "deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass also for an odd ethertype
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
-
+ self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
0, False, 0xaaaa)
# remove the whitelist
- self.etype_whitelist([], 0)
+ self.etype_whitelist([], 0, add=False)
self.logger.info("ACLP_TEST_FINISH_0305")
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
-
+ self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
0, False, True, 0x0bbb)
# remove the whitelist, the previously blocked 0xAAAA should pass now
- self.etype_whitelist([], 0)
+ self.etype_whitelist([], 0, add=False)
self.logger.info("ACLP_TEST_FINISH_0306")
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
# remove the whitelist, the previously blocked 0xAAAA should pass now
- self.etype_whitelist([], 0)
+ self.etype_whitelist([], 0, add=False)
# The whitelisted traffic, should pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
intf.append(VppLoInterface(self))
# Apply rules
- self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
+ self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
# Remove the interface
intf[0].remove_vpp_config()
self.logger.info("ACLP_TEST_FINISH_0315")
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)